2375f3df3938b6a5562911af28202a909b0a0cd7
[python_utils.git] / thread_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with threads + threading."""
4
5 import functools
6 import logging
7 import os
8 import threading
9 from typing import Any, Callable, Optional, Tuple
10
11 # This module is commonly used by others in here and should avoid
12 # taking any unnecessary dependencies back on them.
13
14 logger = logging.getLogger(__name__)
15
16
17 def current_thread_id() -> str:
18     """Returns a string composed of the parent process' id, the current
19     process' id and the current thread identifier.  The former two are
20     numbers (pids) whereas the latter is a thread id passed during thread
21     creation time.
22
23     >>> ret = current_thread_id()
24     >>> (ppid, pid, tid) = ret.split('/')
25     >>> ppid.isnumeric()
26     True
27     >>> pid.isnumeric()
28     True
29
30     """
31     ppid = os.getppid()
32     pid = os.getpid()
33     tid = threading.current_thread().name
34     return f'{ppid}/{pid}/{tid}:'
35
36
37 def is_current_thread_main_thread() -> bool:
38     """Returns True is the current (calling) thread is the process' main
39     thread and False otherwise.
40
41     >>> is_current_thread_main_thread()
42     True
43
44     >>> result = None
45     >>> def thunk():
46     ...     global result
47     ...     result = is_current_thread_main_thread()
48
49     >>> thunk()
50     >>> result
51     True
52
53     >>> import threading
54     >>> thread = threading.Thread(target=thunk)
55     >>> thread.start()
56     >>> thread.join()
57     >>> result
58     False
59
60     """
61     return threading.current_thread() is threading.main_thread()
62
63
64 def background_thread(
65     _funct: Optional[Callable[..., Any]],
66 ) -> Callable[..., Tuple[threading.Thread, threading.Event]]:
67     """A function decorator to create a background thread.
68
69     *** Please note: the decorated function must take an shutdown ***
70     *** event as an input parameter and should periodically check ***
71     *** it and stop if the event is set.                          ***
72
73     Usage:
74
75         @background_thread
76         def random(a: int, b: str, stop_event: threading.Event) -> None:
77             while True:
78                 print(f"Hi there {b}: {a}!")
79                 time.sleep(10.0)
80                 if stop_event.is_set():
81                     return
82
83
84         def main() -> None:
85             (thread, event) = random(22, "dude")
86             print("back!")
87             time.sleep(30.0)
88             event.set()
89             thread.join()
90
91     Note: in addition to any other arguments the function has, it must
92     take a stop_event as the last unnamed argument which it should
93     periodically check.  If the event is set, it means the thread has
94     been requested to terminate ASAP.
95     """
96
97     def wrapper(funct: Callable):
98         @functools.wraps(funct)
99         def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]:
100             should_terminate = threading.Event()
101             should_terminate.clear()
102             newargs = (*a, should_terminate)
103             thread = threading.Thread(
104                 target=funct,
105                 args=newargs,
106                 kwargs=kwa,
107             )
108             thread.start()
109             logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
110             return (thread, should_terminate)
111
112         return inner_wrapper
113
114     if _funct is None:
115         return wrapper  # type: ignore
116     else:
117         return wrapper(_funct)
118
119
120 def periodically_invoke(
121     period_sec: float,
122     stop_after: Optional[int],
123 ):
124     """
125     Periodically invoke a decorated function.  Stop after N invocations
126     (or, if stop_after is None, call forever).  Delay period_sec between
127     invocations.
128
129     Returns a Thread object and an Event that, when signaled, will stop
130     the invocations.  Note that it is possible to be invoked one time
131     after the Event is set.  This event can be used to stop infinite
132     invocation style or finite invocation style decorations.
133
134         @periodically_invoke(period_sec=0.5, stop_after=None)
135         def there(name: str, age: int) -> None:
136             print(f"   ...there {name}, {age}")
137
138
139         @periodically_invoke(period_sec=1.0, stop_after=3)
140         def hello(name: str) -> None:
141             print(f"Hello, {name}")
142
143     """
144
145     def decorator_repeat(func):
146         def helper_thread(should_terminate, *args, **kwargs) -> None:
147             if stop_after is None:
148                 while True:
149                     func(*args, **kwargs)
150                     res = should_terminate.wait(period_sec)
151                     if res:
152                         return
153             else:
154                 for _ in range(stop_after):
155                     func(*args, **kwargs)
156                     res = should_terminate.wait(period_sec)
157                     if res:
158                         return
159                 return
160
161         @functools.wraps(func)
162         def wrapper_repeat(*args, **kwargs):
163             should_terminate = threading.Event()
164             should_terminate.clear()
165             newargs = (should_terminate, *args)
166             thread = threading.Thread(target=helper_thread, args=newargs, kwargs=kwargs)
167             thread.start()
168             logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
169             return (thread, should_terminate)
170
171         return wrapper_repeat
172
173     return decorator_repeat
174
175
176 if __name__ == '__main__':
177     import doctest
178
179     doctest.testmod()