Since this thing is on the innerwebs I suppose it should have a
[python_utils.git] / state_tracker.py
index 4836e3eb99915a6d25a06102ee0c93163cd69133..62eb183dba7bfc90a4bbbff73b4401f2d747231b 100644 (file)
@@ -1,10 +1,19 @@
 #!/usr/bin/env python3
 
-from abc import ABC, abstractmethod
+# © Copyright 2021-2022, Scott Gasch
+
+"""Several helpers to keep track of internal state via periodic
+polling.  StateTracker expects to be invoked periodically to maintain
+state whereas the others automatically update themselves and,
+optionally, expose an event for client code to wait on state changes.
+
+"""
+
 import datetime
 import logging
-import time
 import threading
+import time
+from abc import ABC, abstractmethod
 from typing import Dict, Optional
 
 import pytz
@@ -35,6 +44,7 @@ class StateTracker(ABC):
         """
         self.update_ids_to_update_secs = update_ids_to_update_secs
         self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {}
+        self.now: Optional[datetime.datetime] = None
         for x in update_ids_to_update_secs.keys():
             self.last_reminder_ts[x] = None
 
@@ -73,26 +83,20 @@ class StateTracker(ABC):
         for update_id in sorted(self.last_reminder_ts.keys()):
             if force_all_updates_to_run:
                 logger.debug('Forcing all updates to run')
-                self.update(
-                    update_id, self.now, self.last_reminder_ts[update_id]
-                )
+                self.update(update_id, self.now, self.last_reminder_ts[update_id])
                 self.last_reminder_ts[update_id] = self.now
                 return
 
             refresh_secs = self.update_ids_to_update_secs[update_id]
             last_run = self.last_reminder_ts[update_id]
             if last_run is None:  # Never run before
-                logger.debug(
-                    f'id {update_id} has never been run; running it now'
-                )
-                self.update(
-                    update_id, self.now, self.last_reminder_ts[update_id]
-                )
+                logger.debug('id %s has never been run; running it now', update_id)
+                self.update(update_id, self.now, self.last_reminder_ts[update_id])
                 self.last_reminder_ts[update_id] = self.now
             else:
                 delta = self.now - last_run
                 if delta.total_seconds() >= refresh_secs:  # Is overdue?
-                    logger.debug(f'id {update_id} is overdue; running it now')
+                    logger.debug('id %s is overdue; running it now', update_id)
                     self.update(
                         update_id,
                         self.now,
@@ -109,7 +113,7 @@ class AutomaticStateTracker(StateTracker):
     """
 
     @background_thread
-    def pace_maker(self, should_terminate) -> None:
+    def pace_maker(self, should_terminate: threading.Event) -> None:
         """Entry point for a background thread to own calling heartbeat()
         at regular intervals so that the main thread doesn't need to do
         so.
@@ -120,7 +124,7 @@ class AutomaticStateTracker(StateTracker):
                 logger.debug('pace_maker noticed event; shutting down')
                 return
             self.heartbeat()
-            logger.debug(f'pace_maker is sleeping for {self.sleep_delay}s')
+            logger.debug('pace_maker is sleeping for %.1fs', self.sleep_delay)
             time.sleep(self.sleep_delay)
 
     def __init__(
@@ -133,12 +137,12 @@ class AutomaticStateTracker(StateTracker):
 
         super().__init__(update_ids_to_update_secs)
         if override_sleep_delay is not None:
-            logger.debug(f'Overriding sleep delay to {override_sleep_delay}')
+            logger.debug('Overriding sleep delay to %.1f', override_sleep_delay)
             self.sleep_delay = override_sleep_delay
         else:
             periods_list = list(update_ids_to_update_secs.values())
             self.sleep_delay = math_utils.gcd_float_sequence(periods_list)
-            logger.info(f'Computed sleep_delay={self.sleep_delay}')
+            logger.info('Computed sleep_delay=%.1f', self.sleep_delay)
         (thread, stop_event) = self.pace_maker()
         self.should_terminate = stop_event
         self.updater_thread = thread
@@ -148,9 +152,7 @@ class AutomaticStateTracker(StateTracker):
         This may block for as long as self.sleep_delay.
 
         """
-        logger.debug(
-            'Setting shutdown event and waiting for background thread.'
-        )
+        logger.debug('Setting shutdown event and waiting for background thread.')
         self.should_terminate.set()
         self.updater_thread.join()
         logger.debug('Background thread terminated.')
@@ -185,9 +187,7 @@ class WaitableAutomaticStateTracker(AutomaticStateTracker):
         override_sleep_delay: Optional[float] = None,
     ) -> None:
         self._something_changed = threading.Event()
-        super().__init__(
-            update_ids_to_update_secs, override_sleep_delay=override_sleep_delay
-        )
+        super().__init__(update_ids_to_update_secs, override_sleep_delay=override_sleep_delay)
 
     def something_changed(self):
         self._something_changed.set()