ce2e6c846221c24654b29539bf873ce3a76539e0
[pyutils.git] / src / pyutils / state_tracker.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """This module defines several classes (:py:class:`StateTracker`,
6 :py:class:`AutomaticStateTracker`, and
7 :py:class:`WaitableAutomaticStateTracker`) that can be used as base
8 classes by your code.  These class patterns are meant to encapsulate
9 and represent some state that dynamically changes and must be updated
10 periodically.  These classes update their state (either automatically
11 or when invoked to poll) and allow their callers to wait on state
12 changes.
13 """
14
15 import datetime
16 import logging
17 import threading
18 import time
19 from abc import ABC, abstractmethod
20 from typing import Dict, Optional
21
22 import pytz
23
24 from pyutils.parallelize.thread_utils import background_thread
25
26 logger = logging.getLogger(__name__)
27
28
29 class StateTracker(ABC):
30     """A base class that maintains and updates its state via an update
31     routine called :meth:`heartbeat`.  This method is not automatic:
32     instances of this class should be periodically invoked via their
33     :meth:`heartbeat` method by some other thread.
34
35     See also :class:`AutomaticStateTracker` if you'd rather not have
36     to invoke your code regularly.
37     """
38
39     def __init__(self, update_ids_to_update_secs: Dict[str, float]) -> None:
40         """The update_ids_to_update_secs dict parameter describes one
41         or more update types (unique update_ids) and the
42         periodicity(ies), in seconds, at which it/they should be
43         invoked.
44
45         .. note::
46             When more than one update is overdue, they will be
47             invoked in order by their update_ids so care in choosing these
48             identifiers may be in order.
49
50         Args:
51             update_ids_to_update_secs: a dict mapping a user-defined
52                 update_id into a period (number of seconds) with which
53                 we would like this update performed.  e.g.::
54
55                     update_ids_to_update_secs = {
56                         'refresh_local_state': 10.0,
57                         'refresh_remote_state': 60.0,
58                     }
59
60                 This would indicate that every 10s we would like to
61                 refresh local state whereas every 60s we'd like to
62                 refresh remote state.
63
64         """
65         self.update_ids_to_update_secs = update_ids_to_update_secs
66         self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {}
67         self.now: Optional[datetime.datetime] = None
68         for x in update_ids_to_update_secs.keys():
69             self.last_reminder_ts[x] = None
70
71     @abstractmethod
72     def update(
73         self,
74         update_id: str,
75         now: datetime.datetime,
76         last_invocation: Optional[datetime.datetime],
77     ) -> None:
78         """Put whatever you want here to perform your state updates.
79
80         Args:
81             update_id: the string you passed to the c'tor as a key in
82                 the update_ids_to_update_secs dict.  :meth:`update` will
83                 only be invoked, at most, every update_secs
84                 seconds.
85
86             now: the approximate current timestamp at invocation time.
87
88             last_invocation: the last time this operation was invoked
89                 (or None on the first invocation).
90         """
91         pass
92
93     def heartbeat(self, *, force_all_updates_to_run: bool = False) -> None:
94         """Invoke this method periodically to cause the :class:`StateTracker`
95         instance to identify and invoke any overdue updates based on the
96         schedule passed to the c'tor.  In the base :class:`StateTracker` class,
97         this method must be invoked manually by a thread from external code.
98         Other subclasses (e.g. :class:`AutomaticStateTracker`) are available
99         that create their own updater threads (see below).
100
101         If more than one type of update (`update_id`) is overdue,
102         overdue updates will be invoked in order based on their `update_id`.
103
104         Setting `force_all_updates_to_run` will invoke all updates
105         (ordered by `update_id`) immediately ignoring whether or not
106         they are due.
107         """
108
109         self.now = datetime.datetime.now(tz=pytz.timezone("US/Pacific"))
110         for update_id in sorted(self.last_reminder_ts.keys()):
111             if force_all_updates_to_run:
112                 logger.debug('Forcing all updates to run')
113                 self.update(update_id, self.now, self.last_reminder_ts[update_id])
114                 self.last_reminder_ts[update_id] = self.now
115                 return
116
117             refresh_secs = self.update_ids_to_update_secs[update_id]
118             last_run = self.last_reminder_ts[update_id]
119             if last_run is None:  # Never run before
120                 logger.debug('id %s has never been run; running it now', update_id)
121                 self.update(update_id, self.now, self.last_reminder_ts[update_id])
122                 self.last_reminder_ts[update_id] = self.now
123             else:
124                 delta = self.now - last_run
125                 if delta.total_seconds() >= refresh_secs:  # Is overdue?
126                     logger.debug('id %s is overdue; running it now', update_id)
127                     self.update(
128                         update_id,
129                         self.now,
130                         self.last_reminder_ts[update_id],
131                     )
132                     self.last_reminder_ts[update_id] = self.now
133
134
135 class AutomaticStateTracker(StateTracker):
136     """Just like :class:`StateTracker` but you don't need to pump the
137     :meth:`heartbeat` method periodically because we create a background
138     thread that manages periodic calling.  You must call :meth:`shutdown`,
139     though, in order to terminate the update thread.
140     """
141
142     @background_thread
143     def _pace_maker(self, should_terminate: threading.Event) -> None:
144         """Entry point for a background thread to own calling :meth:`heartbeat`
145         at regular intervals so that the main thread doesn't need to
146         do so.
147
148         Args:
149             should_terminate: an event which, when set, indicates we should terminate.
150         """
151         while True:
152             if should_terminate.is_set():
153                 logger.debug('_pace_maker noticed event; shutting down')
154                 return
155             self.heartbeat()
156             logger.debug('_pace_maker is sleeping for %.1fs', self.sleep_delay)
157             time.sleep(self.sleep_delay)
158
159     def __init__(
160         self,
161         update_ids_to_update_secs: Dict[str, float],
162         *,
163         override_sleep_delay: Optional[float] = None,
164     ) -> None:
165         """Construct an AutomaticStateTracker.
166
167         Args:
168             update_ids_to_update_secs: a dict mapping a user-defined
169                 update_id into a period (number of seconds) with which
170                 we would like this update performed.  e.g.::
171
172                     update_ids_to_update_secs = {
173                         'refresh_local_state': 10.0,
174                         'refresh_remote_state': 60.0,
175                     }
176
177                 This would indicate that every 10s we would like to
178                 refresh local state whereas every 60s we'd like to
179                 refresh remote state.
180
181             override_sleep_delay: By default, this class determines
182                 how long the background thread should sleep between
183                 automatic invocations to :meth:`heartbeat` based on the
184                 period of each update type in update_ids_to_update_secs.
185                 If this argument is non-None, it overrides this computation
186                 and uses this period as the sleep in the background thread.
187         """
188         from pyutils import math_utils
189
190         super().__init__(update_ids_to_update_secs)
191         if override_sleep_delay is not None:
192             logger.debug('Overriding sleep delay to %.1f', override_sleep_delay)
193             self.sleep_delay = override_sleep_delay
194         else:
195             periods_list = list(update_ids_to_update_secs.values())
196             self.sleep_delay = math_utils.gcd_float_sequence(periods_list)
197             logger.info('Computed sleep_delay=%.1f', self.sleep_delay)
198         (thread, stop_event) = self._pace_maker()
199         self.should_terminate = stop_event
200         self.updater_thread = thread
201
202     def shutdown(self):
203         """Terminates the background thread and waits for it to tear down.
204         This may block for as long as `self.sleep_delay`.
205         """
206         logger.debug('Setting shutdown event and waiting for background thread.')
207         self.should_terminate.set()
208         self.updater_thread.join()
209         logger.debug('Background thread terminated.')
210
211
212 class WaitableAutomaticStateTracker(AutomaticStateTracker):
213     """This is an AutomaticStateTracker that exposes a wait method which
214     will block the calling thread until the state changes with an
215     optional timeout.  The caller should check the return value of
216     wait; it will be true if something changed and false if the wait
217     simply timed out.  If the return value is true, the instance
218     should be reset() before wait is called again.
219
220     Example usage::
221
222         detector = waitable_presence.WaitableAutomaticStateSubclass()
223         while True:
224             changed = detector.wait(timeout=60)
225             if changed:
226                 detector.reset()
227                 # Figure out what changed and react somehow
228             else:
229                 # Just a timeout; no need to reset.  Maybe do something
230                 # else before looping up into wait again.
231     """
232
233     def __init__(
234         self,
235         update_ids_to_update_secs: Dict[str, float],
236         *,
237         override_sleep_delay: Optional[float] = None,
238     ) -> None:
239         """Construct an WaitableAutomaticStateTracker.
240
241         Args:
242             update_ids_to_update_secs: a dict mapping a user-defined
243                 update_id into a period (number of seconds) with which
244                 we would like this update performed.  e.g.::
245
246                     update_ids_to_update_secs = {
247                         'refresh_local_state': 10.0,
248                         'refresh_remote_state': 60.0,
249                     }
250
251                 This would indicate that every 10s we would like to
252                 refresh local state whereas every 60s we'd like to
253                 refresh remote state.
254
255             override_sleep_delay: By default, this class determines
256                 how long the background thread should sleep between
257                 automatic invocations to :meth:`heartbeat` based on the
258                 period of each update type in update_ids_to_update_secs.
259                 If this argument is non-None, it overrides this computation
260                 and uses this period as the sleep in the background thread.
261         """
262         self._something_changed = threading.Event()
263         super().__init__(
264             update_ids_to_update_secs, override_sleep_delay=override_sleep_delay
265         )
266
267     def something_changed(self):
268         """Indicate that something has changed."""
269         self._something_changed.set()
270
271     def did_something_change(self) -> bool:
272         """Indicate whether some state has changed in the background."""
273         return self._something_changed.is_set()
274
275     def reset(self):
276         """Call to clear the 'something changed' bit.  See usage above."""
277         self._something_changed.clear()
278
279     def wait(self, *, timeout=None):
280         """Blocking wait for something to change or a timeout to lapse.
281
282         Args:
283             timeout: maximum amount of time to wait.  If None, wait
284                 forever (until something changes or shutdown).
285         """
286         return self._something_changed.wait(timeout=timeout)