3 # © Copyright 2021-2022, Scott Gasch
5 """Several helpers to keep track of internal state via periodic
6 polling. :class:`StateTracker` expects to be invoked periodically to
7 maintain state whereas the others (:class:`AutomaticStateTracker` and
8 :class:`WaitableAutomaticStateTracker`) automatically update themselves
9 and, optionally, expose an event for client code to wait on state
17 from abc import ABC, abstractmethod
18 from typing import Dict, Optional
22 from thread_utils import background_thread
24 logger = logging.getLogger(__name__)
27 class StateTracker(ABC):
28 """A base class that maintains and updates a global state via an
29 update routine. Instances of this class should be periodically
30 invoked via the heartbeat() method. This method, in turn, invokes
31 update() with update_ids according to a schedule / periodicity
32 provided to the c'tor.
35 def __init__(self, update_ids_to_update_secs: Dict[str, float]) -> None:
36 """The update_ids_to_update_secs dict parameter describes one or more
37 update types (unique update_ids) and the periodicity(ies), in
38 seconds, at which it/they should be invoked.
41 When more than one update is overdue, they will be
42 invoked in order by their update_ids so care in choosing these
43 identifiers may be in order.
46 update_ids_to_update_secs: a dict mapping a user-defined
47 update_id into a period (number of seconds) with which
48 we would like this update performed. e.g.::
50 update_ids_to_update_secs = {
51 'refresh_local_state': 10.0,
52 'refresh_remote_state': 60.0,
55 This would indicate that every 10s we would like to
56 refresh local state whereas every 60s we'd like to
59 self.update_ids_to_update_secs = update_ids_to_update_secs
60 self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {}
61 self.now: Optional[datetime.datetime] = None
62 for x in update_ids_to_update_secs.keys():
63 self.last_reminder_ts[x] = None
69 now: datetime.datetime,
70 last_invocation: Optional[datetime.datetime],
72 """Put whatever you want here to perform your state updates.
75 update_id: the string you passed to the c'tor as a key in
76 the update_ids_to_update_secs dict. :meth:`update` will
77 only be invoked on the shoulder, at most, every update_secs
80 now: the approximate current timestamp at invocation time.
82 last_invocation: the last time this operation was invoked
83 (or None on the first invocation).
87 def heartbeat(self, *, force_all_updates_to_run: bool = False) -> None:
88 """Invoke this method to cause the StateTracker instance to identify
89 and invoke any overdue updates based on the schedule passed to
90 the c'tor. In the base :class:`StateTracker` class, this method must
91 be invoked manually by a thread from external code. Other subclasses
92 are available that create their own updater threads (see below).
94 If more than one type of update (update_id) are overdue,
95 they will be invoked in order based on their update_ids.
97 Setting force_all_updates_to_run will invoke all updates
98 (ordered by update_id) immediately ignoring whether or not
102 self.now = datetime.datetime.now(tz=pytz.timezone("US/Pacific"))
103 for update_id in sorted(self.last_reminder_ts.keys()):
104 if force_all_updates_to_run:
105 logger.debug('Forcing all updates to run')
106 self.update(update_id, self.now, self.last_reminder_ts[update_id])
107 self.last_reminder_ts[update_id] = self.now
110 refresh_secs = self.update_ids_to_update_secs[update_id]
111 last_run = self.last_reminder_ts[update_id]
112 if last_run is None: # Never run before
113 logger.debug('id %s has never been run; running it now', update_id)
114 self.update(update_id, self.now, self.last_reminder_ts[update_id])
115 self.last_reminder_ts[update_id] = self.now
117 delta = self.now - last_run
118 if delta.total_seconds() >= refresh_secs: # Is overdue?
119 logger.debug('id %s is overdue; running it now', update_id)
123 self.last_reminder_ts[update_id],
125 self.last_reminder_ts[update_id] = self.now
128 class AutomaticStateTracker(StateTracker):
129 """Just like :class:`StateTracker` but you don't need to pump the
130 :meth:`heartbeat` method periodically because we create a background
131 thread that manages periodic calling. You must call :meth:`shutdown`,
132 though, in order to terminate the update thread.
136 def pace_maker(self, should_terminate: threading.Event) -> None:
137 """Entry point for a background thread to own calling :meth:`heartbeat`
138 at regular intervals so that the main thread doesn't need to
142 if should_terminate.is_set():
143 logger.debug('pace_maker noticed event; shutting down')
146 logger.debug('pace_maker is sleeping for %.1fs', self.sleep_delay)
147 time.sleep(self.sleep_delay)
151 update_ids_to_update_secs: Dict[str, float],
153 override_sleep_delay: Optional[float] = None,
155 """Construct an AutomaticStateTracker.
158 update_ids_to_update_secs: a dict mapping a user-defined
159 update_id into a period (number of seconds) with which
160 we would like this update performed. e.g.::
162 update_ids_to_update_secs = {
163 'refresh_local_state': 10.0,
164 'refresh_remote_state': 60.0,
167 This would indicate that every 10s we would like to
168 refresh local state whereas every 60s we'd like to
169 refresh remote state.
171 override_sleep_delay: By default, this class determines
172 how long the background thread should sleep between
173 automatic invocations to :meth:`heartbeat` based on the
174 period of each update type in update_ids_to_update_secs.
175 If this argument is non-None, it overrides this computation
176 and uses this period as the sleep in the background thread.
180 super().__init__(update_ids_to_update_secs)
181 if override_sleep_delay is not None:
182 logger.debug('Overriding sleep delay to %.1f', override_sleep_delay)
183 self.sleep_delay = override_sleep_delay
185 periods_list = list(update_ids_to_update_secs.values())
186 self.sleep_delay = math_utils.gcd_float_sequence(periods_list)
187 logger.info('Computed sleep_delay=%.1f', self.sleep_delay)
188 (thread, stop_event) = self.pace_maker()
189 self.should_terminate = stop_event
190 self.updater_thread = thread
193 """Terminates the background thread and waits for it to tear down.
194 This may block for as long as self.sleep_delay.
196 logger.debug('Setting shutdown event and waiting for background thread.')
197 self.should_terminate.set()
198 self.updater_thread.join()
199 logger.debug('Background thread terminated.')
202 class WaitableAutomaticStateTracker(AutomaticStateTracker):
203 """This is an AutomaticStateTracker that exposes a wait method which
204 will block the calling thread until the state changes with an
205 optional timeout. The caller should check the return value of
206 wait; it will be true if something changed and false if the wait
207 simply timed out. If the return value is true, the instance
208 should be reset() before wait is called again.
212 detector = waitable_presence.WaitableAutomaticStateSubclass()
214 changed = detector.wait(timeout=60 * 5)
217 # Figure out what changed and react
219 # Just a timeout; no need to reset. Maybe do something
220 # else before looping up into wait again.
225 update_ids_to_update_secs: Dict[str, float],
227 override_sleep_delay: Optional[float] = None,
229 """Construct an WaitableAutomaticStateTracker.
232 update_ids_to_update_secs: a dict mapping a user-defined
233 update_id into a period (number of seconds) with which
234 we would like this update performed. e.g.::
236 update_ids_to_update_secs = {
237 'refresh_local_state': 10.0,
238 'refresh_remote_state': 60.0,
241 This would indicate that every 10s we would like to
242 refresh local state whereas every 60s we'd like to
243 refresh remote state.
245 override_sleep_delay: By default, this class determines
246 how long the background thread should sleep between
247 automatic invocations to :meth:`heartbeat` based on the
248 period of each update type in update_ids_to_update_secs.
249 If this argument is non-None, it overrides this computation
250 and uses this period as the sleep in the background thread.
252 self._something_changed = threading.Event()
253 super().__init__(update_ids_to_update_secs, override_sleep_delay=override_sleep_delay)
255 def something_changed(self):
256 """Indicate that something has changed."""
257 self._something_changed.set()
259 def did_something_change(self) -> bool:
260 """Indicate whether some state has changed in the background."""
261 return self._something_changed.is_set()
264 """Call to clear the 'something changed' bit. See usage above."""
265 self._something_changed.clear()
267 def wait(self, *, timeout=None):
268 """Wait for something to change or a timeout to lapse.
271 timeout: maximum amount of time to wait. If None, wait
272 forever (until something changes).
274 return self._something_changed.wait(timeout=timeout)