3 from abc import ABC, abstractmethod
7 from typing import Dict, Optional
12 from thread_utils import background_thread
14 logger = logging.getLogger(__name__)
class StateTracker(ABC):
    """A base class that maintains and updates a global state via an
    update routine.  Instances of this class should be periodically
    invoked via the heartbeat() method.  This method, in turn, invokes
    update() with update_ids according to a schedule / periodicity
    provided to the c'tor.
    """

    def __init__(self, update_ids_to_update_secs: Dict[str, float]) -> None:
        """The update_ids_to_update_secs dict parameter describes one or more
        update types (unique update_ids) and the periodicity(ies), in
        seconds, at which it/they should be invoked.

        Note that, when more than one update is overdue, they will be
        invoked in order by their update_ids so care in choosing these
        identifiers may be in order.
        """
        self.update_ids_to_update_secs = update_ids_to_update_secs
        # Timestamp of the most recent invocation per update_id; None
        # marks an update_id that has never been invoked.
        self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {
            update_id: None for update_id in update_ids_to_update_secs
        }

    @abstractmethod
    def update(
        self,
        update_id: str,
        now: datetime.datetime,
        last_invocation: Optional[datetime.datetime],
    ) -> None:
        """Put whatever you want here.  The update_id will be the string
        passed to the c'tor as a key in the Dict.  It will only be
        tapped on the shoulder, at most, every update_secs seconds.
        The now param is the approximate current timestamp and the
        last_invocation param is the last time you were invoked (or
        None on the first invocation).
        """

    def heartbeat(self, *, force_all_updates_to_run: bool = False) -> None:
        """Invoke this method to cause the StateTracker instance to identify
        and invoke any overdue updates based on the schedule passed to
        the c'tor.  In the base StateTracker class, this method must
        be invoked manually with a thread from external code.

        If more than one type of update (update_id) are overdue,
        they will be invoked in order based on their update_ids.

        Setting force_all_updates_to_run will invoke all updates
        (ordered by update_id) immediately ignoring whether or not
        they are due.
        """
        # One timestamp is shared by every update invoked during this
        # heartbeat; it is also left on the instance as self.now.
        self.now = datetime.datetime.now(tz=pytz.timezone("US/Pacific"))
        for update_id in sorted(self.last_reminder_ts.keys()):
            last_run = self.last_reminder_ts[update_id]
            if force_all_updates_to_run:
                logger.debug('Forcing all updates to run')
            elif last_run is None:  # Never run before
                logger.debug(
                    f'id {update_id} has never been run; running it now'
                )
            else:
                refresh_secs = self.update_ids_to_update_secs[update_id]
                delta = self.now - last_run
                if delta.total_seconds() < refresh_secs:
                    continue  # Not due yet; leave its timestamp alone.
                logger.debug(f'id {update_id} is overdue; running it now')

            # Single call site for all three "run it" cases above.  The
            # timestamp is recorded only after update() returns, so an
            # update that raises is still considered due next time.
            self.update(update_id, self.now, last_run)
            self.last_reminder_ts[update_id] = self.now
class AutomaticStateTracker(StateTracker):
    """Just like StateTracker but you don't need to pump the heartbeat;
    it runs on a background thread.  Call .shutdown() to terminate the
    updates.
    """

    @background_thread
    def pace_maker(self, should_terminate) -> None:
        """Entry point for a background thread to own calling heartbeat()
        at regular intervals so that the main thread doesn't need to do
        so.

        The should_terminate argument is checked (via is_set()) before
        every heartbeat; once it is set the loop exits.  Between
        heartbeats the thread sleeps for self.sleep_delay seconds.
        """
        while True:
            if should_terminate.is_set():
                logger.debug('pace_maker noticed event; shutting down')
                return
            self.heartbeat()
            logger.debug(f'pace_maker is sleeping for {self.sleep_delay}s')
            time.sleep(self.sleep_delay)

    def __init__(
        self,
        update_ids_to_update_secs: Dict[str, float],
        *,
        override_sleep_delay: Optional[float] = None,
    ) -> None:
        """Set up the schedule (see StateTracker) and start the
        background thread that pumps heartbeat().

        Unless override_sleep_delay is given, the delay between
        heartbeats is computed from the update periods by
        math_utils.gcd_float_sequence.
        """
        super().__init__(update_ids_to_update_secs)
        if override_sleep_delay is not None:
            logger.debug(f'Overriding sleep delay to {override_sleep_delay}')
            self.sleep_delay = override_sleep_delay
        else:
            periods_list = list(update_ids_to_update_secs.values())
            self.sleep_delay = math_utils.gcd_float_sequence(periods_list)
            logger.info(f'Computed sleep_delay={self.sleep_delay}')
        # sleep_delay must be assigned before the thread starts because
        # pace_maker reads it.  Calling pace_maker() with no arguments
        # yields the (thread, stop event) pair unpacked here.
        (thread, stop_event) = self.pace_maker()
        self.should_terminate = stop_event
        self.updater_thread = thread

    def shutdown(self) -> None:
        """Terminates the background thread and waits for it to tear down.
        This may block for as long as self.sleep_delay.
        """
        logger.debug(
            'Setting shutdown event and waiting for background thread.'
        )
        self.should_terminate.set()
        self.updater_thread.join()
        logger.debug('Background thread terminated.')