Ahem. Still running black?
[python_utils.git] / thread_utils.py
1 #!/usr/bin/env python3
2
3 import functools
4 import logging
5 import os
6 import threading
7 from typing import Callable, Optional, Tuple
8
9 # This module is commonly used by others in here and should avoid
10 # taking any unnecessary dependencies back on them.
11
12 logger = logging.getLogger(__name__)
13
14
15 def current_thread_id() -> str:
16     ppid = os.getppid()
17     pid = os.getpid()
18     tid = threading.current_thread().name
19     return f'{ppid}/{pid}/{tid}:'
20
21
22 def is_current_thread_main_thread() -> bool:
23     """Returns True is the current (calling) thread is the process' main
24     thread and False otherwise.
25     """
26     return threading.current_thread() is threading.main_thread()
27
28
29 def background_thread(
30     _funct: Optional[Callable],
31 ) -> Tuple[threading.Thread, threading.Event]:
32     """A function decorator to create a background thread.
33
34     *** Please note: the decorated function must take an shutdown ***
35     *** event as an input parameter and should periodically check ***
36     *** it and stop if the event is set.                          ***
37
38     Usage:
39
40         @background_thread
41         def random(a: int, b: str, stop_event: threading.Event) -> None:
42             while True:
43                 print(f"Hi there {b}: {a}!")
44                 time.sleep(10.0)
45                 if stop_event.is_set():
46                     return
47
48
49         def main() -> None:
50             (thread, event) = random(22, "dude")
51             print("back!")
52             time.sleep(30.0)
53             event.set()
54             thread.join()
55
56     Note: in addition to any other arguments the function has, it must
57     take a stop_event as the last unnamed argument which it should
58     periodically check.  If the event is set, it means the thread has
59     been requested to terminate ASAP.
60     """
61
62     def wrapper(funct: Callable):
63         @functools.wraps(funct)
64         def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]:
65             should_terminate = threading.Event()
66             should_terminate.clear()
67             newargs = (*a, should_terminate)
68             thread = threading.Thread(
69                 target=funct,
70                 args=newargs,
71                 kwargs=kwa,
72             )
73             thread.start()
74             logger.debug(f'Started thread {thread.name} tid={thread.ident}')
75             return (thread, should_terminate)
76
77         return inner_wrapper
78
79     if _funct is None:
80         return wrapper
81     else:
82         return wrapper(_funct)
83
84
85 def periodically_invoke(
86     period_sec: float,
87     stop_after: Optional[int],
88 ):
89     """
90     Periodically invoke a decorated function.  Stop after N invocations
91     (or, if stop_after is None, call forever).  Delay period_sec between
92     invocations.
93
94     Returns a Thread object and an Event that, when signaled, will stop
95     the invocations.  Note that it is possible to be invoked one time
96     after the Event is set.  This event can be used to stop infinite
97     invocation style or finite invocation style decorations.
98
99         @periodically_invoke(period_sec=0.5, stop_after=None)
100         def there(name: str, age: int) -> None:
101             print(f"   ...there {name}, {age}")
102
103
104         @periodically_invoke(period_sec=1.0, stop_after=3)
105         def hello(name: str) -> None:
106             print(f"Hello, {name}")
107
108     """
109
110     def decorator_repeat(func):
111         def helper_thread(should_terminate, *args, **kwargs) -> None:
112             if stop_after is None:
113                 while True:
114                     func(*args, **kwargs)
115                     res = should_terminate.wait(period_sec)
116                     if res:
117                         return
118             else:
119                 for _ in range(stop_after):
120                     func(*args, **kwargs)
121                     res = should_terminate.wait(period_sec)
122                     if res:
123                         return
124                 return
125
126         @functools.wraps(func)
127         def wrapper_repeat(*args, **kwargs):
128             should_terminate = threading.Event()
129             should_terminate.clear()
130             newargs = (should_terminate, *args)
131             thread = threading.Thread(target=helper_thread, args=newargs, kwargs=kwargs)
132             thread.start()
133             logger.debug(f'Started thread {thread.name} tid={thread.ident}')
134             return (thread, should_terminate)
135
136         return wrapper_repeat
137
138     return decorator_repeat