X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=state_tracker.py;h=62eb183dba7bfc90a4bbbff73b4401f2d747231b;hb=532df2c5b57c7517dfb3dddd8c1358fbadf8baf3;hp=4836e3eb99915a6d25a06102ee0c93163cd69133;hpb=36fea7f15ed17150691b5b3ead75450e575229ef;p=python_utils.git diff --git a/state_tracker.py b/state_tracker.py index 4836e3e..62eb183 100644 --- a/state_tracker.py +++ b/state_tracker.py @@ -1,10 +1,19 @@ #!/usr/bin/env python3 -from abc import ABC, abstractmethod +# © Copyright 2021-2022, Scott Gasch + +"""Several helpers to keep track of internal state via periodic +polling. StateTracker expects to be invoked periodically to maintain +state whereas the others automatically update themselves and, +optionally, expose an event for client code to wait on state changes. + +""" + import datetime import logging -import time import threading +import time +from abc import ABC, abstractmethod from typing import Dict, Optional import pytz @@ -35,6 +44,7 @@ class StateTracker(ABC): """ self.update_ids_to_update_secs = update_ids_to_update_secs self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {} + self.now: Optional[datetime.datetime] = None for x in update_ids_to_update_secs.keys(): self.last_reminder_ts[x] = None @@ -73,26 +83,20 @@ class StateTracker(ABC): for update_id in sorted(self.last_reminder_ts.keys()): if force_all_updates_to_run: logger.debug('Forcing all updates to run') - self.update( - update_id, self.now, self.last_reminder_ts[update_id] - ) + self.update(update_id, self.now, self.last_reminder_ts[update_id]) self.last_reminder_ts[update_id] = self.now return refresh_secs = self.update_ids_to_update_secs[update_id] last_run = self.last_reminder_ts[update_id] if last_run is None: # Never run before - logger.debug( - f'id {update_id} has never been run; running it now' - ) - self.update( - update_id, self.now, self.last_reminder_ts[update_id] - ) + logger.debug('id %s has never been run; running it now', update_id) + self.update(update_id, self.now, self.last_reminder_ts[update_id]) self.last_reminder_ts[update_id] = self.now else: delta = self.now - last_run if delta.total_seconds() >= refresh_secs: # Is overdue? - logger.debug(f'id {update_id} is overdue; running it now') + logger.debug('id %s is overdue; running it now', update_id) self.update( update_id, self.now, @@ -109,7 +113,7 @@ class AutomaticStateTracker(StateTracker): """ @background_thread - def pace_maker(self, should_terminate) -> None: + def pace_maker(self, should_terminate: threading.Event) -> None: """Entry point for a background thread to own calling heartbeat() at regular intervals so that the main thread doesn't need to do so. @@ -120,7 +124,7 @@ class AutomaticStateTracker(StateTracker): logger.debug('pace_maker noticed event; shutting down') return self.heartbeat() - logger.debug(f'pace_maker is sleeping for {self.sleep_delay}s') + logger.debug('pace_maker is sleeping for %.1fs', self.sleep_delay) time.sleep(self.sleep_delay) def __init__( @@ -133,12 +137,12 @@ class AutomaticStateTracker(StateTracker): super().__init__(update_ids_to_update_secs) if override_sleep_delay is not None: - logger.debug(f'Overriding sleep delay to {override_sleep_delay}') + logger.debug('Overriding sleep delay to %.1f', override_sleep_delay) self.sleep_delay = override_sleep_delay else: periods_list = list(update_ids_to_update_secs.values()) self.sleep_delay = math_utils.gcd_float_sequence(periods_list) - logger.info(f'Computed sleep_delay={self.sleep_delay}') + logger.info('Computed sleep_delay=%.1f', self.sleep_delay) (thread, stop_event) = self.pace_maker() self.should_terminate = stop_event self.updater_thread = thread @@ -148,9 +152,7 @@ class AutomaticStateTracker(StateTracker): This may block for as long as self.sleep_delay. """ - logger.debug( - 'Setting shutdown event and waiting for background thread.' - ) + logger.debug('Setting shutdown event and waiting for background thread.') self.should_terminate.set() self.updater_thread.join() logger.debug('Background thread terminated.') @@ -185,9 +187,7 @@ class WaitableAutomaticStateTracker(AutomaticStateTracker): override_sleep_delay: Optional[float] = None, ) -> None: self._something_changed = threading.Event() - super().__init__( - update_ids_to_update_secs, override_sleep_delay=override_sleep_delay - ) + super().__init__(update_ids_to_update_secs, override_sleep_delay=override_sleep_delay) def something_changed(self): self._something_changed.set()