3 # © Copyright 2021-2023, Scott Gasch
5 """Utilities for dealing with threads + threading."""
11 from typing import Any, Callable, Optional, Tuple
13 from pyutils.typez.typing import Runnable
15 # This module is commonly used by others in here and should avoid
16 # taking any unnecessary dependencies back on them.
18 logger = logging.getLogger(__name__)
21 def current_thread_id() -> str:
24 A string composed of the parent process' id, the
25 current process' id and the current thread name that can be used
26 as a unique identifier for the current thread. The former two are
27 numbers (pids) whereas the latter is a thread id passed during
30 >>> from pyutils.parallelize import thread_utils
31 >>> ret = thread_utils.current_thread_id()
32 >>> ret # doctest: +SKIP
33 '76891/84444/MainThread:'
34 >>> (ppid, pid, tid) = ret.split('/')
42 tid = threading.current_thread().name
43 return f"{ppid}/{pid}/{tid}:"
46 def is_current_thread_main_thread() -> bool:
49 True is the current (calling) thread is the process' main
50 thread and False otherwise.
52 >>> from pyutils.parallelize import thread_utils
53 >>> thread_utils.is_current_thread_main_thread()
57 >>> def am_i_the_main_thread():
59 ... result = thread_utils.is_current_thread_main_thread()
61 >>> am_i_the_main_thread()
66 >>> thread = threading.Thread(target=am_i_the_main_thread)
72 return threading.current_thread() is threading.main_thread()
75 def background_thread(
76 _funct: Optional[Callable[..., Any]],
77 ) -> Callable[..., Tuple[threading.Thread, threading.Event]]:
78 """A function decorator to create a background thread.
81 _funct: The function being wrapped such that it is invoked
82 on a background thread.
89 from pyutils.parallelize import thread_utils
91 @thread_utils.background_thread
92 def random(a: int, b: str, stop_event: threading.Event) -> None:
94 print(f"Hi there {b}: {a}!")
96 if stop_event.is_set():
100 (thread, event) = random(22, "dude")
108 In addition to any other arguments the function has, it must
109 take a stop_event as the last unnamed argument which it should
110 periodically check. If the event is set, it means the thread has
111 been requested to terminate ASAP.
114 def wrapper(funct: Callable):
115 @functools.wraps(funct)
116 def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]:
117 should_terminate = threading.Event()
118 should_terminate.clear()
119 newargs = (*a, should_terminate)
120 thread = threading.Thread(
126 logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
127 return (thread, should_terminate)
132 return wrapper # type: ignore
134 return wrapper(_funct)
137 class ThreadWithReturnValue(threading.Thread, Runnable):
138 """A thread whose return value is plumbed back out as the return
139 value of :meth:`join`. Use like a normal thread::
143 from pyutils.parallelize import thread_utils
145 def thread_entry_point(args):
146 # do something interesting...
149 if __name__ == "__main__":
150 thread = thread_utils.ThreadWithReturnValue(
151 target=thread_entry_point,
155 result = thread.join()
156 print(f"thread finished and returned {result}")
161 self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None
163 threading.Thread.__init__(
172 self._target = target
175 self._kwargs = kwargs
177 def run(self) -> None:
178 """Create a little wrapper around invoking the real thread entry
179 point so we can pay attention to its return value."""
180 if self._target is not None:
181 self._return = self._target(*self._args, **self._kwargs)
183 def join(self, *args) -> Any:
184 """Wait until the thread terminates and return the value it terminated with
185 as the result of join.
187 Like normal :meth:`join`, this blocks the calling thread until
188 the thread whose :meth:`join` is called terminates – either
189 normally or through an unhandled exception or until the
190 optional timeout occurs.
192 When the timeout argument is present and not None, it should
193 be a floating point number specifying a timeout for the
194 operation in seconds (or fractions thereof).
196 When the timeout argument is not present or None, the
197 operation will block until the thread terminates.
199 A thread can be joined many times.
201 :meth:`join` raises a RuntimeError if an attempt is made to join the
202 current thread as that would cause a deadlock. It is also an
203 error to join a thread before it has been started and
204 attempts to do so raises the same exception.
206 threading.Thread.join(self, *args)
210 def periodically_invoke(
212 stop_after: Optional[int],
215 Periodically invoke the decorated function on a background thread.
218 period_sec: the delay period in seconds between invocations
219 stop_after: total number of invocations to make or, if None,
223 a :class:`Thread` object and an :class:`Event` that, when
224 signaled, will stop the invocations.
227 It is possible to be invoked one time after the :class:`Event`
228 is set. This event can be used to stop infinite
229 invocation style or finite invocation style decorations.
233 from pyutils.parallelize import thread_utils
235 @thread_utils.periodically_invoke(period_sec=1.0, stop_after=3)
236 def hello(name: str) -> None:
237 print(f"Hello, {name}")
239 @thread_utils.periodically_invoke(period_sec=0.5, stop_after=None)
240 def there(name: str, age: int) -> None:
241 print(f" ...there {name}, {age}")
243 Usage as a decorator doesn't give you access to the returned stop event or
244 thread object. To get those, wrap your periodic function manually::
246 from pyutils.parallelize import thread_utils
248 def periodic(m: str) -> None:
251 f = thread_utils.periodically_invoke(period_sec=5.0, stop_after=None)(periodic)
252 thread, event = f("testing")
257 See also :mod:`pyutils.state_tracker`.
260 def decorator_repeat(func):
261 def helper_thread(should_terminate, *args, **kwargs) -> None:
262 if stop_after is None:
264 func(*args, **kwargs)
265 res = should_terminate.wait(period_sec)
269 for _ in range(stop_after):
270 func(*args, **kwargs)
271 res = should_terminate.wait(period_sec)
276 @functools.wraps(func)
277 def wrapper_repeat(*args, **kwargs):
278 should_terminate = threading.Event()
279 should_terminate.clear()
280 newargs = (should_terminate, *args)
281 thread = threading.Thread(target=helper_thread, args=newargs, kwargs=kwargs)
283 logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
284 return (thread, should_terminate)
286 return wrapper_repeat
288 return decorator_repeat
291 if __name__ == "__main__":