3 # © Copyright 2021-2023, Scott Gasch
5 """Utilities for dealing with threads + threading."""
11 from typing import Any, Callable, Optional, Tuple
13 from pyutils.typez.typing import Runnable
15 # This module is commonly used by others in here and should avoid
16 # taking any unnecessary dependencies back on them.
18 logger = logging.getLogger(__name__)
21 def current_thread_id() -> str:
24 A string composed of the parent process' id, the
25 current process' id and the current thread name that can be used
26 as a unique identifier for the current thread. The former two are
27 numbers (pids) whereas the latter is a thread id passed during
30 >>> from pyutils.parallelize import thread_utils
31 >>> ret = thread_utils.current_thread_id()
32 >>> ret # doctest: +SKIP
33 '76891/84444/MainThread:'
34 >>> (ppid, pid, tid) = ret.split('/')
42 tid = threading.current_thread().name
43 return f"{ppid}/{pid}/{tid}:"
46 def is_current_thread_main_thread() -> bool:
49 True is the current (calling) thread is the process' main
50 thread and False otherwise.
52 >>> from pyutils.parallelize import thread_utils
53 >>> thread_utils.is_current_thread_main_thread()
57 >>> def am_i_the_main_thread():
59 ... result = thread_utils.is_current_thread_main_thread()
61 >>> am_i_the_main_thread()
66 >>> thread = threading.Thread(target=am_i_the_main_thread)
72 return threading.current_thread() is threading.main_thread()
75 def background_thread(
76 _funct: Optional[Callable[..., Any]],
77 ) -> Callable[..., Tuple[threading.Thread, threading.Event]]:
78 """A function decorator to create a background thread.
81 _funct: The function being wrapped such that it is invoked
82 on a background thread.
89 from pyutils.parallelize import thread_utils
91 @thread_utils.background_thread
92 def random(a: int, b: str, stop_event: threading.Event) -> None:
94 print(f"Hi there {b}: {a}!")
96 if stop_event.is_set():
100 (thread, event) = random(22, "dude")
108 In addition to any other arguments the function has, it must
109 take a stop_event as the last unnamed argument which it should
110 periodically check. If the event is set, it means the thread has
111 been requested to terminate ASAP.
114 def wrapper(funct: Callable):
115 @functools.wraps(funct)
116 def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]:
117 should_terminate = threading.Event()
118 should_terminate.clear()
119 newargs = (*a, should_terminate)
120 thread = threading.Thread(
126 logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
127 return (thread, should_terminate)
132 return wrapper # type: ignore
134 return wrapper(_funct)
137 class ThreadWithReturnValue(threading.Thread, Runnable):
138 """A thread whose return value is plumbed back out as the return
139 value of :meth:`join`. Use like a normal thread::
143 from pyutils.parallelize import thread_utils
145 def thread_entry_point(args):
146 # do something interesting...
149 if __name__ == "__main__":
150 thread = thread_utils.ThreadWithReturnValue(
151 target=thread_entry_point,
155 result = thread.join()
156 print(f"thread finished and returned {result}")
161 self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None
163 threading.Thread.__init__(
172 self._target = target
175 self._kwargs = kwargs
177 def run(self) -> None:
178 """Create a little wrapper around invoking the real thread entry
179 point so we can pay attention to its return value."""
180 if self._target is not None:
181 self._return = self._target(*self._args, **self._kwargs)
183 def join(self, *args) -> Any:
184 """Wait until the thread terminates and return the value it terminated with
185 as the result of join.
187 Like normal :meth:`join`, this blocks the calling thread until
188 the thread whose :meth:`join` is called terminates – either
189 normally or through an unhandled exception or until the
190 optional timeout occurs.
192 When the timeout argument is present and not None, it should
193 be a floating point number specifying a timeout for the
194 operation in seconds (or fractions thereof).
196 When the timeout argument is not present or None, the
197 operation will block until the thread terminates.
199 A thread can be joined many times.
202 RuntimeError: an attempt is made to join the current thread
203 as that would cause a deadlock. It is also an error to join
204 a thread before it has been started and attempts to do so
205 raises the same exception.
207 threading.Thread.join(self, *args)
211 def periodically_invoke(
213 stop_after: Optional[int],
216 Periodically invoke the decorated function on a background thread.
219 period_sec: the delay period in seconds between invocations
220 stop_after: total number of invocations to make or, if None,
224 a :class:`Thread` object and an :class:`Event` that, when
225 signaled, will stop the invocations.
228 It is possible to be invoked one time after the :class:`Event`
229 is set. This event can be used to stop infinite
230 invocation style or finite invocation style decorations.
234 from pyutils.parallelize import thread_utils
236 @thread_utils.periodically_invoke(period_sec=1.0, stop_after=3)
237 def hello(name: str) -> None:
238 print(f"Hello, {name}")
240 @thread_utils.periodically_invoke(period_sec=0.5, stop_after=None)
241 def there(name: str, age: int) -> None:
242 print(f" ...there {name}, {age}")
244 Usage as a decorator doesn't give you access to the returned stop event or
245 thread object. To get those, wrap your periodic function manually::
247 from pyutils.parallelize import thread_utils
249 def periodic(m: str) -> None:
252 f = thread_utils.periodically_invoke(period_sec=5.0, stop_after=None)(periodic)
253 thread, event = f("testing")
258 See also :mod:`pyutils.state_tracker`.
261 def decorator_repeat(func):
262 def helper_thread(should_terminate, *args, **kwargs) -> None:
263 if stop_after is None:
265 func(*args, **kwargs)
266 res = should_terminate.wait(period_sec)
270 for _ in range(stop_after):
271 func(*args, **kwargs)
272 res = should_terminate.wait(period_sec)
277 @functools.wraps(func)
278 def wrapper_repeat(*args, **kwargs):
279 should_terminate = threading.Event()
280 should_terminate.clear()
281 newargs = (should_terminate, *args)
282 thread = threading.Thread(target=helper_thread, args=newargs, kwargs=kwargs)
284 logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
285 return (thread, should_terminate)
287 return wrapper_repeat
289 return decorator_repeat
292 if __name__ == "__main__":