# © Copyright 2021-2023, Scott Gasch

"""Utilities for dealing with threads + threading."""
import functools
import logging
import os
import threading
from typing import Any, Callable, Optional, Tuple
# This module is commonly used by others in here and should avoid
# taking any unnecessary dependencies back on them.
# Module-level logger, named after this module, shared by the helpers below.
logger = logging.getLogger(__name__)
def current_thread_id() -> str:
    """Build an identifier string for the calling thread.

    Returns:
        A string composed of the parent process' id, the
        current process' id and the current thread name that can be used
        as a unique identifier for the current thread.  The former two are
        numbers (pids) whereas the latter is the thread's name, which is
        assigned during thread creation.  The result has the form
        ``"<ppid>/<pid>/<thread name>:"``.

    >>> from pyutils.parallelize import thread_utils
    >>> ret = thread_utils.current_thread_id()
    >>> ret  # doctest: +SKIP
    '76891/84444/MainThread:'
    >>> (ppid, pid, tid) = ret.split('/')
    >>> ppid.isnumeric()
    True
    >>> pid.isnumeric()
    True
    """
    ppid = os.getppid()
    pid = os.getpid()
    tid = threading.current_thread().name
    return f"{ppid}/{pid}/{tid}:"
def is_current_thread_main_thread() -> bool:
    """Tell whether the caller is running on the process' main thread.

    Returns:
        True if the current (calling) thread is the process' main
        thread and False otherwise.

    >>> from pyutils.parallelize import thread_utils
    >>> thread_utils.is_current_thread_main_thread()
    True

    >>> result = None
    >>> def am_i_the_main_thread():
    ...     global result
    ...     result = thread_utils.is_current_thread_main_thread()

    >>> am_i_the_main_thread()
    >>> result
    True

    >>> import threading
    >>> thread = threading.Thread(target=am_i_the_main_thread)
    >>> thread.start()
    >>> thread.join()
    >>> result
    False
    """
    return threading.current_thread() is threading.main_thread()
def background_thread(
    _funct: Optional[Callable[..., Any]] = None,
) -> Callable[..., Tuple[threading.Thread, threading.Event]]:
    """A function decorator to create a background thread.

    Calling the decorated function starts a new thread that runs the
    original function and immediately returns a ``(thread, stop_event)``
    tuple to the caller.

    Args:
        _funct: The function being wrapped such that it is invoked
            on a background thread.  May be omitted (or None), in which
            case the decorator itself is returned so that both
            ``@background_thread`` and ``@background_thread()`` work.

    Returns:
        The wrapped function.  When invoked it returns a tuple of the
        started :class:`threading.Thread` and the :class:`threading.Event`
        that was passed to the function as its stop event.

    Example usage::

        import threading
        import time

        from pyutils.parallelize import thread_utils

        @thread_utils.background_thread
        def random(a: int, b: str, stop_event: threading.Event) -> None:
            while True:
                print(f"Hi there {b}: {a}!")
                time.sleep(10.0)
                if stop_event.is_set():
                    return

        def main() -> None:
            (thread, event) = random(22, "dude")
            print("back!")
            time.sleep(30.0)
            event.set()
            thread.join()

    .. warning::

        In addition to any other arguments the function has, it must
        take a stop_event as the last unnamed argument which it should
        periodically check.  If the event is set, it means the thread has
        been requested to terminate ASAP.
    """

    def wrapper(funct: Callable):
        @functools.wraps(funct)
        def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]:
            # A freshly constructed Event starts out unset.
            should_terminate = threading.Event()
            # The stop event is appended as the last positional argument.
            newargs = (*a, should_terminate)
            thread = threading.Thread(
                target=funct,
                args=newargs,
                kwargs=kwa,
            )
            # Start before logging: thread.ident is None until the thread
            # has been started and the log format expects an integer.
            thread.start()
            logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
            return (thread, should_terminate)

        return inner_wrapper

    if _funct is None:
        # Used as @background_thread(): hand back the real decorator.
        return wrapper  # type: ignore
    return wrapper(_funct)
