#!/usr/bin/env python3
-from abc import ABC, abstractmethod
+"""Several helpers to keep track of internal state via periodic
+polling. StateTracker expects to be invoked periodically to maintain
+state whereas the others automatically update themselves and,
+optionally, expose an event for client code to wait on state changes."""
+
import datetime
import logging
+import threading
import time
+from abc import ABC, abstractmethod
from typing import Dict, Optional
import pytz
from thread_utils import background_thread
-import math_utils
logger = logging.getLogger(__name__)
invoked via the heartbeat() method. This method, in turn, invokes
update() with update_ids according to a schedule / periodicity
provided to the c'tor.
+
"""
def __init__(self, update_ids_to_update_secs: Dict[str, float]) -> None:
Note that, when more than one update is overdue, they will be
invoked in order by their update_ids so care in choosing these
identifiers may be in order.
+
"""
self.update_ids_to_update_secs = update_ids_to_update_secs
self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {}
+ self.now: Optional[datetime.datetime] = None
for x in update_ids_to_update_secs.keys():
self.last_reminder_ts[x] = None
The now param is the approximate current timestamp and the
last_invocation param is the last time you were invoked (or
None on the first invocation)
+
"""
pass
Setting force_all_updates_to_run will invoke all updates
(ordered by update_id) immediately ignoring whether or not
they are due.
+
"""
self.now = datetime.datetime.now(tz=pytz.timezone("US/Pacific"))
for update_id in sorted(self.last_reminder_ts.keys()):
- refresh_secs = self.update_ids_to_update_secs[update_id]
if force_all_updates_to_run:
logger.debug('Forcing all updates to run')
- self.update(
- update_id, self.now, self.last_reminder_ts[update_id]
- )
+ self.update(update_id, self.now, self.last_reminder_ts[update_id])
+ self.last_reminder_ts[update_id] = self.now
+ return
+
+ refresh_secs = self.update_ids_to_update_secs[update_id]
+ last_run = self.last_reminder_ts[update_id]
+ if last_run is None: # Never run before
+ logger.debug('id %s has never been run; running it now', update_id)
+ self.update(update_id, self.now, self.last_reminder_ts[update_id])
self.last_reminder_ts[update_id] = self.now
else:
- last_run = self.last_reminder_ts[update_id]
- if last_run is None: # Never run before
- logger.debug(
- f'id {update_id} has never been run; running it now'
- )
+ delta = self.now - last_run
+ if delta.total_seconds() >= refresh_secs: # Is overdue?
+ logger.debug('id %s is overdue; running it now', update_id)
self.update(
- update_id, self.now, self.last_reminder_ts[update_id]
+ update_id,
+ self.now,
+ self.last_reminder_ts[update_id],
)
self.last_reminder_ts[update_id] = self.now
- else:
- delta = self.now - last_run
- if delta.total_seconds() >= refresh_secs: # Is overdue
- logger.debug('id {update_id} is overdue; running it now')
- self.update(
- update_id,
- self.now,
- self.last_reminder_ts[update_id],
- )
- self.last_reminder_ts[update_id] = self.now
class AutomaticStateTracker(StateTracker):
"""Just like HeartbeatCurrentState but you don't need to pump the
heartbeat; it runs on a background thread. Call .shutdown() to
terminate the updates.
+
"""
@background_thread
- def pace_maker(self, should_terminate) -> None:
+ def pace_maker(self, should_terminate: threading.Event) -> None:
"""Entry point for a background thread to own calling heartbeat()
at regular intervals so that the main thread doesn't need to do
- so."""
+ so.
+
+ """
while True:
if should_terminate.is_set():
logger.debug('pace_maker noticed event; shutting down')
return
self.heartbeat()
- logger.debug(f'page_maker is sleeping for {self.sleep_delay}s')
+ logger.debug('pace_maker is sleeping for %.1fs', self.sleep_delay)
time.sleep(self.sleep_delay)
def __init__(
*,
override_sleep_delay: Optional[float] = None,
) -> None:
+ import math_utils
+
super().__init__(update_ids_to_update_secs)
if override_sleep_delay is not None:
- logger.debug(f'Overriding sleep delay to {override_sleep_delay}')
+ logger.debug('Overriding sleep delay to %.1f', override_sleep_delay)
self.sleep_delay = override_sleep_delay
else:
periods_list = list(update_ids_to_update_secs.values())
self.sleep_delay = math_utils.gcd_float_sequence(periods_list)
- logger.info(f'Computed sleep_delay={self.sleep_delay}')
+ logger.info('Computed sleep_delay=%.1f', self.sleep_delay)
(thread, stop_event) = self.pace_maker()
self.should_terminate = stop_event
self.updater_thread = thread
def shutdown(self):
"""Terminates the background thread and waits for it to tear down.
This may block for as long as self.sleep_delay.
+
"""
- logger.debug(
- 'Setting shutdown event and waiting for background thread.'
- )
+ logger.debug('Setting shutdown event and waiting for background thread.')
self.should_terminate.set()
self.updater_thread.join()
logger.debug('Background thread terminated.')
+
+
+class WaitableAutomaticStateTracker(AutomaticStateTracker):
+ """This is an AutomaticStateTracker that exposes a wait method which
+ will block the calling thread until the state changes with an
+ optional timeout. The caller should check the return value of
+ wait; it will be true if something changed and false if the wait
+ simply timed out. If the return value is true, the instance
+ should be reset() before wait is called again.
+
+ Example usage:
+
+ detector = waitable_presence.WaitableAutomaticStateSubclass()
+ while True:
+ changed = detector.wait(timeout=60 * 5)
+ if changed:
+ detector.reset()
+ # Figure out what changed and react
+ else:
+ # Just a timeout; no need to reset. Maybe do something
+ # else before looping up into wait again.
+
+ """
+
+ def __init__(
+ self,
+ update_ids_to_update_secs: Dict[str, float],
+ *,
+ override_sleep_delay: Optional[float] = None,
+ ) -> None:
+ self._something_changed = threading.Event()
+ super().__init__(update_ids_to_update_secs, override_sleep_delay=override_sleep_delay)
+
+ def something_changed(self):
+ self._something_changed.set()
+
+ def did_something_change(self) -> bool:
+ return self._something_changed.is_set()
+
+ def reset(self):
+ self._something_changed.clear()
+
+ def wait(self, *, timeout=None):
+ return self._something_changed.wait(timeout=timeout)