Initial revision
[python_utils.git] / thread_utils.py
1 #!/usr/bin/env python3
2
3 import functools
4 import logging
5 import os
6 import threading
7 from typing import Callable, Optional, Tuple
8
9 logger = logging.getLogger(__name__)
10
11
12 def current_thread_id() -> str:
13     ppid = os.getppid()
14     pid = os.getpid()
15     tid = threading.current_thread().name
16     return f'{ppid}/{pid}/{tid}:'
17
18
19 def is_current_thread_main_thread() -> bool:
20     return threading.current_thread() is threading.main_thread()
21
22
23 def background_thread(
24         _funct: Optional[Callable]
25 ) -> Tuple[threading.Thread, threading.Event]:
26     """A function decorator to create a background thread.
27
28     *** Please note: the decorated function must take an shutdown ***
29     *** event as an input parameter and should periodically check ***
30     *** it and stop if the event is set.                          ***
31
32     Usage:
33
34         @background_thread
35         def random(a: int, b: str, stop_event: threading.Event) -> None:
36             while True:
37                 print(f"Hi there {b}: {a}!")
38                 time.sleep(10.0)
39                 if stop_event.is_set():
40                     return
41
42
43         def main() -> None:
44             (thread, event) = random(22, "dude")
45             print("back!")
46             time.sleep(30.0)
47             event.set()
48             thread.join()
49
50     Note: in addition to any other arguments the function has, it must
51     take a stop_event as the last unnamed argument which it should
52     periodically check.  If the event is set, it means the thread has
53     been requested to terminate ASAP.
54     """
55     def wrapper(funct: Callable):
56         @functools.wraps(funct)
57         def inner_wrapper(
58                 *a, **kwa
59         ) -> Tuple[threading.Thread, threading.Event]:
60             should_terminate = threading.Event()
61             should_terminate.clear()
62             newargs = (*a, should_terminate)
63             thread = threading.Thread(
64                 target=funct,
65                 args=newargs,
66                 kwargs=kwa,
67             )
68             thread.start()
69             logger.debug(
70                 f'Started thread {thread.name} tid={thread.ident}'
71             )
72             return (thread, should_terminate)
73         return inner_wrapper
74
75     if _funct is None:
76         return wrapper
77     else:
78         return wrapper(_funct)
79
80
81 def periodically_invoke(
82         period_sec: float,
83         stop_after: Optional[int],
84 ):
85     """
86     Periodically invoke a decorated function.  Stop after N invocations
87     (or, if stop_after is None, call forever).  Delay period_sec between
88     invocations.
89
90     Returns a Thread object and an Event that, when signaled, will stop
91     the invocations.  Note that it is possible to be invoked one time
92     after the Event is set.  This event can be used to stop infinite
93     invocation style or finite invocation style decorations.
94
95         @periodically_invoke(period_sec=0.5, stop_after=None)
96         def there(name: str, age: int) -> None:
97             print(f"   ...there {name}, {age}")
98
99
100         @periodically_invoke(period_sec=1.0, stop_after=3)
101         def hello(name: str) -> None:
102             print(f"Hello, {name}")
103
104     """
105     def decorator_repeat(func):
106         def helper_thread(should_terminate, *args, **kwargs) -> None:
107             if stop_after is None:
108                 while True:
109                     func(*args, **kwargs)
110                     res = should_terminate.wait(period_sec)
111                     if res:
112                         return
113             else:
114                 for _ in range(stop_after):
115                     func(*args, **kwargs)
116                     res = should_terminate.wait(period_sec)
117                     if res:
118                         return
119                 return
120
121         @functools.wraps(func)
122         def wrapper_repeat(*args, **kwargs):
123             should_terminate = threading.Event()
124             should_terminate.clear()
125             newargs = (should_terminate, *args)
126             thread = threading.Thread(
127                 target=helper_thread,
128                 args = newargs,
129                 kwargs = kwargs
130             )
131             thread.start()
132             logger.debug(
133                 f'Started thread {thread.name} tid={thread.ident}'
134             )
135             return (thread, should_terminate)
136         return wrapper_repeat
137     return decorator_repeat