Make smart futures avoid polling.
[python_utils.git] / state_tracker.py
1 #!/usr/bin/env python3
2
3 from abc import ABC, abstractmethod
4 import datetime
5 import logging
6 import time
7 import threading
8 from typing import Dict, Optional
9
10 import pytz
11
12 from thread_utils import background_thread
13
14 logger = logging.getLogger(__name__)
15
16
17 class StateTracker(ABC):
18     """A base class that maintains and updates a global state via an
19     update routine.  Instances of this class should be periodically
20     invoked via the heartbeat() method.  This method, in turn, invokes
21     update() with update_ids according to a schedule / periodicity
22     provided to the c'tor.
23     """
24
25     def __init__(self, update_ids_to_update_secs: Dict[str, float]) -> None:
26         """The update_ids_to_update_secs dict parameter describes one or more
27         update types (unique update_ids) and the periodicity(ies), in
28         seconds, at which it/they should be invoked.
29
30         Note that, when more than one update is overdue, they will be
31         invoked in order by their update_ids so care in choosing these
32         identifiers may be in order.
33         """
34         self.update_ids_to_update_secs = update_ids_to_update_secs
35         self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {}
36         for x in update_ids_to_update_secs.keys():
37             self.last_reminder_ts[x] = None
38
39     @abstractmethod
40     def update(
41         self,
42         update_id: str,
43         now: datetime.datetime,
44         last_invocation: Optional[datetime.datetime],
45     ) -> None:
46         """Put whatever you want here.  The update_id will be the string
47         passed to the c'tor as a key in the Dict.  It will only be
48         tapped on the shoulder, at most, every update_secs seconds.
49         The now param is the approximate current timestamp and the
50         last_invocation param is the last time you were invoked (or
51         None on the first invocation)
52         """
53         pass
54
55     def heartbeat(self, *, force_all_updates_to_run: bool = False) -> None:
56         """Invoke this method to cause the StateTracker instance to identify
57         and invoke any overdue updates based on the schedule passed to
58         the c'tor.  In the base StateTracker class, this method must
59         be invoked manually with a thread from external code.
60
61         If more than one type of update (update_id) are overdue,
62         they will be invoked in order based on their update_ids.
63
64         Setting force_all_updates_to_run will invoke all updates
65         (ordered by update_id) immediately ignoring whether or not
66         they are due.
67         """
68         self.now = datetime.datetime.now(tz=pytz.timezone("US/Pacific"))
69         for update_id in sorted(self.last_reminder_ts.keys()):
70             if force_all_updates_to_run:
71                 logger.debug('Forcing all updates to run')
72                 self.update(
73                     update_id, self.now, self.last_reminder_ts[update_id]
74                 )
75                 self.last_reminder_ts[update_id] = self.now
76                 return
77
78             refresh_secs = self.update_ids_to_update_secs[update_id]
79             last_run = self.last_reminder_ts[update_id]
80             if last_run is None:  # Never run before
81                 logger.debug(
82                     f'id {update_id} has never been run; running it now'
83                 )
84                 self.update(
85                     update_id, self.now, self.last_reminder_ts[update_id]
86                 )
87                 self.last_reminder_ts[update_id] = self.now
88             else:
89                 delta = self.now - last_run
90                 if delta.total_seconds() >= refresh_secs:  # Is overdue?
91                     logger.debug(f'id {update_id} is overdue; running it now')
92                     self.update(
93                         update_id,
94                         self.now,
95                         self.last_reminder_ts[update_id],
96                     )
97                     self.last_reminder_ts[update_id] = self.now
98
99
100 class AutomaticStateTracker(StateTracker):
101     """Just like HeartbeatCurrentState but you don't need to pump the
102     heartbeat; it runs on a background thread.  Call .shutdown() to
103     terminate the updates.
104     """
105
106     @background_thread
107     def pace_maker(self, should_terminate) -> None:
108         """Entry point for a background thread to own calling heartbeat()
109         at regular intervals so that the main thread doesn't need to do
110         so."""
111         while True:
112             if should_terminate.is_set():
113                 logger.debug('pace_maker noticed event; shutting down')
114                 return
115             self.heartbeat()
116             logger.debug(f'pace_maker is sleeping for {self.sleep_delay}s')
117             time.sleep(self.sleep_delay)
118
119     def __init__(
120         self,
121         update_ids_to_update_secs: Dict[str, float],
122         *,
123         override_sleep_delay: Optional[float] = None,
124     ) -> None:
125         import math_utils
126         super().__init__(update_ids_to_update_secs)
127         if override_sleep_delay is not None:
128             logger.debug(f'Overriding sleep delay to {override_sleep_delay}')
129             self.sleep_delay = override_sleep_delay
130         else:
131             periods_list = list(update_ids_to_update_secs.values())
132             self.sleep_delay = math_utils.gcd_float_sequence(periods_list)
133             logger.info(f'Computed sleep_delay={self.sleep_delay}')
134         (thread, stop_event) = self.pace_maker()
135         self.should_terminate = stop_event
136         self.updater_thread = thread
137
138     def shutdown(self):
139         """Terminates the background thread and waits for it to tear down.
140         This may block for as long as self.sleep_delay.
141         """
142         logger.debug(
143             'Setting shutdown event and waiting for background thread.'
144         )
145         self.should_terminate.set()
146         self.updater_thread.join()
147         logger.debug('Background thread terminated.')
148
149
150 class WaitableAutomaticStateTracker(AutomaticStateTracker):
151
152     def __init__(
153             self,
154             update_ids_to_update_secs: Dict[str, float],
155             *,
156             override_sleep_delay: Optional[float] = None,
157     ) -> None:
158         self._something_changed = threading.Event()
159         super().__init__(update_ids_to_update_secs,
160                          override_sleep_delay=override_sleep_delay)
161
162     def something_changed(self):
163         self._something_changed.set()
164
165     def did_something_change(self) -> bool:
166         return self._something_changed.is_set()
167
168     def reset(self):
169         self._something_changed.clear()
170
171     def wait(self,
172              *,
173              timeout=None):
174         return self._something_changed.wait(
175             timeout=timeout
176         )