X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=src%2Fpyutils%2Fstate_tracker.py;h=90580f2d5977fd95abaabdad3e2f3f32558ecd22;hb=278d163705facc2276cd464414fb490ef6af50ab;hp=f83f254f27cce69ecf0d05bbcd740484beac7f65;hpb=69566c003b4f1c3a4905f37d3735d7921502d14a;p=pyutils.git diff --git a/src/pyutils/state_tracker.py b/src/pyutils/state_tracker.py index f83f254..90580f2 100644 --- a/src/pyutils/state_tracker.py +++ b/src/pyutils/state_tracker.py @@ -2,12 +2,16 @@ # © Copyright 2021-2022, Scott Gasch -"""Several helpers to keep track of internal state via periodic -polling. :class:`StateTracker` expects to be invoked periodically to -maintain state whereas the others (:class:`AutomaticStateTracker` and -:class:`WaitableAutomaticStateTracker`) automatically update themselves -and, optionally, expose an event for client code to wait on state +"""This module defines several classes (:py:class:`StateTracker`, +:py:class:`AutomaticStateTracker`, and +:py:class:`WaitableAutomaticStateTracker`) that can be used as base +classes by your code. These class patterns are meant to encapsulate +and represent some state that dynamically changes and must be updated +periodically. These classes update their state (either automatically +or when invoked to poll) and allow their callers to wait on state changes. + +See also :class:`pyutils.parallelize.thread_utils.periodically_invoke` """ import datetime @@ -25,17 +29,20 @@ logger = logging.getLogger(__name__) class StateTracker(ABC): - """A base class that maintains and updates a global state via an - update routine. Instances of this class should be periodically - invoked via the heartbeat() method. This method, in turn, invokes - update() with update_ids according to a schedule / periodicity - provided to the c'tor. + """A base class that maintains and updates its state via an update + routine called :meth:`heartbeat`. This method is not automatic: + instances of this class should be periodically invoked via their + :meth:`heartbeat` method by some other thread. + + See also :class:`AutomaticStateTracker` if you'd rather not have + to invoke your code regularly. """ def __init__(self, update_ids_to_update_secs: Dict[str, float]) -> None: - """The update_ids_to_update_secs dict parameter describes one or more - update types (unique update_ids) and the periodicity(ies), in - seconds, at which it/they should be invoked. + """The update_ids_to_update_secs dict parameter describes one + or more update types (unique update_ids) and the + periodicity(ies), in seconds, at which it/they should be + invoked. .. note:: When more than one update is overdue, they will be @@ -55,6 +62,7 @@ class StateTracker(ABC): This would indicate that every 10s we would like to refresh local state whereas every 60s we'd like to refresh remote state. + """ self.update_ids_to_update_secs = update_ids_to_update_secs self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {} @@ -74,7 +82,7 @@ class StateTracker(ABC): Args: update_id: the string you passed to the c'tor as a key in the update_ids_to_update_secs dict. :meth:`update` will - only be invoked on the shoulder, at most, every update_secs + only be invoked, at most, every update_secs seconds. now: the approximate current timestamp at invocation time. @@ -85,17 +93,18 @@ class StateTracker(ABC): pass def heartbeat(self, *, force_all_updates_to_run: bool = False) -> None: - """Invoke this method to cause the StateTracker instance to identify - and invoke any overdue updates based on the schedule passed to - the c'tor. In the base :class:`StateTracker` class, this method must - be invoked manually by a thread from external code. Other subclasses - are available that create their own updater threads (see below). - - If more than one type of update (update_id) are overdue, - they will be invoked in order based on their update_ids. - - Setting force_all_updates_to_run will invoke all updates - (ordered by update_id) immediately ignoring whether or not + """Invoke this method periodically to cause the :class:`StateTracker` + instance to identify and invoke any overdue updates based on the + schedule passed to the c'tor. In the base :class:`StateTracker` class, + this method must be invoked manually by a thread from external code. + Other subclasses (e.g. :class:`AutomaticStateTracker`) are available + that create their own updater threads (see below). + + If more than one type of update (`update_id`) is overdue, + overdue updates will be invoked in order based on their `update_id`. + + Setting `force_all_updates_to_run` will invoke all updates + (ordered by `update_id`) immediately ignoring whether or not they are due. """ @@ -133,17 +142,20 @@ class AutomaticStateTracker(StateTracker): """ @background_thread - def pace_maker(self, should_terminate: threading.Event) -> None: + def _pace_maker(self, should_terminate: threading.Event) -> None: """Entry point for a background thread to own calling :meth:`heartbeat` at regular intervals so that the main thread doesn't need to do so. + + Args: + should_terminate: an event which, when set, indicates we should terminate. """ while True: if should_terminate.is_set(): - logger.debug('pace_maker noticed event; shutting down') + logger.debug('_pace_maker noticed event; shutting down') return self.heartbeat() - logger.debug('pace_maker is sleeping for %.1fs', self.sleep_delay) + logger.debug('_pace_maker is sleeping for %.1fs', self.sleep_delay) time.sleep(self.sleep_delay) def __init__( @@ -185,13 +197,13 @@ class AutomaticStateTracker(StateTracker): periods_list = list(update_ids_to_update_secs.values()) self.sleep_delay = math_utils.gcd_float_sequence(periods_list) logger.info('Computed sleep_delay=%.1f', self.sleep_delay) - (thread, stop_event) = self.pace_maker() + (thread, stop_event) = self._pace_maker() self.should_terminate = stop_event self.updater_thread = thread def shutdown(self): """Terminates the background thread and waits for it to tear down. - This may block for as long as self.sleep_delay. + This may block for as long as `self.sleep_delay`. """ logger.debug('Setting shutdown event and waiting for background thread.') self.should_terminate.set() @@ -211,10 +223,10 @@ class WaitableAutomaticStateTracker(AutomaticStateTracker): detector = waitable_presence.WaitableAutomaticStateSubclass() while True: - changed = detector.wait(timeout=60 * 5) + changed = detector.wait(timeout=60) if changed: detector.reset() - # Figure out what changed and react + # Figure out what changed and react somehow else: # Just a timeout; no need to reset. Maybe do something # else before looping up into wait again. @@ -267,10 +279,10 @@ class WaitableAutomaticStateTracker(AutomaticStateTracker): self._something_changed.clear() def wait(self, *, timeout=None): - """Wait for something to change or a timeout to lapse. + """Blocking wait for something to change or a timeout to lapse. Args: timeout: maximum amount of time to wait. If None, wait - forever (until something changes). + forever (until something changes or shutdown). """ return self._something_changed.wait(timeout=timeout)