3 # © Copyright 2021-2022, Scott Gasch
5 """Utilities for dealing with threads + threading."""
11 from typing import Any, Callable, Optional, Tuple
13 # This module is commonly used by others in here and should avoid
14 # taking any unnecessary dependencies back on them.
16 logger = logging.getLogger(__name__)
19 def current_thread_id() -> str:
20 """Returns a string composed of the parent process' id, the current
21 process' id and the current thread identifier. The former two are
22 numbers (pids) whereas the latter is a thread id passed during thread
25 >>> ret = current_thread_id()
26 >>> (ppid, pid, tid) = ret.split('/')
35 tid = threading.current_thread().name
36 return f'{ppid}/{pid}/{tid}:'
39 def is_current_thread_main_thread() -> bool:
40 """Returns True is the current (calling) thread is the process' main
41 thread and False otherwise.
43 >>> is_current_thread_main_thread()
49 ... result = is_current_thread_main_thread()
56 >>> thread = threading.Thread(target=thunk)
63 return threading.current_thread() is threading.main_thread()
66 def background_thread(
67 _funct: Optional[Callable[..., Any]],
68 ) -> Callable[..., Tuple[threading.Thread, threading.Event]]:
69 """A function decorator to create a background thread.
71 *** Please note: the decorated function must take an shutdown ***
72 *** event as an input parameter and should periodically check ***
73 *** it and stop if the event is set. ***
78 def random(a: int, b: str, stop_event: threading.Event) -> None:
80 print(f"Hi there {b}: {a}!")
82 if stop_event.is_set():
87 (thread, event) = random(22, "dude")
93 Note: in addition to any other arguments the function has, it must
94 take a stop_event as the last unnamed argument which it should
95 periodically check. If the event is set, it means the thread has
96 been requested to terminate ASAP.
99 def wrapper(funct: Callable):
100 @functools.wraps(funct)
101 def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]:
102 should_terminate = threading.Event()
103 should_terminate.clear()
104 newargs = (*a, should_terminate)
105 thread = threading.Thread(
111 logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
112 return (thread, should_terminate)
117 return wrapper # type: ignore
119 return wrapper(_funct)
122 def periodically_invoke(
124 stop_after: Optional[int],
127 Periodically invoke a decorated function. Stop after N invocations
128 (or, if stop_after is None, call forever). Delay period_sec between
131 Returns a Thread object and an Event that, when signaled, will stop
132 the invocations. Note that it is possible to be invoked one time
133 after the Event is set. This event can be used to stop infinite
134 invocation style or finite invocation style decorations.
136 @periodically_invoke(period_sec=0.5, stop_after=None)
137 def there(name: str, age: int) -> None:
138 print(f" ...there {name}, {age}")
141 @periodically_invoke(period_sec=1.0, stop_after=3)
142 def hello(name: str) -> None:
143 print(f"Hello, {name}")
147 def decorator_repeat(func):
148 def helper_thread(should_terminate, *args, **kwargs) -> None:
149 if stop_after is None:
151 func(*args, **kwargs)
152 res = should_terminate.wait(period_sec)
156 for _ in range(stop_after):
157 func(*args, **kwargs)
158 res = should_terminate.wait(period_sec)
163 @functools.wraps(func)
164 def wrapper_repeat(*args, **kwargs):
165 should_terminate = threading.Event()
166 should_terminate.clear()
167 newargs = (should_terminate, *args)
168 thread = threading.Thread(target=helper_thread, args=newargs, kwargs=kwargs)
170 logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
171 return (thread, should_terminate)
173 return wrapper_repeat
175 return decorator_repeat
178 if __name__ == '__main__':