class ThreadWithReturnValue(threading.Thread):
    """A thread whose return value is plumbed back out as the return
    value of :meth:`join`.  Use like a normal thread::

        import threading

        from pyutils.parallelize import thread_utils

        def thread_entry_point(args):
            # do something interesting...
            return result

        if __name__ == "__main__":
            thread = thread_utils.ThreadWithReturnValue(
                target=thread_entry_point,
                args=(whatever)
            )
            thread.start()
            result = thread.join()
            print(f"thread finished and returned {result}")
    """

    def __init__(
        self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None
    ) -> None:
        """Create the thread; the arguments are forwarded to
        :class:`threading.Thread`.

        Args:
            group: forwarded to :class:`threading.Thread`.
            target: the callable to run on the thread, or None.
            name: the thread name, or None to get a default name.
            args: positional arguments for target.
            kwargs: keyword arguments for target; None means none.
            daemon: forwarded to :class:`threading.Thread`.
        """
        # Use None rather than a shared mutable {} as the default.
        if kwargs is None:
            kwargs = {}
        threading.Thread.__init__(
            self,
            group=group,
            target=target,
            name=name,
            args=args,
            kwargs=kwargs,
            daemon=daemon,
        )
        # run() below reads these attributes, so set them explicitly here.
        self._target = target
        self._args = args
        self._kwargs = kwargs
        # Stays None until the target returns.
        self._return = None

    def run(self) -> None:
        """Create a little wrapper around invoking the real thread entry
        point so we can pay attention to its return value."""
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self, timeout: Optional[float] = None) -> Any:
        """Wait until the thread terminates and return the value it terminated with
        as the result of join.

        Like normal :meth:`join`, this blocks the calling thread until
        the thread whose :meth:`join` is called terminates – either
        normally or through an unhandled exception or until the
        optional timeout occurs.

        When the timeout argument is present and not None, it should
        be a floating point number specifying a timeout for the
        operation in seconds (or fractions thereof).

        When the timeout argument is not present or None, the
        operation will block until the thread terminates.

        A thread can be joined many times.

        :meth:`join` raises a RuntimeError if an attempt is made to join the
        current thread as that would cause a deadlock.  It is also an
        error to join a thread before it has been started and
        attempts to do so raise the same exception.

        Args:
            timeout: seconds to wait, or None to wait indefinitely.

        Returns:
            The value returned by the thread's target.  This is None if
            the target has not returned yet (e.g. the timeout elapsed
            first), if there was no target, or if the target itself
            returned None; use :meth:`is_alive` to tell a timeout apart.
        """
        threading.Thread.join(self, timeout)
        return self._return
def periodically_invoke(
    period_sec: float,
    stop_after: Optional[int],
) -> Callable[
    [Callable[..., Any]], Callable[..., Tuple[threading.Thread, threading.Event]]
]:
    """
    Periodically invoke the decorated function on a background thread.

    Args:
        period_sec: the delay period in seconds between invocations
        stop_after: total number of invocations to make or, if None,
            keep invoking until the returned event is set

    Returns:
        a :class:`Thread` object and an :class:`Event` that, when
        signaled, will stop the invocations.

    .. note::
        It is possible to be invoked one time after the :class:`Event`
        is set.  This event can be used to stop infinite
        invocation style or finite invocation style decorations.

    Usage::

        from pyutils.parallelize import thread_utils

        @thread_utils.periodically_invoke(period_sec=1.0, stop_after=3)
        def hello(name: str) -> None:
            print(f"Hello, {name}")

        @thread_utils.periodically_invoke(period_sec=0.5, stop_after=None)
        def there(name: str, age: int) -> None:
            print(f"   ...there {name}, {age}")

    Usage as a decorator doesn't give you access to the returned stop event or
    thread object.  To get those, wrap your periodic function manually::

        from pyutils.parallelize import thread_utils

        def periodic(m: str) -> None:
            print(m)

        f = thread_utils.periodically_invoke(period_sec=5.0, stop_after=None)(periodic)
        thread, event = f("testing")
        ...
        event.set()
        thread.join()

    See also :mod:`pyutils.state_tracker`.
    """

    def decorator_repeat(func):
        def helper_thread(should_terminate, *args, **kwargs) -> None:
            # None means "no limit"; otherwise count invocations down to zero.
            remaining = stop_after
            while remaining is None or remaining > 0:
                func(*args, **kwargs)
                # The wait doubles as the inter-invocation delay and returns
                # True as soon as the stop event is set.
                if should_terminate.wait(period_sec):
                    return
                if remaining is not None:
                    remaining -= 1

        @functools.wraps(func)
        def wrapper_repeat(*args, **kwargs):
            # A freshly constructed Event starts out unset.
            should_terminate = threading.Event()
            # The stop event goes first; helper_thread strips it off before
            # forwarding the remaining arguments to func.
            newargs = (should_terminate, *args)
            thread = threading.Thread(target=helper_thread, args=newargs, kwargs=kwargs)
            # Start before logging: thread.ident is None until started.
            thread.start()
            logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
            return (thread, should_terminate)

        return wrapper_repeat

    return decorator_repeat
289 if __name__ == "__main__":