Since this thing is on the innerwebs I suppose it should have a
[python_utils.git] / state_tracker.py
index 11ce4c3b4198e5f7ff6e0406b35d8b65920b46d7..62eb183dba7bfc90a4bbbff73b4401f2d747231b 100644 (file)
@@ -1,9 +1,19 @@
 #!/usr/bin/env python3
 
-from abc import ABC, abstractmethod
+# © Copyright 2021-2022, Scott Gasch
+
+"""Several helpers to keep track of internal state via periodic
+polling.  StateTracker expects to be invoked periodically to maintain
+state whereas the others automatically update themselves and,
+optionally, expose an event for client code to wait on state changes.
+
+"""
+
 import datetime
 import logging
+import threading
 import time
+from abc import ABC, abstractmethod
 from typing import Dict, Optional
 
 import pytz
@@ -19,6 +29,7 @@ class StateTracker(ABC):
     invoked via the heartbeat() method.  This method, in turn, invokes
     update() with update_ids according to a schedule / periodicity
     provided to the c'tor.
+
     """
 
     def __init__(self, update_ids_to_update_secs: Dict[str, float]) -> None:
@@ -29,9 +40,11 @@ class StateTracker(ABC):
         Note that, when more than one update is overdue, they will be
         invoked in order by their update_ids so care in choosing these
         identifiers may be in order.
+
         """
         self.update_ids_to_update_secs = update_ids_to_update_secs
         self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {}
+        self.now: Optional[datetime.datetime] = None
         for x in update_ids_to_update_secs.keys():
             self.last_reminder_ts[x] = None
 
@@ -48,6 +61,7 @@ class StateTracker(ABC):
         The now param is the approximate current timestamp and the
         last_invocation param is the last time you were invoked (or
         None on the first invocation)
+
         """
         pass
 
@@ -63,31 +77,26 @@ class StateTracker(ABC):
         Setting force_all_updates_to_run will invoke all updates
         (ordered by update_id) immediately ignoring whether or not
         they are due.
+
         """
         self.now = datetime.datetime.now(tz=pytz.timezone("US/Pacific"))
         for update_id in sorted(self.last_reminder_ts.keys()):
             if force_all_updates_to_run:
                 logger.debug('Forcing all updates to run')
-                self.update(
-                    update_id, self.now, self.last_reminder_ts[update_id]
-                )
+                self.update(update_id, self.now, self.last_reminder_ts[update_id])
                 self.last_reminder_ts[update_id] = self.now
                 return
 
             refresh_secs = self.update_ids_to_update_secs[update_id]
             last_run = self.last_reminder_ts[update_id]
             if last_run is None:  # Never run before
-                logger.debug(
-                    f'id {update_id} has never been run; running it now'
-                )
-                self.update(
-                    update_id, self.now, self.last_reminder_ts[update_id]
-                )
+                logger.debug('id %s has never been run; running it now', update_id)
+                self.update(update_id, self.now, self.last_reminder_ts[update_id])
                 self.last_reminder_ts[update_id] = self.now
             else:
                 delta = self.now - last_run
                 if delta.total_seconds() >= refresh_secs:  # Is overdue?
-                    logger.debug(f'id {update_id} is overdue; running it now')
+                    logger.debug('id %s is overdue; running it now', update_id)
                     self.update(
                         update_id,
                         self.now,
@@ -100,19 +109,22 @@ class AutomaticStateTracker(StateTracker):
     """Just like HeartbeatCurrentState but you don't need to pump the
     heartbeat; it runs on a background thread.  Call .shutdown() to
     terminate the updates.
+
     """
 
     @background_thread
-    def pace_maker(self, should_terminate) -> None:
+    def pace_maker(self, should_terminate: threading.Event) -> None:
         """Entry point for a background thread to own calling heartbeat()
         at regular intervals so that the main thread doesn't need to do
-        so."""
+        so.
+
+        """
         while True:
             if should_terminate.is_set():
                 logger.debug('pace_maker noticed event; shutting down')
                 return
             self.heartbeat()
-            logger.debug(f'page_maker is sleeping for {self.sleep_delay}s')
+            logger.debug('pace_maker is sleeping for %.1fs', self.sleep_delay)
             time.sleep(self.sleep_delay)
 
     def __init__(
@@ -122,14 +134,15 @@ class AutomaticStateTracker(StateTracker):
         override_sleep_delay: Optional[float] = None,
     ) -> None:
         import math_utils
+
         super().__init__(update_ids_to_update_secs)
         if override_sleep_delay is not None:
-            logger.debug(f'Overriding sleep delay to {override_sleep_delay}')
+            logger.debug('Overriding sleep delay to %.1f', override_sleep_delay)
             self.sleep_delay = override_sleep_delay
         else:
             periods_list = list(update_ids_to_update_secs.values())
             self.sleep_delay = math_utils.gcd_float_sequence(periods_list)
-            logger.info(f'Computed sleep_delay={self.sleep_delay}')
+            logger.info('Computed sleep_delay=%.1f', self.sleep_delay)
         (thread, stop_event) = self.pace_maker()
         self.should_terminate = stop_event
         self.updater_thread = thread
@@ -137,10 +150,53 @@ class AutomaticStateTracker(StateTracker):
     def shutdown(self):
         """Terminates the background thread and waits for it to tear down.
         This may block for as long as self.sleep_delay.
+
         """
-        logger.debug(
-            'Setting shutdown event and waiting for background thread.'
-        )
+        logger.debug('Setting shutdown event and waiting for background thread.')
         self.should_terminate.set()
         self.updater_thread.join()
         logger.debug('Background thread terminated.')
+
+
+class WaitableAutomaticStateTracker(AutomaticStateTracker):
+    """This is an AutomaticStateTracker that exposes a wait method which
+    will block the calling thread until the state changes with an
+    optional timeout.  The caller should check the return value of
+    wait; it will be true if something changed and false if the wait
+    simply timed out.  If the return value is true, the instance
+    should be reset() before wait is called again.
+
+    Example usage:
+
+        detector = waitable_presence.WaitableAutomaticStateSubclass()
+        while True:
+            changed = detector.wait(timeout=60 * 5)
+            if changed:
+                detector.reset()
+                # Figure out what changed and react
+            else:
+                # Just a timeout; no need to reset.  Maybe do something
+                # else before looping up into wait again.
+
+    """
+
+    def __init__(
+        self,
+        update_ids_to_update_secs: Dict[str, float],
+        *,
+        override_sleep_delay: Optional[float] = None,
+    ) -> None:
+        self._something_changed = threading.Event()
+        super().__init__(update_ids_to_update_secs, override_sleep_delay=override_sleep_delay)
+
+    def something_changed(self):
+        self._something_changed.set()
+
+    def did_something_change(self) -> bool:
+        return self._something_changed.is_set()
+
+    def reset(self):
+        self._something_changed.clear()
+
+    def wait(self, *, timeout=None):
+        return self._something_changed.wait(timeout=timeout)