7 from typing import Callable, Optional, Tuple
9 # This module is commonly used by others in here and should avoid
10 # taking any unnecessary dependencies back on them.
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def current_thread_id() -> str:
    """Return a string identifying the calling thread.

    Returns:
        A string of the form ``'<ppid>/<pid>/<thread name>:'`` built from
        the parent process id, the current process id and the name of
        the current thread.
    """
    # Imported locally so this helper is self-contained.
    import os

    ppid = os.getppid()
    pid = os.getpid()
    tid = threading.current_thread().name
    return f'{ppid}/{pid}/{tid}:'
def is_current_thread_main_thread() -> bool:
    """Returns True if the current (calling) thread is the process' main
    thread and False otherwise.
    """
    # Identity comparison: both calls return Thread objects, and the main
    # thread is represented by exactly one of them.
    return threading.current_thread() is threading.main_thread()
def background_thread(
    _funct: Optional[Callable] = None,
) -> Tuple[threading.Thread, threading.Event]:
    """A function decorator to create a background thread.

    *** Please note: the decorated function must take a shutdown  ***
    *** event as an input parameter and should periodically check ***
    *** it and stop if the event is set.                          ***

    Usage::

        @background_thread
        def random(a: int, b: str, stop_event: threading.Event) -> None:
            while True:
                print(f"Hi there {b}: {a}!")
                time.sleep(10.0)
                if stop_event.is_set():
                    return

        def main() -> None:
            (thread, event) = random(22, "dude")
            print("back!")
            time.sleep(30.0)
            event.set()
            thread.join()

    Note: in addition to any other arguments the function has, it must
    take a stop_event as the last unnamed argument which it should
    periodically check.  If the event is set, it means the thread has
    been requested to terminate ASAP.

    Args:
        _funct: the function to decorate.  May be None (the default) so
            that the decorator can also be written ``@background_thread()``.

    Returns:
        The decorated function (or, when _funct is None, a decorator
        that produces it).  Calling the decorated function starts a new
        thread running the original function and immediately returns a
        ``(thread, stop_event)`` tuple.

    NOTE(review): the return annotation describes what the *decorated*
    function returns when called, not what this decorator itself
    returns; it is kept as-is for compatibility.
    """

    def wrapper(funct: Callable):
        @functools.wraps(funct)
        def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]:
            # A freshly constructed Event starts out unset, so no clear()
            # is needed.
            should_terminate = threading.Event()
            thread = threading.Thread(
                target=funct,
                # The stop event is appended as the last positional arg.
                args=(*a, should_terminate),
                kwargs=kwa,
            )
            thread.start()
            logger.debug('Started thread %s tid=%s', thread.name, thread.ident)
            return (thread, should_terminate)

        return inner_wrapper

    # Support both ``@background_thread`` and ``@background_thread()``.
    if _funct is None:
        return wrapper  # type: ignore
    return wrapper(_funct)
def periodically_invoke(
    period_sec: float,
    stop_after: Optional[int],
):
    """
    Periodically invoke a decorated function.  Stop after N invocations
    (or, if stop_after is None, call forever).  Delay period_sec between
    invocations.

    Returns a Thread object and an Event that, when signaled, will stop
    the invocations.  Note that it is possible to be invoked one time
    after the Event is set.  This event can be used to stop infinite
    invocation style or finite invocation style decorations.

    Usage::

        @periodically_invoke(period_sec=0.5, stop_after=None)
        def there(name: str, age: int) -> None:
            print(f"   ...there {name}, {age}")

        @periodically_invoke(period_sec=1.0, stop_after=3)
        def hello(name: str) -> None:
            print(f"Hello, {name}")

    Args:
        period_sec: how long to wait after each invocation; passed as
            the timeout to the stop event's ``wait()``.
        stop_after: maximum number of invocations, or None for no limit.
    """

    def decorator_repeat(func):
        def helper_thread(should_terminate, *args, **kwargs) -> None:
            remaining = stop_after  # None means "no limit"
            while remaining is None or remaining > 0:
                func(*args, **kwargs)
                # wait() serves as both the inter-call delay and the stop
                # check: it returns True as soon as the event is set.
                if should_terminate.wait(period_sec):
                    return
                if remaining is not None:
                    remaining -= 1

        @functools.wraps(func)
        def wrapper_repeat(*args, **kwargs):
            # A freshly constructed Event starts out unset, so no clear()
            # is needed.
            should_terminate = threading.Event()
            thread = threading.Thread(
                target=helper_thread,
                # The stop event goes first; helper_thread strips it off
                # before calling the decorated function.
                args=(should_terminate, *args),
                kwargs=kwargs,
            )
            thread.start()
            logger.debug('Started thread %s tid=%s', thread.name, thread.ident)
            return (thread, should_terminate)

        return wrapper_repeat

    return decorator_repeat