"""Utilities for dealing with threads and the threading module."""
import functools
import logging
import os
import threading
from typing import Any, Callable, Optional, Tuple
# This module is commonly used by other modules in this library and should
# avoid taking any unnecessary dependencies back on them.
# Module-level logger named after this module; the decorators below use it
# to emit a debug message each time they start a thread.
logger = logging.getLogger(__name__)
def current_thread_id() -> str:
    """Return an identifier string for the calling thread.

    Returns:
        A string of the form ``'<ppid>/<pid>/<tid>:'`` composed of the
        parent process' id, the current process' id and the current
        thread identifier.  The former two are numbers (pids) whereas the
        latter is the thread's name, i.e. the name given at thread
        creation time (or the default one assigned by ``threading``).
        Note that the result ends with a trailing colon.

    >>> ret = current_thread_id()
    >>> (ppid, pid, tid) = ret.split('/')
    >>> ppid.isnumeric()
    True
    >>> pid.isnumeric()
    True
    """
    ppid = os.getppid()
    pid = os.getpid()
    # The "thread id" component is the thread's *name*, not its numeric ident.
    tid = threading.current_thread().name
    return f'{ppid}/{pid}/{tid}:'
def is_current_thread_main_thread() -> bool:
    """Return True if the current (calling) thread is the process' main
    thread and False otherwise.

    >>> is_current_thread_main_thread()
    True

    >>> result = None
    >>> def thunk():
    ...     global result
    ...     result = is_current_thread_main_thread()

    >>> thunk()
    >>> result
    True

    >>> import threading
    >>> thread = threading.Thread(target=thunk)
    >>> thread.start()
    >>> thread.join()
    >>> result
    False
    """
    # Identity comparison: threading hands out one Thread object per thread.
    return threading.current_thread() is threading.main_thread()
def background_thread(
    _funct: Optional[Callable[..., Any]] = None,
) -> Callable[..., Tuple[threading.Thread, threading.Event]]:
    """A function decorator to create a background thread.

    *** Please note: the decorated function must take a shutdown  ***
    *** event as an input parameter and should periodically check ***
    *** it and stop if the event is set.                          ***

    Usage::

        @background_thread
        def random(a: int, b: str, stop_event: threading.Event) -> None:
            while True:
                print(f"Hi there {b}: {a}!")
                if stop_event.is_set():
                    return
                time.sleep(1.0)

        (thread, event) = random(22, "dude")
        ...
        event.set()     # ask the thread to stop
        thread.join()

    Note: in addition to any other arguments the function has, it must
    take a stop_event as the last unnamed argument which it should
    periodically check.  If the event is set, it means the thread has
    been requested to terminate ASAP.

    Args:
        _funct: the function to decorate.  It is None when the decorator
            is applied with parentheses (``@background_thread()``), in
            which case the actual decorator is returned instead.

    Returns:
        A replacement for the decorated function.  Calling it starts a
        new thread running the original function (with a fresh
        ``threading.Event`` appended to the positional arguments) and
        immediately returns a ``(thread, stop_event)`` tuple.
    """

    def wrapper(funct: Callable[..., Any]):
        @functools.wraps(funct)
        def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]:
            # A freshly created Event is already unset; no clear() required.
            should_terminate = threading.Event()
            thread = threading.Thread(
                target=funct,
                # The stop event is always the last positional argument.
                args=(*a, should_terminate),
                kwargs=kwa,
            )
            thread.start()
            # thread.ident is only populated once the thread has been started.
            logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
            return (thread, should_terminate)

        return inner_wrapper

    if _funct is None:
        # Used as ``@background_thread()``: hand back the real decorator.
        return wrapper  # type: ignore
    return wrapper(_funct)
def periodically_invoke(
    period_sec: float,
    stop_after: Optional[int],
) -> Callable[[Callable[..., Any]], Callable[..., Tuple[threading.Thread, threading.Event]]]:
    """
    Periodically invoke a decorated function.  Stop after N invocations
    (or, if stop_after is None, call forever).  Delay period_sec between
    invocations.

    Calling the decorated function returns a Thread object and an Event
    that, when signaled, will stop the invocations.  Note that it is
    possible to be invoked one time after the Event is set.  This event
    can be used to stop infinite invocation style or finite invocation
    style decorations.

    Usage::

        @periodically_invoke(period_sec=0.5, stop_after=None)
        def there(name: str, age: int) -> None:
            print(f"   ...there {name}, {age}")

        @periodically_invoke(period_sec=1.0, stop_after=3)
        def hello(name: str) -> None:
            print(f"Hello, {name}")

    Args:
        period_sec: how long to wait between consecutive invocations.
        stop_after: the maximum number of invocations, or None to keep
            invoking until the returned Event is set.

    Returns:
        A decorator.  The decorated function, when called, starts a
        thread that performs the periodic invocations with the given
        arguments and immediately returns ``(thread, stop_event)``.
    """

    def decorator_repeat(func):
        def helper_thread(should_terminate, *args, **kwargs) -> None:
            remaining = stop_after  # None means "no limit"
            while remaining is None or remaining > 0:
                func(*args, **kwargs)
                if remaining is not None:
                    remaining -= 1
                    if remaining == 0:
                        # That was the final invocation: exit right away
                        # rather than idling for one more period.
                        return
                # Event.wait() doubles as an interruptible sleep; it
                # returns True as soon as a stop has been requested.
                if should_terminate.wait(period_sec):
                    return

        @functools.wraps(func)
        def wrapper_repeat(*args, **kwargs):
            # A freshly created Event is already unset; no clear() required.
            should_terminate = threading.Event()
            thread = threading.Thread(
                target=helper_thread,
                # The helper takes the stop event as its first argument.
                args=(should_terminate, *args),
                kwargs=kwargs,
            )
            thread.start()
            logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
            return (thread, should_terminate)

        return wrapper_repeat

    return decorator_repeat
176 if __name__ == '__main__':