Make smart futures avoid polling.
[python_utils.git] / thread_utils.py
1 #!/usr/bin/env python3
2
3 import functools
4 import logging
5 import os
6 import threading
7 from typing import Callable, Optional, Tuple
8
9 # This module is commonly used by others in here and should avoid
10 # taking any unnecessary dependencies back on them.
11
12 logger = logging.getLogger(__name__)
13
14
15 def current_thread_id() -> str:
16     ppid = os.getppid()
17     pid = os.getpid()
18     tid = threading.current_thread().name
19     return f'{ppid}/{pid}/{tid}:'
20
21
22 def is_current_thread_main_thread() -> bool:
23     """Returns True is the current (calling) thread is the process' main
24     thread and False otherwise.
25     """
26     return threading.current_thread() is threading.main_thread()
27
28
29 def background_thread(
30         _funct: Optional[Callable]
31 ) -> Tuple[threading.Thread, threading.Event]:
32     """A function decorator to create a background thread.
33
34     *** Please note: the decorated function must take an shutdown ***
35     *** event as an input parameter and should periodically check ***
36     *** it and stop if the event is set.                          ***
37
38     Usage:
39
40         @background_thread
41         def random(a: int, b: str, stop_event: threading.Event) -> None:
42             while True:
43                 print(f"Hi there {b}: {a}!")
44                 time.sleep(10.0)
45                 if stop_event.is_set():
46                     return
47
48
49         def main() -> None:
50             (thread, event) = random(22, "dude")
51             print("back!")
52             time.sleep(30.0)
53             event.set()
54             thread.join()
55
56     Note: in addition to any other arguments the function has, it must
57     take a stop_event as the last unnamed argument which it should
58     periodically check.  If the event is set, it means the thread has
59     been requested to terminate ASAP.
60     """
61     def wrapper(funct: Callable):
62         @functools.wraps(funct)
63         def inner_wrapper(
64                 *a, **kwa
65         ) -> Tuple[threading.Thread, threading.Event]:
66             should_terminate = threading.Event()
67             should_terminate.clear()
68             newargs = (*a, should_terminate)
69             thread = threading.Thread(
70                 target=funct,
71                 args=newargs,
72                 kwargs=kwa,
73             )
74             thread.start()
75             logger.debug(
76                 f'Started thread {thread.name} tid={thread.ident}'
77             )
78             return (thread, should_terminate)
79         return inner_wrapper
80
81     if _funct is None:
82         return wrapper
83     else:
84         return wrapper(_funct)
85
86
87 def periodically_invoke(
88         period_sec: float,
89         stop_after: Optional[int],
90 ):
91     """
92     Periodically invoke a decorated function.  Stop after N invocations
93     (or, if stop_after is None, call forever).  Delay period_sec between
94     invocations.
95
96     Returns a Thread object and an Event that, when signaled, will stop
97     the invocations.  Note that it is possible to be invoked one time
98     after the Event is set.  This event can be used to stop infinite
99     invocation style or finite invocation style decorations.
100
101         @periodically_invoke(period_sec=0.5, stop_after=None)
102         def there(name: str, age: int) -> None:
103             print(f"   ...there {name}, {age}")
104
105
106         @periodically_invoke(period_sec=1.0, stop_after=3)
107         def hello(name: str) -> None:
108             print(f"Hello, {name}")
109
110     """
111     def decorator_repeat(func):
112         def helper_thread(should_terminate, *args, **kwargs) -> None:
113             if stop_after is None:
114                 while True:
115                     func(*args, **kwargs)
116                     res = should_terminate.wait(period_sec)
117                     if res:
118                         return
119             else:
120                 for _ in range(stop_after):
121                     func(*args, **kwargs)
122                     res = should_terminate.wait(period_sec)
123                     if res:
124                         return
125                 return
126
127         @functools.wraps(func)
128         def wrapper_repeat(*args, **kwargs):
129             should_terminate = threading.Event()
130             should_terminate.clear()
131             newargs = (should_terminate, *args)
132             thread = threading.Thread(
133                 target=helper_thread,
134                 args = newargs,
135                 kwargs = kwargs
136             )
137             thread.start()
138             logger.debug(
139                 f'Started thread {thread.name} tid={thread.ident}'
140             )
141             return (thread, should_terminate)
142         return wrapper_repeat
143     return decorator_repeat