7 from typing import Callable, Optional, Tuple
9 # This module is commonly used by others in here and should avoid
10 # taking any unnecessary dependencies back on them.
12 logger = logging.getLogger(__name__)
15 def current_thread_id() -> str:
16 """Returns a string composed of the parent process' id, the current
17 process' id and the current thread identifier. The former two are
18 numbers (pids) whereas the latter is a thread id passed during thread
21 >>> ret = current_thread_id()
22 >>> (ppid, pid, tid) = ret.split('/')
31 tid = threading.current_thread().name
32 return f'{ppid}/{pid}/{tid}:'
35 def is_current_thread_main_thread() -> bool:
36 """Returns True is the current (calling) thread is the process' main
37 thread and False otherwise.
39 >>> is_current_thread_main_thread()
45 ... result = is_current_thread_main_thread()
52 >>> thread = threading.Thread(target=thunk)
59 return threading.current_thread() is threading.main_thread()
62 def background_thread(
63 _funct: Optional[Callable],
64 ) -> Tuple[threading.Thread, threading.Event]:
65 """A function decorator to create a background thread.
67 *** Please note: the decorated function must take an shutdown ***
68 *** event as an input parameter and should periodically check ***
69 *** it and stop if the event is set. ***
74 def random(a: int, b: str, stop_event: threading.Event) -> None:
76 print(f"Hi there {b}: {a}!")
78 if stop_event.is_set():
83 (thread, event) = random(22, "dude")
89 Note: in addition to any other arguments the function has, it must
90 take a stop_event as the last unnamed argument which it should
91 periodically check. If the event is set, it means the thread has
92 been requested to terminate ASAP.
95 def wrapper(funct: Callable):
96 @functools.wraps(funct)
97 def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]:
98 should_terminate = threading.Event()
99 should_terminate.clear()
100 newargs = (*a, should_terminate)
101 thread = threading.Thread(
107 logger.debug(f'Started thread {thread.name} tid={thread.ident}')
108 return (thread, should_terminate)
115 return wrapper(_funct)
118 def periodically_invoke(
120 stop_after: Optional[int],
123 Periodically invoke a decorated function. Stop after N invocations
124 (or, if stop_after is None, call forever). Delay period_sec between
127 Returns a Thread object and an Event that, when signaled, will stop
128 the invocations. Note that it is possible to be invoked one time
129 after the Event is set. This event can be used to stop infinite
130 invocation style or finite invocation style decorations.
132 @periodically_invoke(period_sec=0.5, stop_after=None)
133 def there(name: str, age: int) -> None:
134 print(f" ...there {name}, {age}")
137 @periodically_invoke(period_sec=1.0, stop_after=3)
138 def hello(name: str) -> None:
139 print(f"Hello, {name}")
143 def decorator_repeat(func):
144 def helper_thread(should_terminate, *args, **kwargs) -> None:
145 if stop_after is None:
147 func(*args, **kwargs)
148 res = should_terminate.wait(period_sec)
152 for _ in range(stop_after):
153 func(*args, **kwargs)
154 res = should_terminate.wait(period_sec)
159 @functools.wraps(func)
160 def wrapper_repeat(*args, **kwargs):
161 should_terminate = threading.Event()
162 should_terminate.clear()
163 newargs = (should_terminate, *args)
164 thread = threading.Thread(target=helper_thread, args=newargs, kwargs=kwargs)
166 logger.debug(f'Started thread {thread.name} tid={thread.ident}')
167 return (thread, should_terminate)
169 return wrapper_repeat
171 return decorator_repeat
174 if __name__ == '__main__':