3 # © Copyright 2021-2022, Scott Gasch
5 """Utilities for dealing with threads + threading."""
11 from typing import Any, Callable, Optional, Tuple
13 # This module is commonly used by others in here and should avoid
14 # taking any unnecessary dependencies back on them.
16 logger = logging.getLogger(__name__)
19 def current_thread_id() -> str:
20 """Returns a string composed of the parent process' id, the current
21 process' id and the current thread identifier. The former two are
22 numbers (pids) whereas the latter is a thread id passed during thread
25 >>> ret = current_thread_id()
26 >>> (ppid, pid, tid) = ret.split('/')
35 tid = threading.current_thread().name
36 return f'{ppid}/{pid}/{tid}:'
39 def is_current_thread_main_thread() -> bool:
40 """Returns True is the current (calling) thread is the process' main
41 thread and False otherwise.
43 >>> is_current_thread_main_thread()
49 ... result = is_current_thread_main_thread()
56 >>> thread = threading.Thread(target=thunk)
63 return threading.current_thread() is threading.main_thread()
66 def background_thread(
67 _funct: Optional[Callable[..., Any]],
68 ) -> Callable[..., Tuple[threading.Thread, threading.Event]]:
69 """A function decorator to create a background thread.
71 *** Please note: the decorated function must take an shutdown ***
72 *** event as an input parameter and should periodically check ***
73 *** it and stop if the event is set. ***
78 def random(a: int, b: str, stop_event: threading.Event) -> None:
80 print(f"Hi there {b}: {a}!")
82 if stop_event.is_set():
86 (thread, event) = random(22, "dude")
92 Note: in addition to any other arguments the function has, it must
93 take a stop_event as the last unnamed argument which it should
94 periodically check. If the event is set, it means the thread has
95 been requested to terminate ASAP.
98 def wrapper(funct: Callable):
99 @functools.wraps(funct)
100 def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]:
101 should_terminate = threading.Event()
102 should_terminate.clear()
103 newargs = (*a, should_terminate)
104 thread = threading.Thread(
110 logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
111 return (thread, should_terminate)
116 return wrapper # type: ignore
118 return wrapper(_funct)
121 def periodically_invoke(
123 stop_after: Optional[int],
126 Periodically invoke a decorated function. Stop after N invocations
127 (or, if stop_after is None, call forever). Delay period_sec between
130 Returns a Thread object and an Event that, when signaled, will stop
131 the invocations. Note that it is possible to be invoked one time
132 after the Event is set. This event can be used to stop infinite
133 invocation style or finite invocation style decorations.::
135 @periodically_invoke(period_sec=0.5, stop_after=None)
136 def there(name: str, age: int) -> None:
137 print(f" ...there {name}, {age}")
139 @periodically_invoke(period_sec=1.0, stop_after=3)
140 def hello(name: str) -> None:
141 print(f"Hello, {name}")
145 def decorator_repeat(func):
146 def helper_thread(should_terminate, *args, **kwargs) -> None:
147 if stop_after is None:
149 func(*args, **kwargs)
150 res = should_terminate.wait(period_sec)
154 for _ in range(stop_after):
155 func(*args, **kwargs)
156 res = should_terminate.wait(period_sec)
161 @functools.wraps(func)
162 def wrapper_repeat(*args, **kwargs):
163 should_terminate = threading.Event()
164 should_terminate.clear()
165 newargs = (should_terminate, *args)
166 thread = threading.Thread(target=helper_thread, args=newargs, kwargs=kwargs)
168 logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
169 return (thread, should_terminate)
171 return wrapper_repeat
173 return decorator_repeat
176 if __name__ == '__main__':