X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=state_tracker.py;h=453faf7b1972d8f4f1b3250bfdd353150e682503;hb=822454f580c1ff9eb207b8da46cdfae24e30cde1;hp=4f77ff42241474d214f222c428ae0f60e1da1e51;hpb=6f688ff9bacee93679f6af45a301b4308e19764c;p=python_utils.git diff --git a/state_tracker.py b/state_tracker.py index 4f77ff4..453faf7 100644 --- a/state_tracker.py +++ b/state_tracker.py @@ -20,6 +20,7 @@ class StateTracker(ABC): invoked via the heartbeat() method. This method, in turn, invokes update() with update_ids according to a schedule / periodicity provided to the c'tor. + """ def __init__(self, update_ids_to_update_secs: Dict[str, float]) -> None: @@ -30,6 +31,7 @@ class StateTracker(ABC): Note that, when more than one update is overdue, they will be invoked in order by their update_ids so care in choosing these identifiers may be in order. + """ self.update_ids_to_update_secs = update_ids_to_update_secs self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {} @@ -49,6 +51,7 @@ class StateTracker(ABC): The now param is the approximate current timestamp and the last_invocation param is the last time you were invoked (or None on the first invocation) + """ pass @@ -64,26 +67,21 @@ class StateTracker(ABC): Setting force_all_updates_to_run will invoke all updates (ordered by update_id) immediately ignoring whether or not they are due. + """ self.now = datetime.datetime.now(tz=pytz.timezone("US/Pacific")) for update_id in sorted(self.last_reminder_ts.keys()): if force_all_updates_to_run: logger.debug('Forcing all updates to run') - self.update( - update_id, self.now, self.last_reminder_ts[update_id] - ) + self.update(update_id, self.now, self.last_reminder_ts[update_id]) self.last_reminder_ts[update_id] = self.now return refresh_secs = self.update_ids_to_update_secs[update_id] last_run = self.last_reminder_ts[update_id] if last_run is None: # Never run before - logger.debug( - f'id {update_id} has never been run; running it now' - ) - self.update( - update_id, self.now, self.last_reminder_ts[update_id] - ) + logger.debug(f'id {update_id} has never been run; running it now') + self.update(update_id, self.now, self.last_reminder_ts[update_id]) self.last_reminder_ts[update_id] = self.now else: delta = self.now - last_run @@ -101,13 +99,16 @@ class AutomaticStateTracker(StateTracker): """Just like HeartbeatCurrentState but you don't need to pump the heartbeat; it runs on a background thread. Call .shutdown() to terminate the updates. + """ @background_thread def pace_maker(self, should_terminate) -> None: """Entry point for a background thread to own calling heartbeat() at regular intervals so that the main thread doesn't need to do - so.""" + so. + + """ while True: if should_terminate.is_set(): logger.debug('pace_maker noticed event; shutting down') @@ -123,6 +124,7 @@ class AutomaticStateTracker(StateTracker): override_sleep_delay: Optional[float] = None, ) -> None: import math_utils + super().__init__(update_ids_to_update_secs) if override_sleep_delay is not None: logger.debug(f'Overriding sleep delay to {override_sleep_delay}') @@ -138,26 +140,46 @@ class AutomaticStateTracker(StateTracker): def shutdown(self): """Terminates the background thread and waits for it to tear down. This may block for as long as self.sleep_delay. + """ - logger.debug( - 'Setting shutdown event and waiting for background thread.' - ) + logger.debug('Setting shutdown event and waiting for background thread.') self.should_terminate.set() self.updater_thread.join() logger.debug('Background thread terminated.') class WaitableAutomaticStateTracker(AutomaticStateTracker): + """This is an AutomaticStateTracker that exposes a wait method which + will block the calling thread until the state changes with an + optional timeout. The caller should check the return value of + wait; it will be true if something changed and false if the wait + simply timed out. If the return value is true, the instance + should be reset() before wait is called again. + + Example usage: + + detector = waitable_presence.WaitableAutomaticStateSubclass() + while True: + changed = detector.wait(timeout=60 * 5) + if changed: + detector.reset() + # Figure out what changed and react + else: + # Just a timeout; no need to reset. Maybe do something + # else before looping up into wait again. + + """ def __init__( - self, - update_ids_to_update_secs: Dict[str, float], - *, - override_sleep_delay: Optional[float] = None, + self, + update_ids_to_update_secs: Dict[str, float], + *, + override_sleep_delay: Optional[float] = None, ) -> None: self._something_changed = threading.Event() - super().__init__(update_ids_to_update_secs, - override_sleep_delay=override_sleep_delay) + super().__init__( + update_ids_to_update_secs, override_sleep_delay=override_sleep_delay + ) def something_changed(self): self._something_changed.set() @@ -168,9 +190,5 @@ class WaitableAutomaticStateTracker(AutomaticStateTracker): def reset(self): self._something_changed.clear() - def wait(self, - *, - timeout=None): - return self._something_changed.wait( - timeout=timeout - ) + def wait(self, *, timeout=None): + return self._something_changed.wait(timeout=timeout)