More type annotations.
[python_utils.git] / state_tracker.py
index 16d2f595cf12ee34e3c28cdb9f1c522f8ac0b978..453faf7b1972d8f4f1b3250bfdd353150e682503 100644 (file)
@@ -4,11 +4,11 @@ from abc import ABC, abstractmethod
 import datetime
 import logging
 import time
+import threading
 from typing import Dict, Optional
 
 import pytz
 
-import math_utils
 from thread_utils import background_thread
 
 logger = logging.getLogger(__name__)
@@ -20,6 +20,7 @@ class StateTracker(ABC):
     invoked via the heartbeat() method.  This method, in turn, invokes
     update() with update_ids according to a schedule / periodicity
     provided to the c'tor.
+
     """
 
     def __init__(self, update_ids_to_update_secs: Dict[str, float]) -> None:
@@ -30,6 +31,7 @@ class StateTracker(ABC):
         Note that, when more than one update is overdue, they will be
         invoked in order by their update_ids so care in choosing these
         identifiers may be in order.
+
         """
         self.update_ids_to_update_secs = update_ids_to_update_secs
         self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {}
@@ -49,6 +51,7 @@ class StateTracker(ABC):
         The now param is the approximate current timestamp and the
         last_invocation param is the last time you were invoked (or
         None on the first invocation)
+
         """
         pass
 
@@ -64,26 +67,21 @@ class StateTracker(ABC):
         Setting force_all_updates_to_run will invoke all updates
         (ordered by update_id) immediately ignoring whether or not
         they are due.
+
         """
         self.now = datetime.datetime.now(tz=pytz.timezone("US/Pacific"))
         for update_id in sorted(self.last_reminder_ts.keys()):
             if force_all_updates_to_run:
                 logger.debug('Forcing all updates to run')
-                self.update(
-                    update_id, self.now, self.last_reminder_ts[update_id]
-                )
+                self.update(update_id, self.now, self.last_reminder_ts[update_id])
                 self.last_reminder_ts[update_id] = self.now
                 return
 
             refresh_secs = self.update_ids_to_update_secs[update_id]
             last_run = self.last_reminder_ts[update_id]
             if last_run is None:  # Never run before
-                logger.debug(
-                    f'id {update_id} has never been run; running it now'
-                )
-                self.update(
-                    update_id, self.now, self.last_reminder_ts[update_id]
-                )
+                logger.debug(f'id {update_id} has never been run; running it now')
+                self.update(update_id, self.now, self.last_reminder_ts[update_id])
                 self.last_reminder_ts[update_id] = self.now
             else:
                 delta = self.now - last_run
@@ -101,19 +99,22 @@ class AutomaticStateTracker(StateTracker):
     """Just like HeartbeatCurrentState but you don't need to pump the
     heartbeat; it runs on a background thread.  Call .shutdown() to
     terminate the updates.
+
     """
 
     @background_thread
     def pace_maker(self, should_terminate) -> None:
         """Entry point for a background thread to own calling heartbeat()
         at regular intervals so that the main thread doesn't need to do
-        so."""
+        so.
+
+        """
         while True:
             if should_terminate.is_set():
                 logger.debug('pace_maker noticed event; shutting down')
                 return
             self.heartbeat()
-            logger.debug(f'page_maker is sleeping for {self.sleep_delay}s')
+            logger.debug(f'pace_maker is sleeping for {self.sleep_delay}s')
             time.sleep(self.sleep_delay)
 
     def __init__(
@@ -122,6 +123,8 @@ class AutomaticStateTracker(StateTracker):
         *,
         override_sleep_delay: Optional[float] = None,
     ) -> None:
+        import math_utils
+
         super().__init__(update_ids_to_update_secs)
         if override_sleep_delay is not None:
             logger.debug(f'Overriding sleep delay to {override_sleep_delay}')
@@ -137,10 +140,55 @@ class AutomaticStateTracker(StateTracker):
     def shutdown(self):
         """Terminates the background thread and waits for it to tear down.
         This may block for as long as self.sleep_delay.
+
         """
-        logger.debug(
-            'Setting shutdown event and waiting for background thread.'
-        )
+        logger.debug('Setting shutdown event and waiting for background thread.')
         self.should_terminate.set()
         self.updater_thread.join()
         logger.debug('Background thread terminated.')
+
+
+class WaitableAutomaticStateTracker(AutomaticStateTracker):
+    """This is an AutomaticStateTracker that exposes a wait method which
+    will block the calling thread until the state changes with an
+    optional timeout.  The caller should check the return value of
+    wait; it will be true if something changed and false if the wait
+    simply timed out.  If the return value is true, the instance
+    should be reset() before wait is called again.
+
+    Example usage:
+
+        detector = waitable_presence.WaitableAutomaticStateSubclass()
+        while True:
+            changed = detector.wait(timeout=60 * 5)
+            if changed:
+                detector.reset()
+                # Figure out what changed and react
+            else:
+                # Just a timeout; no need to reset.  Maybe do something
+                # else before looping up into wait again.
+
+    """
+
+    def __init__(
+        self,
+        update_ids_to_update_secs: Dict[str, float],
+        *,
+        override_sleep_delay: Optional[float] = None,
+    ) -> None:
+        self._something_changed = threading.Event()
+        super().__init__(
+            update_ids_to_update_secs, override_sleep_delay=override_sleep_delay
+        )
+
+    def something_changed(self):
+        self._something_changed.set()
+
+    def did_something_change(self) -> bool:
+        return self._something_changed.is_set()
+
+    def reset(self):
+        self._something_changed.clear()
+
+    def wait(self, *, timeout=None):
+        return self._something_changed.wait(timeout=timeout)