Migration from old pyutilz package name (which, in turn, came from
[pyutils.git] / src / pyutils / parallelize / thread_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """Utilities for dealing with threads + threading."""
6
7 import functools
8 import logging
9 import os
10 import threading
11 from typing import Any, Callable, Optional, Tuple
12
13 # This module is commonly used by others in here and should avoid
14 # taking any unnecessary dependencies back on them.
15
16 logger = logging.getLogger(__name__)
17
18
19 def current_thread_id() -> str:
20     """
21     Returns:
22         a string composed of the parent process' id, the current
23         process' id and the current thread identifier.  The former two are
24         numbers (pids) whereas the latter is a thread id passed during thread
25         creation time.
26
27     >>> ret = current_thread_id()
28     >>> (ppid, pid, tid) = ret.split('/')
29     >>> ppid.isnumeric()
30     True
31     >>> pid.isnumeric()
32     True
33
34     """
35     ppid = os.getppid()
36     pid = os.getpid()
37     tid = threading.current_thread().name
38     return f'{ppid}/{pid}/{tid}:'
39
40
41 def is_current_thread_main_thread() -> bool:
42     """
43     Returns:
44         True is the current (calling) thread is the process' main
45         thread and False otherwise.
46
47     >>> is_current_thread_main_thread()
48     True
49
50     >>> result = None
51     >>> def thunk():
52     ...     global result
53     ...     result = is_current_thread_main_thread()
54
55     >>> thunk()
56     >>> result
57     True
58
59     >>> import threading
60     >>> thread = threading.Thread(target=thunk)
61     >>> thread.start()
62     >>> thread.join()
63     >>> result
64     False
65
66     """
67     return threading.current_thread() is threading.main_thread()
68
69
70 def background_thread(
71     _funct: Optional[Callable[..., Any]],
72 ) -> Callable[..., Tuple[threading.Thread, threading.Event]]:
73     """A function decorator to create a background thread.
74
75     Usage::
76
77         @background_thread
78         def random(a: int, b: str, stop_event: threading.Event) -> None:
79             while True:
80                 print(f"Hi there {b}: {a}!")
81                 time.sleep(10.0)
82                 if stop_event.is_set():
83                     return
84
85         def main() -> None:
86             (thread, event) = random(22, "dude")
87             print("back!")
88             time.sleep(30.0)
89             event.set()
90             thread.join()
91
92     .. warning::
93
94         In addition to any other arguments the function has, it must
95         take a stop_event as the last unnamed argument which it should
96         periodically check.  If the event is set, it means the thread has
97         been requested to terminate ASAP.
98     """
99
100     def wrapper(funct: Callable):
101         @functools.wraps(funct)
102         def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]:
103             should_terminate = threading.Event()
104             should_terminate.clear()
105             newargs = (*a, should_terminate)
106             thread = threading.Thread(
107                 target=funct,
108                 args=newargs,
109                 kwargs=kwa,
110             )
111             thread.start()
112             logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
113             return (thread, should_terminate)
114
115         return inner_wrapper
116
117     if _funct is None:
118         return wrapper  # type: ignore
119     else:
120         return wrapper(_funct)
121
122
123 class ThreadWithReturnValue(threading.Thread):
124     """A thread whose return value is plumbed back out as the return
125     value of :meth:`join`.
126     """
127
128     def __init__(
129         self, group=None, target=None, name=None, args=(), kwargs={}, Verbose=None
130     ):
131         threading.Thread.__init__(
132             self, group=None, target=target, name=None, args=args, kwargs=kwargs
133         )
134         self._target = target
135         self._return = None
136
137     def run(self):
138         if self._target is not None:
139             self._return = self._target(*self._args, **self._kwargs)
140
141     def join(self, *args):
142         threading.Thread.join(self, *args)
143         return self._return
144
145
146 def periodically_invoke(
147     period_sec: float,
148     stop_after: Optional[int],
149 ):
150     """
151     Periodically invoke the decorated function.
152
153     Args:
154         period_sec: the delay period in seconds between invocations
155         stop_after: total number of invocations to make or, if None,
156             call forever
157
158     Returns:
159         a :class:`Thread` object and an :class:`Event` that, when
160         signaled, will stop the invocations.
161
162     .. note::
163         It is possible to be invoked one time after the :class:`Event`
164         is set.  This event can be used to stop infinite
165         invocation style or finite invocation style decorations.
166
167     Usage::
168
169         @periodically_invoke(period_sec=0.5, stop_after=None)
170         def there(name: str, age: int) -> None:
171             print(f"   ...there {name}, {age}")
172
173         @periodically_invoke(period_sec=1.0, stop_after=3)
174         def hello(name: str) -> None:
175             print(f"Hello, {name}")
176     """
177
178     def decorator_repeat(func):
179         def helper_thread(should_terminate, *args, **kwargs) -> None:
180             if stop_after is None:
181                 while True:
182                     func(*args, **kwargs)
183                     res = should_terminate.wait(period_sec)
184                     if res:
185                         return
186             else:
187                 for _ in range(stop_after):
188                     func(*args, **kwargs)
189                     res = should_terminate.wait(period_sec)
190                     if res:
191                         return
192                 return
193
194         @functools.wraps(func)
195         def wrapper_repeat(*args, **kwargs):
196             should_terminate = threading.Event()
197             should_terminate.clear()
198             newargs = (should_terminate, *args)
199             thread = threading.Thread(target=helper_thread, args=newargs, kwargs=kwargs)
200             thread.start()
201             logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
202             return (thread, should_terminate)
203
204         return wrapper_repeat
205
206     return decorator_repeat
207
208
209 if __name__ == '__main__':
210     import doctest
211
212     doctest.testmod()