Clean up more docs to work with sphinx.
[python_utils.git] / thread_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """Utilities for dealing with threads + threading."""
6
7 import functools
8 import logging
9 import os
10 import threading
11 from typing import Any, Callable, Optional, Tuple
12
13 # This module is commonly used by others in here and should avoid
14 # taking any unnecessary dependencies back on them.
15
16 logger = logging.getLogger(__name__)
17
18
19 def current_thread_id() -> str:
20     """Returns a string composed of the parent process' id, the current
21     process' id and the current thread identifier.  The former two are
22     numbers (pids) whereas the latter is a thread id passed during thread
23     creation time.
24
25     >>> ret = current_thread_id()
26     >>> (ppid, pid, tid) = ret.split('/')
27     >>> ppid.isnumeric()
28     True
29     >>> pid.isnumeric()
30     True
31
32     """
33     ppid = os.getppid()
34     pid = os.getpid()
35     tid = threading.current_thread().name
36     return f'{ppid}/{pid}/{tid}:'
37
38
39 def is_current_thread_main_thread() -> bool:
40     """Returns True is the current (calling) thread is the process' main
41     thread and False otherwise.
42
43     >>> is_current_thread_main_thread()
44     True
45
46     >>> result = None
47     >>> def thunk():
48     ...     global result
49     ...     result = is_current_thread_main_thread()
50
51     >>> thunk()
52     >>> result
53     True
54
55     >>> import threading
56     >>> thread = threading.Thread(target=thunk)
57     >>> thread.start()
58     >>> thread.join()
59     >>> result
60     False
61
62     """
63     return threading.current_thread() is threading.main_thread()
64
65
66 def background_thread(
67     _funct: Optional[Callable[..., Any]],
68 ) -> Callable[..., Tuple[threading.Thread, threading.Event]]:
69     """A function decorator to create a background thread.
70
71     *** Please note: the decorated function must take an shutdown ***
72     *** event as an input parameter and should periodically check ***
73     *** it and stop if the event is set.                          ***
74
75     Usage::
76
77         @background_thread
78         def random(a: int, b: str, stop_event: threading.Event) -> None:
79             while True:
80                 print(f"Hi there {b}: {a}!")
81                 time.sleep(10.0)
82                 if stop_event.is_set():
83                     return
84
85         def main() -> None:
86             (thread, event) = random(22, "dude")
87             print("back!")
88             time.sleep(30.0)
89             event.set()
90             thread.join()
91
92     Note: in addition to any other arguments the function has, it must
93     take a stop_event as the last unnamed argument which it should
94     periodically check.  If the event is set, it means the thread has
95     been requested to terminate ASAP.
96     """
97
98     def wrapper(funct: Callable):
99         @functools.wraps(funct)
100         def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]:
101             should_terminate = threading.Event()
102             should_terminate.clear()
103             newargs = (*a, should_terminate)
104             thread = threading.Thread(
105                 target=funct,
106                 args=newargs,
107                 kwargs=kwa,
108             )
109             thread.start()
110             logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
111             return (thread, should_terminate)
112
113         return inner_wrapper
114
115     if _funct is None:
116         return wrapper  # type: ignore
117     else:
118         return wrapper(_funct)
119
120
121 def periodically_invoke(
122     period_sec: float,
123     stop_after: Optional[int],
124 ):
125     """
126     Periodically invoke a decorated function.  Stop after N invocations
127     (or, if stop_after is None, call forever).  Delay period_sec between
128     invocations.
129
130     Returns a Thread object and an Event that, when signaled, will stop
131     the invocations.  Note that it is possible to be invoked one time
132     after the Event is set.  This event can be used to stop infinite
133     invocation style or finite invocation style decorations.::
134
135         @periodically_invoke(period_sec=0.5, stop_after=None)
136         def there(name: str, age: int) -> None:
137             print(f"   ...there {name}, {age}")
138
139         @periodically_invoke(period_sec=1.0, stop_after=3)
140         def hello(name: str) -> None:
141             print(f"Hello, {name}")
142
143     """
144
145     def decorator_repeat(func):
146         def helper_thread(should_terminate, *args, **kwargs) -> None:
147             if stop_after is None:
148                 while True:
149                     func(*args, **kwargs)
150                     res = should_terminate.wait(period_sec)
151                     if res:
152                         return
153             else:
154                 for _ in range(stop_after):
155                     func(*args, **kwargs)
156                     res = should_terminate.wait(period_sec)
157                     if res:
158                         return
159                 return
160
161         @functools.wraps(func)
162         def wrapper_repeat(*args, **kwargs):
163             should_terminate = threading.Event()
164             should_terminate.clear()
165             newargs = (should_terminate, *args)
166             thread = threading.Thread(target=helper_thread, args=newargs, kwargs=kwargs)
167             thread.start()
168             logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
169             return (thread, should_terminate)
170
171         return wrapper_repeat
172
173     return decorator_repeat
174
175
176 if __name__ == '__main__':
177     import doctest
178
179     doctest.testmod()