More docs cleanup.
[python_utils.git] / thread_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """Utilities for dealing with threads + threading."""
6
7 import functools
8 import logging
9 import os
10 import threading
11 from typing import Any, Callable, Optional, Tuple
12
13 # This module is commonly used by others in here and should avoid
14 # taking any unnecessary dependencies back on them.
15
16 logger = logging.getLogger(__name__)
17
18
19 def current_thread_id() -> str:
20     """
21     Returns:
22         a string composed of the parent process' id, the current
23         process' id and the current thread identifier.  The former two are
24         numbers (pids) whereas the latter is a thread id passed during thread
25         creation time.
26
27     >>> ret = current_thread_id()
28     >>> (ppid, pid, tid) = ret.split('/')
29     >>> ppid.isnumeric()
30     True
31     >>> pid.isnumeric()
32     True
33
34     """
35     ppid = os.getppid()
36     pid = os.getpid()
37     tid = threading.current_thread().name
38     return f'{ppid}/{pid}/{tid}:'
39
40
41 def is_current_thread_main_thread() -> bool:
42     """
43     Returns:
44         True is the current (calling) thread is the process' main
45         thread and False otherwise.
46
47     >>> is_current_thread_main_thread()
48     True
49
50     >>> result = None
51     >>> def thunk():
52     ...     global result
53     ...     result = is_current_thread_main_thread()
54
55     >>> thunk()
56     >>> result
57     True
58
59     >>> import threading
60     >>> thread = threading.Thread(target=thunk)
61     >>> thread.start()
62     >>> thread.join()
63     >>> result
64     False
65
66     """
67     return threading.current_thread() is threading.main_thread()
68
69
70 def background_thread(
71     _funct: Optional[Callable[..., Any]],
72 ) -> Callable[..., Tuple[threading.Thread, threading.Event]]:
73     """A function decorator to create a background thread.
74
75     Usage::
76
77         @background_thread
78         def random(a: int, b: str, stop_event: threading.Event) -> None:
79             while True:
80                 print(f"Hi there {b}: {a}!")
81                 time.sleep(10.0)
82                 if stop_event.is_set():
83                     return
84
85         def main() -> None:
86             (thread, event) = random(22, "dude")
87             print("back!")
88             time.sleep(30.0)
89             event.set()
90             thread.join()
91
92     .. warning::
93
94         In addition to any other arguments the function has, it must
95         take a stop_event as the last unnamed argument which it should
96         periodically check.  If the event is set, it means the thread has
97         been requested to terminate ASAP.
98     """
99
100     def wrapper(funct: Callable):
101         @functools.wraps(funct)
102         def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]:
103             should_terminate = threading.Event()
104             should_terminate.clear()
105             newargs = (*a, should_terminate)
106             thread = threading.Thread(
107                 target=funct,
108                 args=newargs,
109                 kwargs=kwa,
110             )
111             thread.start()
112             logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
113             return (thread, should_terminate)
114
115         return inner_wrapper
116
117     if _funct is None:
118         return wrapper  # type: ignore
119     else:
120         return wrapper(_funct)
121
122
123 def periodically_invoke(
124     period_sec: float,
125     stop_after: Optional[int],
126 ):
127     """
128     Periodically invoke the decorated function.
129
130     Args:
131         period_sec: the delay period in seconds between invocations
132         stop_after: total number of invocations to make or, if None,
133             call forever
134
135     Returns:
136         a :class:`Thread` object and an :class:`Event` that, when
137         signaled, will stop the invocations.
138
139     .. note::
140         It is possible to be invoked one time after the :class:`Event`
141         is set.  This event can be used to stop infinite
142         invocation style or finite invocation style decorations.
143
144     Usage::
145
146         @periodically_invoke(period_sec=0.5, stop_after=None)
147         def there(name: str, age: int) -> None:
148             print(f"   ...there {name}, {age}")
149
150         @periodically_invoke(period_sec=1.0, stop_after=3)
151         def hello(name: str) -> None:
152             print(f"Hello, {name}")
153     """
154
155     def decorator_repeat(func):
156         def helper_thread(should_terminate, *args, **kwargs) -> None:
157             if stop_after is None:
158                 while True:
159                     func(*args, **kwargs)
160                     res = should_terminate.wait(period_sec)
161                     if res:
162                         return
163             else:
164                 for _ in range(stop_after):
165                     func(*args, **kwargs)
166                     res = should_terminate.wait(period_sec)
167                     if res:
168                         return
169                 return
170
171         @functools.wraps(func)
172         def wrapper_repeat(*args, **kwargs):
173             should_terminate = threading.Event()
174             should_terminate.clear()
175             newargs = (should_terminate, *args)
176             thread = threading.Thread(target=helper_thread, args=newargs, kwargs=kwargs)
177             thread.start()
178             logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
179             return (thread, should_terminate)
180
181         return wrapper_repeat
182
183     return decorator_repeat
184
185
186 if __name__ == '__main__':
187     import doctest
188
189     doctest.testmod()