7 from typing import Callable, Optional, Tuple
9 # This module is commonly used by others in here and should avoid
10 # taking any unnecessary dependencies back on them.
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def current_thread_id() -> str:
    """Return a string identifying the calling thread.

    The result has the form ``'<ppid>/<pid>/<thread name>:'``: the parent
    process id, the current process id and the name of the current
    ``threading.Thread``, separated by slashes and ending with a colon.
    """
    # Imported locally so this function does not rely on a module-level
    # ``import os`` being present.
    import os

    ppid = os.getppid()
    pid = os.getpid()
    tid = threading.current_thread().name
    return f'{ppid}/{pid}/{tid}:'
def is_current_thread_main_thread() -> bool:
    """Report whether the caller is running on the process' main thread.

    Returns True when ``threading.current_thread()`` is the very same
    object as ``threading.main_thread()``, and False otherwise.
    """
    caller = threading.current_thread()
    return caller is threading.main_thread()
def background_thread(
    _funct: Optional[Callable] = None,
) -> Callable:
    """A function decorator to run the decorated function on a new thread.

    *** Please note: the decorated function must take a shutdown  ***
    *** event as an input parameter and should periodically check ***
    *** it and stop if the event is set.                          ***

    Usage::

        @background_thread
        def random(a: int, b: str, stop_event: threading.Event) -> None:
            while True:
                print(f"Hi there {b}: {a}!")
                if stop_event.is_set():
                    return

        def main() -> None:
            (thread, event) = random(22, "dude")
            ...
            event.set()
            thread.join()

    Note: in addition to any other arguments the function has, it must
    take a stop_event as the last unnamed argument which it should
    periodically check.  If the event is set, it means the thread has
    been requested to terminate ASAP.

    Both the bare form (``@background_thread``) and the called form
    (``@background_thread()``) are accepted: when ``_funct`` is None the
    decorator itself is returned, otherwise ``_funct`` is decorated
    directly.

    Calling the decorated function starts the thread immediately and
    returns a ``(threading.Thread, threading.Event)`` tuple; it does not
    return the wrapped function's own result.
    """

    def wrapper(
        funct: Callable,
    ) -> Callable[..., Tuple[threading.Thread, threading.Event]]:
        @functools.wraps(funct)
        def inner_wrapper(
            *a, **kwa
        ) -> Tuple[threading.Thread, threading.Event]:
            # A newly constructed Event is already unset.
            should_terminate = threading.Event()
            # The stop event is appended as the final positional argument.
            newargs = (*a, should_terminate)
            thread = threading.Thread(
                target=funct,
                args=newargs,
                kwargs=kwa,
            )
            thread.start()
            logger.debug(
                f'Started thread {thread.name} tid={thread.ident}'
            )
            return (thread, should_terminate)

        return inner_wrapper

    if _funct is None:
        return wrapper
    return wrapper(_funct)
def periodically_invoke(
    period_sec: float,
    stop_after: Optional[int],
) -> Callable[[Callable], Callable[..., Tuple[threading.Thread, threading.Event]]]:
    """
    Periodically invoke a decorated function.  Stop after N invocations
    (or, if stop_after is None, call forever).  Delay period_sec between
    invocations; no delay follows the final invocation of a finite run.

    Calling the decorated function starts a new thread that performs the
    invocations and returns a Thread object and an Event that, when
    signaled, will stop the invocations.  Note that it is possible to be
    invoked one time after the Event is set.  This event can be used to
    stop infinite invocation style or finite invocation style
    decorations.

        @periodically_invoke(period_sec=0.5, stop_after=None)
        def there(name: str, age: int) -> None:
            print(f"   ...there {name}, {age}")

        @periodically_invoke(period_sec=1.0, stop_after=3)
        def hello(name: str) -> None:
            print(f"Hello, {name}")

    """

    def decorator_repeat(func):
        def helper_thread(should_terminate, *args, **kwargs) -> None:
            # None means "no limit"; otherwise count down the budget.
            remaining = stop_after
            while remaining is None or remaining > 0:
                func(*args, **kwargs)
                if remaining is not None:
                    remaining -= 1
                    if remaining <= 0:
                        # Budget exhausted: exit now instead of idling
                        # for one more period with nothing left to call.
                        return
                # wait() returns True as soon as the event is set, so a
                # stop request cuts the delay short.
                if should_terminate.wait(period_sec):
                    return

        @functools.wraps(func)
        def wrapper_repeat(*args, **kwargs):
            # A newly constructed Event is already unset.
            should_terminate = threading.Event()
            # helper_thread takes the stop event first, then the
            # caller's positional arguments.
            newargs = (should_terminate, *args)
            thread = threading.Thread(
                target=helper_thread,
                args=newargs,
                kwargs=kwargs,
            )
            thread.start()
            logger.debug(
                f'Started thread {thread.name} tid={thread.ident}'
            )
            return (thread, should_terminate)

        return wrapper_repeat

    return decorator_repeat