Geocoder.
[python_utils.git] / thread_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """Utilities for dealing with threads + threading."""
6
7 import functools
8 import logging
9 import os
10 import threading
11 from typing import Any, Callable, Optional, Tuple
12
13 # This module is commonly used by others in here and should avoid
14 # taking any unnecessary dependencies back on them.
15
16 logger = logging.getLogger(__name__)
17
18
19 def current_thread_id() -> str:
20     """Returns a string composed of the parent process' id, the current
21     process' id and the current thread identifier.  The former two are
22     numbers (pids) whereas the latter is a thread id passed during thread
23     creation time.
24
25     >>> ret = current_thread_id()
26     >>> (ppid, pid, tid) = ret.split('/')
27     >>> ppid.isnumeric()
28     True
29     >>> pid.isnumeric()
30     True
31
32     """
33     ppid = os.getppid()
34     pid = os.getpid()
35     tid = threading.current_thread().name
36     return f'{ppid}/{pid}/{tid}:'
37
38
39 def is_current_thread_main_thread() -> bool:
40     """Returns True is the current (calling) thread is the process' main
41     thread and False otherwise.
42
43     >>> is_current_thread_main_thread()
44     True
45
46     >>> result = None
47     >>> def thunk():
48     ...     global result
49     ...     result = is_current_thread_main_thread()
50
51     >>> thunk()
52     >>> result
53     True
54
55     >>> import threading
56     >>> thread = threading.Thread(target=thunk)
57     >>> thread.start()
58     >>> thread.join()
59     >>> result
60     False
61
62     """
63     return threading.current_thread() is threading.main_thread()
64
65
66 def background_thread(
67     _funct: Optional[Callable[..., Any]],
68 ) -> Callable[..., Tuple[threading.Thread, threading.Event]]:
69     """A function decorator to create a background thread.
70
71     *** Please note: the decorated function must take an shutdown ***
72     *** event as an input parameter and should periodically check ***
73     *** it and stop if the event is set.                          ***
74
75     Usage:
76
77         @background_thread
78         def random(a: int, b: str, stop_event: threading.Event) -> None:
79             while True:
80                 print(f"Hi there {b}: {a}!")
81                 time.sleep(10.0)
82                 if stop_event.is_set():
83                     return
84
85
86         def main() -> None:
87             (thread, event) = random(22, "dude")
88             print("back!")
89             time.sleep(30.0)
90             event.set()
91             thread.join()
92
93     Note: in addition to any other arguments the function has, it must
94     take a stop_event as the last unnamed argument which it should
95     periodically check.  If the event is set, it means the thread has
96     been requested to terminate ASAP.
97     """
98
99     def wrapper(funct: Callable):
100         @functools.wraps(funct)
101         def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]:
102             should_terminate = threading.Event()
103             should_terminate.clear()
104             newargs = (*a, should_terminate)
105             thread = threading.Thread(
106                 target=funct,
107                 args=newargs,
108                 kwargs=kwa,
109             )
110             thread.start()
111             logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
112             return (thread, should_terminate)
113
114         return inner_wrapper
115
116     if _funct is None:
117         return wrapper  # type: ignore
118     else:
119         return wrapper(_funct)
120
121
122 def periodically_invoke(
123     period_sec: float,
124     stop_after: Optional[int],
125 ):
126     """
127     Periodically invoke a decorated function.  Stop after N invocations
128     (or, if stop_after is None, call forever).  Delay period_sec between
129     invocations.
130
131     Returns a Thread object and an Event that, when signaled, will stop
132     the invocations.  Note that it is possible to be invoked one time
133     after the Event is set.  This event can be used to stop infinite
134     invocation style or finite invocation style decorations.
135
136         @periodically_invoke(period_sec=0.5, stop_after=None)
137         def there(name: str, age: int) -> None:
138             print(f"   ...there {name}, {age}")
139
140
141         @periodically_invoke(period_sec=1.0, stop_after=3)
142         def hello(name: str) -> None:
143             print(f"Hello, {name}")
144
145     """
146
147     def decorator_repeat(func):
148         def helper_thread(should_terminate, *args, **kwargs) -> None:
149             if stop_after is None:
150                 while True:
151                     func(*args, **kwargs)
152                     res = should_terminate.wait(period_sec)
153                     if res:
154                         return
155             else:
156                 for _ in range(stop_after):
157                     func(*args, **kwargs)
158                     res = should_terminate.wait(period_sec)
159                     if res:
160                         return
161                 return
162
163         @functools.wraps(func)
164         def wrapper_repeat(*args, **kwargs):
165             should_terminate = threading.Event()
166             should_terminate.clear()
167             newargs = (should_terminate, *args)
168             thread = threading.Thread(target=helper_thread, args=newargs, kwargs=kwargs)
169             thread.start()
170             logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
171             return (thread, should_terminate)
172
173         return wrapper_repeat
174
175     return decorator_repeat
176
177
178 if __name__ == '__main__':
179     import doctest
180
181     doctest.testmod()