Easier and more self documenting patterns for loading/saving Persistent
[python_utils.git] / thread_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """Utilities for dealing with threads + threading."""
6
7 import functools
8 import logging
9 import os
10 import threading
11 from typing import Any, Callable, Optional, Tuple
12
13 # This module is commonly used by others in here and should avoid
14 # taking any unnecessary dependencies back on them.
15
16 logger = logging.getLogger(__name__)
17
18
19 def current_thread_id() -> str:
20     """
21     Returns:
22         a string composed of the parent process' id, the current
23         process' id and the current thread identifier.  The former two are
24         numbers (pids) whereas the latter is a thread id passed during thread
25         creation time.
26
27     >>> ret = current_thread_id()
28     >>> (ppid, pid, tid) = ret.split('/')
29     >>> ppid.isnumeric()
30     True
31     >>> pid.isnumeric()
32     True
33
34     """
35     ppid = os.getppid()
36     pid = os.getpid()
37     tid = threading.current_thread().name
38     return f'{ppid}/{pid}/{tid}:'
39
40
41 def is_current_thread_main_thread() -> bool:
42     """
43     Returns:
44         True is the current (calling) thread is the process' main
45         thread and False otherwise.
46
47     >>> is_current_thread_main_thread()
48     True
49
50     >>> result = None
51     >>> def thunk():
52     ...     global result
53     ...     result = is_current_thread_main_thread()
54
55     >>> thunk()
56     >>> result
57     True
58
59     >>> import threading
60     >>> thread = threading.Thread(target=thunk)
61     >>> thread.start()
62     >>> thread.join()
63     >>> result
64     False
65
66     """
67     return threading.current_thread() is threading.main_thread()
68
69
70 def background_thread(
71     _funct: Optional[Callable[..., Any]],
72 ) -> Callable[..., Tuple[threading.Thread, threading.Event]]:
73     """A function decorator to create a background thread.
74
75     Usage::
76
77         @background_thread
78         def random(a: int, b: str, stop_event: threading.Event) -> None:
79             while True:
80                 print(f"Hi there {b}: {a}!")
81                 time.sleep(10.0)
82                 if stop_event.is_set():
83                     return
84
85         def main() -> None:
86             (thread, event) = random(22, "dude")
87             print("back!")
88             time.sleep(30.0)
89             event.set()
90             thread.join()
91
92     .. warning::
93
94         In addition to any other arguments the function has, it must
95         take a stop_event as the last unnamed argument which it should
96         periodically check.  If the event is set, it means the thread has
97         been requested to terminate ASAP.
98     """
99
100     def wrapper(funct: Callable):
101         @functools.wraps(funct)
102         def inner_wrapper(*a, **kwa) -> Tuple[threading.Thread, threading.Event]:
103             should_terminate = threading.Event()
104             should_terminate.clear()
105             newargs = (*a, should_terminate)
106             thread = threading.Thread(
107                 target=funct,
108                 args=newargs,
109                 kwargs=kwa,
110             )
111             thread.start()
112             logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
113             return (thread, should_terminate)
114
115         return inner_wrapper
116
117     if _funct is None:
118         return wrapper  # type: ignore
119     else:
120         return wrapper(_funct)
121
122
123 class ThreadWithReturnValue(threading.Thread):
124     """A thread whose return value is plumbed back out as the return
125     value of :meth:`join`.
126     """
127
128     def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, Verbose=None):
129         threading.Thread.__init__(
130             self, group=None, target=target, name=None, args=args, kwargs=kwargs
131         )
132         self._target = target
133         self._return = None
134
135     def run(self):
136         if self._target is not None:
137             self._return = self._target(*self._args, **self._kwargs)
138
139     def join(self, *args):
140         threading.Thread.join(self, *args)
141         return self._return
142
143
144 def periodically_invoke(
145     period_sec: float,
146     stop_after: Optional[int],
147 ):
148     """
149     Periodically invoke the decorated function.
150
151     Args:
152         period_sec: the delay period in seconds between invocations
153         stop_after: total number of invocations to make or, if None,
154             call forever
155
156     Returns:
157         a :class:`Thread` object and an :class:`Event` that, when
158         signaled, will stop the invocations.
159
160     .. note::
161         It is possible to be invoked one time after the :class:`Event`
162         is set.  This event can be used to stop infinite
163         invocation style or finite invocation style decorations.
164
165     Usage::
166
167         @periodically_invoke(period_sec=0.5, stop_after=None)
168         def there(name: str, age: int) -> None:
169             print(f"   ...there {name}, {age}")
170
171         @periodically_invoke(period_sec=1.0, stop_after=3)
172         def hello(name: str) -> None:
173             print(f"Hello, {name}")
174     """
175
176     def decorator_repeat(func):
177         def helper_thread(should_terminate, *args, **kwargs) -> None:
178             if stop_after is None:
179                 while True:
180                     func(*args, **kwargs)
181                     res = should_terminate.wait(period_sec)
182                     if res:
183                         return
184             else:
185                 for _ in range(stop_after):
186                     func(*args, **kwargs)
187                     res = should_terminate.wait(period_sec)
188                     if res:
189                         return
190                 return
191
192         @functools.wraps(func)
193         def wrapper_repeat(*args, **kwargs):
194             should_terminate = threading.Event()
195             should_terminate.clear()
196             newargs = (should_terminate, *args)
197             thread = threading.Thread(target=helper_thread, args=newargs, kwargs=kwargs)
198             thread.start()
199             logger.debug('Started thread "%s" tid=%d', thread.name, thread.ident)
200             return (thread, should_terminate)
201
202         return wrapper_repeat
203
204     return decorator_repeat
205
206
207 if __name__ == '__main__':
208     import doctest
209
210     doctest.testmod()