import datetime
import logging
import time
+import threading
from typing import Dict, Optional
import pytz
invoked via the heartbeat() method. This method, in turn, invokes
update() with update_ids according to a schedule / periodicity
provided to the c'tor.
- """
+ """
def __init__(self, update_ids_to_update_secs: Dict[str, float]) -> None:
"""The update_ids_to_update_secs dict parameter describes one or more
update types (unique update_ids) and the periodicity(ies), in
Note that, when more than one update is overdue, they will be
invoked in order by their update_ids so care in choosing these
identifiers may be in order.
+
"""
self.update_ids_to_update_secs = update_ids_to_update_secs
self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {}
The now param is the approximate current timestamp and the
last_invocation param is the last time you were invoked (or
None on the first invocation)
+
"""
pass
Setting force_all_updates_to_run will invoke all updates
(ordered by update_id) immediately ignoring whether or not
they are due.
+
"""
self.now = datetime.datetime.now(tz=pytz.timezone("US/Pacific"))
for update_id in sorted(self.last_reminder_ts.keys()):
"""Just like HeartbeatCurrentState but you don't need to pump the
heartbeat; it runs on a background thread. Call .shutdown() to
terminate the updates.
- """
+ """
@background_thread
def pace_maker(self, should_terminate) -> None:
"""Entry point for a background thread to own calling heartbeat()
at regular intervals so that the main thread doesn't need to do
- so."""
+ so.
+
+ """
while True:
if should_terminate.is_set():
logger.debug('pace_maker noticed event; shutting down')
return
self.heartbeat()
- logger.debug(f'page_maker is sleeping for {self.sleep_delay}s')
+ logger.debug(f'pace_maker is sleeping for {self.sleep_delay}s')
time.sleep(self.sleep_delay)
def __init__(
def shutdown(self):
"""Terminates the background thread and waits for it to tear down.
This may block for as long as self.sleep_delay.
+
"""
logger.debug(
'Setting shutdown event and waiting for background thread.'
self.should_terminate.set()
self.updater_thread.join()
logger.debug('Background thread terminated.')
+
+
+class WaitableAutomaticStateTracker(AutomaticStateTracker):
+ """This is an AutomaticStateTracker that exposes a wait method which
+ will block the calling thread until the state changes with an
+ optional timeout. The caller should check the return value of
+ wait; it will be true if something changed and false if the wait
+ simply timed out. If the return value is true, the instance
+ should be reset() before wait is called again.
+
+ Example usage:
+
+ detector = waitable_presence.WaitableAutomaticStateSubclass()
+ while True:
+ changed = detector.wait(timeout=60 * 5)
+ if changed:
+ detector.reset()
+ # Figure out what changed and react
+ else:
+ # Just a timeout; no need to reset. Maybe do something
+ # else before looping up into wait again.
+
+ """
+ def __init__(
+ self,
+ update_ids_to_update_secs: Dict[str, float],
+ *,
+ override_sleep_delay: Optional[float] = None,
+ ) -> None:
+ self._something_changed = threading.Event()
+ super().__init__(update_ids_to_update_secs,
+ override_sleep_delay=override_sleep_delay)
+
+ def something_changed(self):
+ self._something_changed.set()
+
+ def did_something_change(self) -> bool:
+ return self._something_changed.is_set()
+
+ def reset(self):
+ self._something_changed.clear()
+
+ def wait(self,
+ *,
+ timeout=None):
+ return self._something_changed.wait(
+ timeout=timeout
+ )