22161275605d76a1199df8f18d536fd04e2fe17b
[python_utils.git] / thread_utils.py
1 #!/usr/bin/env python3
2
3 import functools
4 import logging
5 import os
6 import threading
7 from typing import Callable, Optional, Tuple
8
9 # This module is commonly used by others in here and should avoid
10 # taking any unnecessary dependencies back on them.
11
12 logger = logging.getLogger(__name__)
13
14
15 def current_thread_id() -> str:
16     """Returns a string composed of the parent process' id, the current
17     process' id and the current thread identifier.  The former two are
18     numbers (pids) whereas the latter is a thread id passed during thread
19     creation time.
20
21     >>> ret = current_thread_id()
22     >>> (ppid, pid, tid) = ret.split('/')
23     >>> ppid.isnumeric()
24     True
25     >>> pid.isnumeric()
26     True
27
28     """
29     ppid = os.getppid()
30     pid = os.getpid()
31     tid = threading.current_thread().name
32     return f'{ppid}/{pid}/{tid}:'
33
34
35 def is_current_thread_main_thread() -> bool:
36     """Returns True is the current (calling) thread is the process' main
37     thread and False otherwise.
38
39     >>> is_current_thread_main_thread()
40     True
41
42     >>> result = None
43     >>> def thunk():
44     ...     global result
45     ...     result = is_current_thread_main_thread()
46
47     >>> thunk()
48     >>> result
49     True
50
51     >>> import threading
52     >>> thread = threading.Thread(target=thunk)
53     >>> thread.start()
54     >>> thread.join()
55     >>> result
56     False
57
58     """
59     return threading.current_thread() is threading.main_thread()
60
61
62 def background_thread(
63     _funct: Optional[Callable],
64 ) -> Callable[..., Tuple[threading.Thread, threading.Event]]:
65     """A function decorator to create a background thread.
66
67     *** Please note: the decorated function must take an shutdown ***
68     *** event as an input parameter and should periodically check ***
69     *** it and stop if the event is set.                          ***
70
71     Usage:
72
73         @background_thread
74         def random(a: int, b: str, stop_event: threading.Event) -> None:
75             while True:
76                 print(f"Hi there {b}: {a}!")
77                 time.sleep(10.0)
78                 if stop_event.is_set():
79                     return
80
81
82         def main() -> None:
83             (thread, event) = random(22, "dude")
84             print("back!")
85             time.sleep(30.0)
86             event.set()
87             thread.join()
88
89     Note: in addition to any other arguments the function has, it must
90     take a stop_event as the last unnamed argument which it should
91     periodically check.  If the event is set, it means the thread has
92     been requested to terminate ASAP.
93     """
94
95     def wrapper(funct: Callable):
96         @functools.wraps(funct)
97         def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]:
98             should_terminate = threading.Event()
99             should_terminate.clear()
100             newargs = (*a, should_terminate)
101             thread = threading.Thread(
102                 target=funct,
103                 args=newargs,
104                 kwargs=kwa,
105             )
106             thread.start()
107             logger.debug(f'Started thread {thread.name} tid={thread.ident}')
108             return (thread, should_terminate)
109
110         return inner_wrapper
111
112     if _funct is None:
113         return wrapper  # type: ignore
114     else:
115         return wrapper(_funct)
116
117
118 def periodically_invoke(
119     period_sec: float,
120     stop_after: Optional[int],
121 ):
122     """
123     Periodically invoke a decorated function.  Stop after N invocations
124     (or, if stop_after is None, call forever).  Delay period_sec between
125     invocations.
126
127     Returns a Thread object and an Event that, when signaled, will stop
128     the invocations.  Note that it is possible to be invoked one time
129     after the Event is set.  This event can be used to stop infinite
130     invocation style or finite invocation style decorations.
131
132         @periodically_invoke(period_sec=0.5, stop_after=None)
133         def there(name: str, age: int) -> None:
134             print(f"   ...there {name}, {age}")
135
136
137         @periodically_invoke(period_sec=1.0, stop_after=3)
138         def hello(name: str) -> None:
139             print(f"Hello, {name}")
140
141     """
142
143     def decorator_repeat(func):
144         def helper_thread(should_terminate, *args, **kwargs) -> None:
145             if stop_after is None:
146                 while True:
147                     func(*args, **kwargs)
148                     res = should_terminate.wait(period_sec)
149                     if res:
150                         return
151             else:
152                 for _ in range(stop_after):
153                     func(*args, **kwargs)
154                     res = should_terminate.wait(period_sec)
155                     if res:
156                         return
157                 return
158
159         @functools.wraps(func)
160         def wrapper_repeat(*args, **kwargs):
161             should_terminate = threading.Event()
162             should_terminate.clear()
163             newargs = (should_terminate, *args)
164             thread = threading.Thread(target=helper_thread, args=newargs, kwargs=kwargs)
165             thread.start()
166             logger.debug(f'Started thread {thread.name} tid={thread.ident}')
167             return (thread, should_terminate)
168
169         return wrapper_repeat
170
171     return decorator_repeat
172
173
174 if __name__ == '__main__':
175     import doctest
176
177     doctest.testmod()