More work to improve documentation generated by sphinx. Also fixes
[pyutils.git] / src / pyutils / parallelize / thread_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """Utilities for dealing with threads + threading."""
6
7 import functools
8 import logging
9 import os
10 import threading
11 from typing import Any, Callable, Optional, Tuple
12
13 # This module is commonly used by others in here and should avoid
14 # taking any unnecessary dependencies back on them.
15
16 logger = logging.getLogger(__name__)
17
18
19 def current_thread_id() -> str:
20     """
21     Returns:
22         a string composed of the parent process' id, the current
23         process' id and the current thread identifier.  The former two are
24         numbers (pids) whereas the latter is a thread id passed during thread
25         creation time.
26
27     >>> ret = current_thread_id()
28     >>> (ppid, pid, tid) = ret.split('/')
29     >>> ppid.isnumeric()
30     True
31     >>> pid.isnumeric()
32     True
33
34     """
35     ppid = os.getppid()
36     pid = os.getpid()
37     tid = threading.current_thread().name
38     return f'{ppid}/{pid}/{tid}:'
39
40
41 def is_current_thread_main_thread() -> bool:
42     """
43     Returns:
44         True is the current (calling) thread is the process' main
45         thread and False otherwise.
46
47     >>> is_current_thread_main_thread()
48     True
49
50     >>> result = None
51     >>> def thunk():
52     ...     global result
53     ...     result = is_current_thread_main_thread()
54
55     >>> thunk()
56     >>> result
57     True
58
59     >>> import threading
60     >>> thread = threading.Thread(target=thunk)
61     >>> thread.start()
62     >>> thread.join()
63     >>> result
64     False
65
66     """
67     return threading.current_thread() is threading.main_thread()
68
69
70 def background_thread(
71     _funct: Optional[Callable[..., Any]],
72 ) -> Callable[..., Tuple[threading.Thread, threading.Event]]:
73     """A function decorator to create a background thread.
74
75     Args:
76         _funct: The function being wrapped such that it is invoked
77             on a background thread.
78
79     Example usage::
80
81         @background_thread
82         def random(a: int, b: str, stop_event: threading.Event) -> None:
83             while True:
84                 print(f"Hi there {b}: {a}!")
85                 time.sleep(10.0)
86                 if stop_event.is_set():
87                     return
88
89         def main() -> None:
90             (thread, event) = random(22, "dude")
91             print("back!")
92             time.sleep(30.0)
93             event.set()
94             thread.join()
95
96     .. warning::
97
98         In addition to any other arguments the function has, it must
99         take a stop_event as the last unnamed argument which it should
100         periodically check.  If the event is set, it means the thread has
101         been requested to terminate ASAP.
102     """
103
104     def wrapper(funct: Callable):
105         @functools.wraps(funct)
106         def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]:
107             should_terminate = threading.Event()
108             should_terminate.clear()
109             newargs = (*a, should_terminate)
110             thread = threading.Thread(
111                 target=funct,
112                 args=newargs,
113                 kwargs=kwa,
114             )
115             thread.start()
116             logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
117             return (thread, should_terminate)
118
119         return inner_wrapper
120
121     if _funct is None:
122         return wrapper  # type: ignore
123     else:
124         return wrapper(_funct)
125
126
127 class ThreadWithReturnValue(threading.Thread):
128     """A thread whose return value is plumbed back out as the return
129     value of :meth:`join`.
130     """
131
132     def __init__(
133         self, group=None, target=None, name=None, args=(), kwargs={}, Verbose=None
134     ):
135         threading.Thread.__init__(
136             self, group=None, target=target, name=None, args=args, kwargs=kwargs
137         )
138         self._target = target
139         self._return = None
140
141     def run(self):
142         if self._target is not None:
143             self._return = self._target(*self._args, **self._kwargs)
144
145     def join(self, *args):
146         threading.Thread.join(self, *args)
147         return self._return
148
149
150 def periodically_invoke(
151     period_sec: float,
152     stop_after: Optional[int],
153 ):
154     """
155     Periodically invoke the decorated function.
156
157     Args:
158         period_sec: the delay period in seconds between invocations
159         stop_after: total number of invocations to make or, if None,
160             call forever
161
162     Returns:
163         a :class:`Thread` object and an :class:`Event` that, when
164         signaled, will stop the invocations.
165
166     .. note::
167         It is possible to be invoked one time after the :class:`Event`
168         is set.  This event can be used to stop infinite
169         invocation style or finite invocation style decorations.
170
171     Usage::
172
173         @periodically_invoke(period_sec=0.5, stop_after=None)
174         def there(name: str, age: int) -> None:
175             print(f"   ...there {name}, {age}")
176
177         @periodically_invoke(period_sec=1.0, stop_after=3)
178         def hello(name: str) -> None:
179             print(f"Hello, {name}")
180     """
181
182     def decorator_repeat(func):
183         def helper_thread(should_terminate, *args, **kwargs) -> None:
184             if stop_after is None:
185                 while True:
186                     func(*args, **kwargs)
187                     res = should_terminate.wait(period_sec)
188                     if res:
189                         return
190             else:
191                 for _ in range(stop_after):
192                     func(*args, **kwargs)
193                     res = should_terminate.wait(period_sec)
194                     if res:
195                         return
196                 return
197
198         @functools.wraps(func)
199         def wrapper_repeat(*args, **kwargs):
200             should_terminate = threading.Event()
201             should_terminate.clear()
202             newargs = (should_terminate, *args)
203             thread = threading.Thread(target=helper_thread, args=newargs, kwargs=kwargs)
204             thread.start()
205             logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
206             return (thread, should_terminate)
207
208         return wrapper_repeat
209
210     return decorator_repeat
211
212
213 if __name__ == '__main__':
214     import doctest
215
216     doctest.testmod()