Easier and more self documenting patterns for loading/saving Persistent
[python_utils.git] / state_tracker.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """Several helpers to keep track of internal state via periodic
6 polling.  :class:`StateTracker` expects to be invoked periodically to
7 maintain state whereas the others (:class:`AutomaticStateTracker` and
8 :class:`WaitableAutomaticStateTracker`) automatically update themselves
9 and, optionally, expose an event for client code to wait on state
10 changes.
11 """
12
13 import datetime
14 import logging
15 import threading
16 import time
17 from abc import ABC, abstractmethod
18 from typing import Dict, Optional
19
20 import pytz
21
22 from thread_utils import background_thread
23
24 logger = logging.getLogger(__name__)
25
26
27 class StateTracker(ABC):
28     """A base class that maintains and updates a global state via an
29     update routine.  Instances of this class should be periodically
30     invoked via the heartbeat() method.  This method, in turn, invokes
31     update() with update_ids according to a schedule / periodicity
32     provided to the c'tor.
33     """
34
35     def __init__(self, update_ids_to_update_secs: Dict[str, float]) -> None:
36         """The update_ids_to_update_secs dict parameter describes one or more
37         update types (unique update_ids) and the periodicity(ies), in
38         seconds, at which it/they should be invoked.
39
40         .. note::
41             When more than one update is overdue, they will be
42             invoked in order by their update_ids so care in choosing these
43             identifiers may be in order.
44
45         Args:
46             update_ids_to_update_secs: a dict mapping a user-defined
47                 update_id into a period (number of seconds) with which
48                 we would like this update performed.  e.g.::
49
50                     update_ids_to_update_secs = {
51                         'refresh_local_state': 10.0,
52                         'refresh_remote_state': 60.0,
53                     }
54
55                 This would indicate that every 10s we would like to
56                 refresh local state whereas every 60s we'd like to
57                 refresh remote state.
58         """
59         self.update_ids_to_update_secs = update_ids_to_update_secs
60         self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {}
61         self.now: Optional[datetime.datetime] = None
62         for x in update_ids_to_update_secs.keys():
63             self.last_reminder_ts[x] = None
64
65     @abstractmethod
66     def update(
67         self,
68         update_id: str,
69         now: datetime.datetime,
70         last_invocation: Optional[datetime.datetime],
71     ) -> None:
72         """Put whatever you want here to perform your state updates.
73
74         Args:
75             update_id: the string you passed to the c'tor as a key in
76                 the update_ids_to_update_secs dict.  :meth:`update` will
77                 only be invoked on the shoulder, at most, every update_secs
78                 seconds.
79
80             now: the approximate current timestamp at invocation time.
81
82             last_invocation: the last time this operation was invoked
83                 (or None on the first invocation).
84         """
85         pass
86
87     def heartbeat(self, *, force_all_updates_to_run: bool = False) -> None:
88         """Invoke this method to cause the StateTracker instance to identify
89         and invoke any overdue updates based on the schedule passed to
90         the c'tor.  In the base :class:`StateTracker` class, this method must
91         be invoked manually by a thread from external code.  Other subclasses
92         are available that create their own updater threads (see below).
93
94         If more than one type of update (update_id) are overdue,
95         they will be invoked in order based on their update_ids.
96
97         Setting force_all_updates_to_run will invoke all updates
98         (ordered by update_id) immediately ignoring whether or not
99         they are due.
100         """
101
102         self.now = datetime.datetime.now(tz=pytz.timezone("US/Pacific"))
103         for update_id in sorted(self.last_reminder_ts.keys()):
104             if force_all_updates_to_run:
105                 logger.debug('Forcing all updates to run')
106                 self.update(update_id, self.now, self.last_reminder_ts[update_id])
107                 self.last_reminder_ts[update_id] = self.now
108                 return
109
110             refresh_secs = self.update_ids_to_update_secs[update_id]
111             last_run = self.last_reminder_ts[update_id]
112             if last_run is None:  # Never run before
113                 logger.debug('id %s has never been run; running it now', update_id)
114                 self.update(update_id, self.now, self.last_reminder_ts[update_id])
115                 self.last_reminder_ts[update_id] = self.now
116             else:
117                 delta = self.now - last_run
118                 if delta.total_seconds() >= refresh_secs:  # Is overdue?
119                     logger.debug('id %s is overdue; running it now', update_id)
120                     self.update(
121                         update_id,
122                         self.now,
123                         self.last_reminder_ts[update_id],
124                     )
125                     self.last_reminder_ts[update_id] = self.now
126
127
128 class AutomaticStateTracker(StateTracker):
129     """Just like :class:`StateTracker` but you don't need to pump the
130     :meth:`heartbeat` method periodically because we create a background
131     thread that manages periodic calling.  You must call :meth:`shutdown`,
132     though, in order to terminate the update thread.
133     """
134
135     @background_thread
136     def pace_maker(self, should_terminate: threading.Event) -> None:
137         """Entry point for a background thread to own calling :meth:`heartbeat`
138         at regular intervals so that the main thread doesn't need to
139         do so.
140         """
141         while True:
142             if should_terminate.is_set():
143                 logger.debug('pace_maker noticed event; shutting down')
144                 return
145             self.heartbeat()
146             logger.debug('pace_maker is sleeping for %.1fs', self.sleep_delay)
147             time.sleep(self.sleep_delay)
148
149     def __init__(
150         self,
151         update_ids_to_update_secs: Dict[str, float],
152         *,
153         override_sleep_delay: Optional[float] = None,
154     ) -> None:
155         """Construct an AutomaticStateTracker.
156
157         Args:
158             update_ids_to_update_secs: a dict mapping a user-defined
159                 update_id into a period (number of seconds) with which
160                 we would like this update performed.  e.g.::
161
162                     update_ids_to_update_secs = {
163                         'refresh_local_state': 10.0,
164                         'refresh_remote_state': 60.0,
165                     }
166
167                 This would indicate that every 10s we would like to
168                 refresh local state whereas every 60s we'd like to
169                 refresh remote state.
170
171             override_sleep_delay: By default, this class determines
172                 how long the background thread should sleep between
173                 automatic invocations to :meth:`heartbeat` based on the
174                 period of each update type in update_ids_to_update_secs.
175                 If this argument is non-None, it overrides this computation
176                 and uses this period as the sleep in the background thread.
177         """
178         import math_utils
179
180         super().__init__(update_ids_to_update_secs)
181         if override_sleep_delay is not None:
182             logger.debug('Overriding sleep delay to %.1f', override_sleep_delay)
183             self.sleep_delay = override_sleep_delay
184         else:
185             periods_list = list(update_ids_to_update_secs.values())
186             self.sleep_delay = math_utils.gcd_float_sequence(periods_list)
187             logger.info('Computed sleep_delay=%.1f', self.sleep_delay)
188         (thread, stop_event) = self.pace_maker()
189         self.should_terminate = stop_event
190         self.updater_thread = thread
191
192     def shutdown(self):
193         """Terminates the background thread and waits for it to tear down.
194         This may block for as long as self.sleep_delay.
195         """
196         logger.debug('Setting shutdown event and waiting for background thread.')
197         self.should_terminate.set()
198         self.updater_thread.join()
199         logger.debug('Background thread terminated.')
200
201
202 class WaitableAutomaticStateTracker(AutomaticStateTracker):
203     """This is an AutomaticStateTracker that exposes a wait method which
204     will block the calling thread until the state changes with an
205     optional timeout.  The caller should check the return value of
206     wait; it will be true if something changed and false if the wait
207     simply timed out.  If the return value is true, the instance
208     should be reset() before wait is called again.
209
210     Example usage::
211
212         detector = waitable_presence.WaitableAutomaticStateSubclass()
213         while True:
214             changed = detector.wait(timeout=60 * 5)
215             if changed:
216                 detector.reset()
217                 # Figure out what changed and react
218             else:
219                 # Just a timeout; no need to reset.  Maybe do something
220                 # else before looping up into wait again.
221     """
222
223     def __init__(
224         self,
225         update_ids_to_update_secs: Dict[str, float],
226         *,
227         override_sleep_delay: Optional[float] = None,
228     ) -> None:
229         """Construct an WaitableAutomaticStateTracker.
230
231         Args:
232             update_ids_to_update_secs: a dict mapping a user-defined
233                 update_id into a period (number of seconds) with which
234                 we would like this update performed.  e.g.::
235
236                     update_ids_to_update_secs = {
237                         'refresh_local_state': 10.0,
238                         'refresh_remote_state': 60.0,
239                     }
240
241                 This would indicate that every 10s we would like to
242                 refresh local state whereas every 60s we'd like to
243                 refresh remote state.
244
245             override_sleep_delay: By default, this class determines
246                 how long the background thread should sleep between
247                 automatic invocations to :meth:`heartbeat` based on the
248                 period of each update type in update_ids_to_update_secs.
249                 If this argument is non-None, it overrides this computation
250                 and uses this period as the sleep in the background thread.
251         """
252         self._something_changed = threading.Event()
253         super().__init__(update_ids_to_update_secs, override_sleep_delay=override_sleep_delay)
254
255     def something_changed(self):
256         """Indicate that something has changed."""
257         self._something_changed.set()
258
259     def did_something_change(self) -> bool:
260         """Indicate whether some state has changed in the background."""
261         return self._something_changed.is_set()
262
263     def reset(self):
264         """Call to clear the 'something changed' bit.  See usage above."""
265         self._something_changed.clear()
266
267     def wait(self, *, timeout=None):
268         """Wait for something to change or a timeout to lapse.
269
270         Args:
271             timeout: maximum amount of time to wait.  If None, wait
272                 forever (until something changes).
273         """
274         return self._something_changed.wait(timeout=timeout)