#!/usr/bin/env python3 from abc import ABC, abstractmethod import datetime import logging import time from typing import Dict, Optional import pytz import math_utils from thread_utils import background_thread logger = logging.getLogger(__name__) class StateTracker(ABC): """A base class that maintains and updates a global state via an update routine. Instances of this class should be periodically invoked via the heartbeat() method. This method, in turn, invokes update() with update_ids according to a schedule / periodicity provided to the c'tor. """ def __init__(self, update_ids_to_update_secs: Dict[str, float]) -> None: """The update_ids_to_update_secs dict parameter describes one or more update types (unique update_ids) and the periodicity(ies), in seconds, at which it/they should be invoked. Note that, when more than one update is overdue, they will be invoked in order by their update_ids so care in choosing these identifiers may be in order. """ self.update_ids_to_update_secs = update_ids_to_update_secs self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {} for x in update_ids_to_update_secs.keys(): self.last_reminder_ts[x] = None @abstractmethod def update( self, update_id: str, now: datetime.datetime, last_invocation: Optional[datetime.datetime], ) -> None: """Put whatever you want here. The update_id will be the string passed to the c'tor as a key in the Dict. It will only be tapped on the shoulder, at most, every update_secs seconds. The now param is the approximate current timestamp and the last_invocation param is the last time you were invoked (or None on the first invocation) """ pass def heartbeat(self, *, force_all_updates_to_run: bool = False) -> None: """Invoke this method to cause the StateTracker instance to identify and invoke any overdue updates based on the schedule passed to the c'tor. In the base StateTracker class, this method must be invoked manually with a thread from external code. If more than one type of update (update_id) are overdue, they will be invoked in order based on their update_ids. Setting force_all_updates_to_run will invoke all updates (ordered by update_id) immediately ignoring whether or not they are due. """ self.now = datetime.datetime.now(tz=pytz.timezone("US/Pacific")) for update_id in sorted(self.last_reminder_ts.keys()): if force_all_updates_to_run: logger.debug('Forcing all updates to run') self.update( update_id, self.now, self.last_reminder_ts[update_id] ) self.last_reminder_ts[update_id] = self.now return refresh_secs = self.update_ids_to_update_secs[update_id] last_run = self.last_reminder_ts[update_id] if last_run is None: # Never run before logger.debug( f'id {update_id} has never been run; running it now' ) self.update( update_id, self.now, self.last_reminder_ts[update_id] ) self.last_reminder_ts[update_id] = self.now else: delta = self.now - last_run if delta.total_seconds() >= refresh_secs: # Is overdue? logger.debug(f'id {update_id} is overdue; running it now') self.update( update_id, self.now, self.last_reminder_ts[update_id], ) self.last_reminder_ts[update_id] = self.now class AutomaticStateTracker(StateTracker): """Just like HeartbeatCurrentState but you don't need to pump the heartbeat; it runs on a background thread. Call .shutdown() to terminate the updates. """ @background_thread def pace_maker(self, should_terminate) -> None: """Entry point for a background thread to own calling heartbeat() at regular intervals so that the main thread doesn't need to do so.""" while True: if should_terminate.is_set(): logger.debug('pace_maker noticed event; shutting down') return self.heartbeat() logger.debug(f'page_maker is sleeping for {self.sleep_delay}s') time.sleep(self.sleep_delay) def __init__( self, update_ids_to_update_secs: Dict[str, float], *, override_sleep_delay: Optional[float] = None, ) -> None: super().__init__(update_ids_to_update_secs) if override_sleep_delay is not None: logger.debug(f'Overriding sleep delay to {override_sleep_delay}') self.sleep_delay = override_sleep_delay else: periods_list = list(update_ids_to_update_secs.values()) self.sleep_delay = math_utils.gcd_float_sequence(periods_list) logger.info(f'Computed sleep_delay={self.sleep_delay}') (thread, stop_event) = self.pace_maker() self.should_terminate = stop_event self.updater_thread = thread def shutdown(self): """Terminates the background thread and waits for it to tear down. This may block for as long as self.sleep_delay. """ logger.debug( 'Setting shutdown event and waiting for background thread.' ) self.should_terminate.set() self.updater_thread.join() logger.debug('Background thread terminated.')