ad1f0bf9029b3232ba9cbd28b085afade91f0186
[python_utils.git] / thread_utils.py
1 #!/usr/bin/env python3
2
3 import functools
4 import logging
5 import os
6 import threading
7 from typing import Callable, Optional, Tuple
8
9 # This module is commonly used by others in here and should avoid
10 # taking any unnecessary dependencies back on them.
11
12 logger = logging.getLogger(__name__)
13
14
15 def current_thread_id() -> str:
16     ppid = os.getppid()
17     pid = os.getpid()
18     tid = threading.current_thread().name
19     return f'{ppid}/{pid}/{tid}:'
20
21
22 def is_current_thread_main_thread() -> bool:
23     """Returns True is the current (calling) thread is the process' main
24     thread and False otherwise.
25     """
26     return threading.current_thread() is threading.main_thread()
27
28
29 def background_thread(
30     _funct: Optional[Callable],
31 ) -> Tuple[threading.Thread, threading.Event]:
32     """A function decorator to create a background thread.
33
34     *** Please note: the decorated function must take an shutdown ***
35     *** event as an input parameter and should periodically check ***
36     *** it and stop if the event is set.                          ***
37
38     Usage:
39
40         @background_thread
41         def random(a: int, b: str, stop_event: threading.Event) -> None:
42             while True:
43                 print(f"Hi there {b}: {a}!")
44                 time.sleep(10.0)
45                 if stop_event.is_set():
46                     return
47
48
49         def main() -> None:
50             (thread, event) = random(22, "dude")
51             print("back!")
52             time.sleep(30.0)
53             event.set()
54             thread.join()
55
56     Note: in addition to any other arguments the function has, it must
57     take a stop_event as the last unnamed argument which it should
58     periodically check.  If the event is set, it means the thread has
59     been requested to terminate ASAP.
60     """
61
62     def wrapper(funct: Callable):
63         @functools.wraps(funct)
64         def inner_wrapper(
65             *a, **kwa
66         ) -> Tuple[threading.Thread, threading.Event]:
67             should_terminate = threading.Event()
68             should_terminate.clear()
69             newargs = (*a, should_terminate)
70             thread = threading.Thread(
71                 target=funct,
72                 args=newargs,
73                 kwargs=kwa,
74             )
75             thread.start()
76             logger.debug(f'Started thread {thread.name} tid={thread.ident}')
77             return (thread, should_terminate)
78
79         return inner_wrapper
80
81     if _funct is None:
82         return wrapper
83     else:
84         return wrapper(_funct)
85
86
87 def periodically_invoke(
88     period_sec: float,
89     stop_after: Optional[int],
90 ):
91     """
92     Periodically invoke a decorated function.  Stop after N invocations
93     (or, if stop_after is None, call forever).  Delay period_sec between
94     invocations.
95
96     Returns a Thread object and an Event that, when signaled, will stop
97     the invocations.  Note that it is possible to be invoked one time
98     after the Event is set.  This event can be used to stop infinite
99     invocation style or finite invocation style decorations.
100
101         @periodically_invoke(period_sec=0.5, stop_after=None)
102         def there(name: str, age: int) -> None:
103             print(f"   ...there {name}, {age}")
104
105
106         @periodically_invoke(period_sec=1.0, stop_after=3)
107         def hello(name: str) -> None:
108             print(f"Hello, {name}")
109
110     """
111
112     def decorator_repeat(func):
113         def helper_thread(should_terminate, *args, **kwargs) -> None:
114             if stop_after is None:
115                 while True:
116                     func(*args, **kwargs)
117                     res = should_terminate.wait(period_sec)
118                     if res:
119                         return
120             else:
121                 for _ in range(stop_after):
122                     func(*args, **kwargs)
123                     res = should_terminate.wait(period_sec)
124                     if res:
125                         return
126                 return
127
128         @functools.wraps(func)
129         def wrapper_repeat(*args, **kwargs):
130             should_terminate = threading.Event()
131             should_terminate.clear()
132             newargs = (should_terminate, *args)
133             thread = threading.Thread(
134                 target=helper_thread, args=newargs, kwargs=kwargs
135             )
136             thread.start()
137             logger.debug(f'Started thread {thread.name} tid={thread.ident}')
138             return (thread, should_terminate)
139
140         return wrapper_repeat
141
142     return decorator_repeat