ACL uses enums, some more tests, other stuff.
[python_utils.git] / thread_utils.py
1 #!/usr/bin/env python3
2
3 import functools
4 import logging
5 import os
6 import threading
7 from typing import Callable, Optional, Tuple
8
9 # This module is commonly used by others in here and should avoid
10 # taking any unnecessary dependencies back on them.
11
12 logger = logging.getLogger(__name__)
13
14
15 def current_thread_id() -> str:
16     ppid = os.getppid()
17     pid = os.getpid()
18     tid = threading.current_thread().name
19     return f'{ppid}/{pid}/{tid}:'
20
21
22 def is_current_thread_main_thread() -> bool:
23     return threading.current_thread() is threading.main_thread()
24
25
26 def background_thread(
27         _funct: Optional[Callable]
28 ) -> Tuple[threading.Thread, threading.Event]:
29     """A function decorator to create a background thread.
30
31     *** Please note: the decorated function must take an shutdown ***
32     *** event as an input parameter and should periodically check ***
33     *** it and stop if the event is set.                          ***
34
35     Usage:
36
37         @background_thread
38         def random(a: int, b: str, stop_event: threading.Event) -> None:
39             while True:
40                 print(f"Hi there {b}: {a}!")
41                 time.sleep(10.0)
42                 if stop_event.is_set():
43                     return
44
45
46         def main() -> None:
47             (thread, event) = random(22, "dude")
48             print("back!")
49             time.sleep(30.0)
50             event.set()
51             thread.join()
52
53     Note: in addition to any other arguments the function has, it must
54     take a stop_event as the last unnamed argument which it should
55     periodically check.  If the event is set, it means the thread has
56     been requested to terminate ASAP.
57     """
58     def wrapper(funct: Callable):
59         @functools.wraps(funct)
60         def inner_wrapper(
61                 *a, **kwa
62         ) -> Tuple[threading.Thread, threading.Event]:
63             should_terminate = threading.Event()
64             should_terminate.clear()
65             newargs = (*a, should_terminate)
66             thread = threading.Thread(
67                 target=funct,
68                 args=newargs,
69                 kwargs=kwa,
70             )
71             thread.start()
72             logger.debug(
73                 f'Started thread {thread.name} tid={thread.ident}'
74             )
75             return (thread, should_terminate)
76         return inner_wrapper
77
78     if _funct is None:
79         return wrapper
80     else:
81         return wrapper(_funct)
82
83
84 def periodically_invoke(
85         period_sec: float,
86         stop_after: Optional[int],
87 ):
88     """
89     Periodically invoke a decorated function.  Stop after N invocations
90     (or, if stop_after is None, call forever).  Delay period_sec between
91     invocations.
92
93     Returns a Thread object and an Event that, when signaled, will stop
94     the invocations.  Note that it is possible to be invoked one time
95     after the Event is set.  This event can be used to stop infinite
96     invocation style or finite invocation style decorations.
97
98         @periodically_invoke(period_sec=0.5, stop_after=None)
99         def there(name: str, age: int) -> None:
100             print(f"   ...there {name}, {age}")
101
102
103         @periodically_invoke(period_sec=1.0, stop_after=3)
104         def hello(name: str) -> None:
105             print(f"Hello, {name}")
106
107     """
108     def decorator_repeat(func):
109         def helper_thread(should_terminate, *args, **kwargs) -> None:
110             if stop_after is None:
111                 while True:
112                     func(*args, **kwargs)
113                     res = should_terminate.wait(period_sec)
114                     if res:
115                         return
116             else:
117                 for _ in range(stop_after):
118                     func(*args, **kwargs)
119                     res = should_terminate.wait(period_sec)
120                     if res:
121                         return
122                 return
123
124         @functools.wraps(func)
125         def wrapper_repeat(*args, **kwargs):
126             should_terminate = threading.Event()
127             should_terminate.clear()
128             newargs = (should_terminate, *args)
129             thread = threading.Thread(
130                 target=helper_thread,
131                 args = newargs,
132                 kwargs = kwargs
133             )
134             thread.start()
135             logger.debug(
136                 f'Started thread {thread.name} tid={thread.ident}'
137             )
138             return (thread, should_terminate)
139         return wrapper_repeat
140     return decorator_repeat