Tweak around docstring to make prettier sphinx autodocs.
[python_utils.git] / state_tracker.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """Several helpers to keep track of internal state via periodic
6 polling.  StateTracker expects to be invoked periodically to maintain
7 state whereas the others automatically update themselves and,
8 optionally, expose an event for client code to wait on state changes.
9 """
10
11 import datetime
12 import logging
13 import threading
14 import time
15 from abc import ABC, abstractmethod
16 from typing import Dict, Optional
17
18 import pytz
19
20 from thread_utils import background_thread
21
22 logger = logging.getLogger(__name__)
23
24
25 class StateTracker(ABC):
26     """A base class that maintains and updates a global state via an
27     update routine.  Instances of this class should be periodically
28     invoked via the heartbeat() method.  This method, in turn, invokes
29     update() with update_ids according to a schedule / periodicity
30     provided to the c'tor.
31     """
32
33     def __init__(self, update_ids_to_update_secs: Dict[str, float]) -> None:
34         """The update_ids_to_update_secs dict parameter describes one or more
35         update types (unique update_ids) and the periodicity(ies), in
36         seconds, at which it/they should be invoked.
37
38         Note that, when more than one update is overdue, they will be
39         invoked in order by their update_ids so care in choosing these
40         identifiers may be in order.
41         """
42         self.update_ids_to_update_secs = update_ids_to_update_secs
43         self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {}
44         self.now: Optional[datetime.datetime] = None
45         for x in update_ids_to_update_secs.keys():
46             self.last_reminder_ts[x] = None
47
48     @abstractmethod
49     def update(
50         self,
51         update_id: str,
52         now: datetime.datetime,
53         last_invocation: Optional[datetime.datetime],
54     ) -> None:
55         """Put whatever you want here.  The update_id will be the string
56         passed to the c'tor as a key in the Dict.  It will only be
57         tapped on the shoulder, at most, every update_secs seconds.
58         The now param is the approximate current timestamp and the
59         last_invocation param is the last time you were invoked (or
60         None on the first invocation)
61         """
62         pass
63
64     def heartbeat(self, *, force_all_updates_to_run: bool = False) -> None:
65         """Invoke this method to cause the StateTracker instance to identify
66         and invoke any overdue updates based on the schedule passed to
67         the c'tor.  In the base StateTracker class, this method must
68         be invoked manually with a thread from external code.
69
70         If more than one type of update (update_id) are overdue,
71         they will be invoked in order based on their update_ids.
72
73         Setting force_all_updates_to_run will invoke all updates
74         (ordered by update_id) immediately ignoring whether or not
75         they are due.
76         """
77
78         self.now = datetime.datetime.now(tz=pytz.timezone("US/Pacific"))
79         for update_id in sorted(self.last_reminder_ts.keys()):
80             if force_all_updates_to_run:
81                 logger.debug('Forcing all updates to run')
82                 self.update(update_id, self.now, self.last_reminder_ts[update_id])
83                 self.last_reminder_ts[update_id] = self.now
84                 return
85
86             refresh_secs = self.update_ids_to_update_secs[update_id]
87             last_run = self.last_reminder_ts[update_id]
88             if last_run is None:  # Never run before
89                 logger.debug('id %s has never been run; running it now', update_id)
90                 self.update(update_id, self.now, self.last_reminder_ts[update_id])
91                 self.last_reminder_ts[update_id] = self.now
92             else:
93                 delta = self.now - last_run
94                 if delta.total_seconds() >= refresh_secs:  # Is overdue?
95                     logger.debug('id %s is overdue; running it now', update_id)
96                     self.update(
97                         update_id,
98                         self.now,
99                         self.last_reminder_ts[update_id],
100                     )
101                     self.last_reminder_ts[update_id] = self.now
102
103
104 class AutomaticStateTracker(StateTracker):
105     """Just like HeartbeatCurrentState but you don't need to pump the
106     heartbeat; it runs on a background thread.  Call .shutdown() to
107     terminate the updates.
108     """
109
110     @background_thread
111     def pace_maker(self, should_terminate: threading.Event) -> None:
112         """Entry point for a background thread to own calling heartbeat()
113         at regular intervals so that the main thread doesn't need to do
114         so.
115         """
116         while True:
117             if should_terminate.is_set():
118                 logger.debug('pace_maker noticed event; shutting down')
119                 return
120             self.heartbeat()
121             logger.debug('pace_maker is sleeping for %.1fs', self.sleep_delay)
122             time.sleep(self.sleep_delay)
123
124     def __init__(
125         self,
126         update_ids_to_update_secs: Dict[str, float],
127         *,
128         override_sleep_delay: Optional[float] = None,
129     ) -> None:
130         import math_utils
131
132         super().__init__(update_ids_to_update_secs)
133         if override_sleep_delay is not None:
134             logger.debug('Overriding sleep delay to %.1f', override_sleep_delay)
135             self.sleep_delay = override_sleep_delay
136         else:
137             periods_list = list(update_ids_to_update_secs.values())
138             self.sleep_delay = math_utils.gcd_float_sequence(periods_list)
139             logger.info('Computed sleep_delay=%.1f', self.sleep_delay)
140         (thread, stop_event) = self.pace_maker()
141         self.should_terminate = stop_event
142         self.updater_thread = thread
143
144     def shutdown(self):
145         """Terminates the background thread and waits for it to tear down.
146         This may block for as long as self.sleep_delay.
147         """
148
149         logger.debug('Setting shutdown event and waiting for background thread.')
150         self.should_terminate.set()
151         self.updater_thread.join()
152         logger.debug('Background thread terminated.')
153
154
155 class WaitableAutomaticStateTracker(AutomaticStateTracker):
156     """This is an AutomaticStateTracker that exposes a wait method which
157     will block the calling thread until the state changes with an
158     optional timeout.  The caller should check the return value of
159     wait; it will be true if something changed and false if the wait
160     simply timed out.  If the return value is true, the instance
161     should be reset() before wait is called again.
162
163     Example usage::
164
165         detector = waitable_presence.WaitableAutomaticStateSubclass()
166         while True:
167             changed = detector.wait(timeout=60 * 5)
168             if changed:
169                 detector.reset()
170                 # Figure out what changed and react
171             else:
172                 # Just a timeout; no need to reset.  Maybe do something
173                 # else before looping up into wait again.
174     """
175
176     def __init__(
177         self,
178         update_ids_to_update_secs: Dict[str, float],
179         *,
180         override_sleep_delay: Optional[float] = None,
181     ) -> None:
182         self._something_changed = threading.Event()
183         super().__init__(update_ids_to_update_secs, override_sleep_delay=override_sleep_delay)
184
185     def something_changed(self):
186         self._something_changed.set()
187
188     def did_something_change(self) -> bool:
189         return self._something_changed.is_set()
190
191     def reset(self):
192         self._something_changed.clear()
193
194     def wait(self, *, timeout=None):
195         return self._something_changed.wait(timeout=timeout)