3 # © Copyright 2021-2022, Scott Gasch
5 """Several helpers to keep track of internal state via periodic
6 polling. StateTracker expects to be invoked periodically to maintain
7 state whereas the others automatically update themselves and,
8 optionally, expose an event for client code to wait on state changes.
16 from abc import ABC, abstractmethod
17 from typing import Dict, Optional
21 from thread_utils import background_thread
23 logger = logging.getLogger(__name__)
26 class StateTracker(ABC):
27 """A base class that maintains and updates a global state via an
28 update routine. Instances of this class should be periodically
29 invoked via the heartbeat() method. This method, in turn, invokes
30 update() with update_ids according to a schedule / periodicity
31 provided to the c'tor.
35 def __init__(self, update_ids_to_update_secs: Dict[str, float]) -> None:
36 """The update_ids_to_update_secs dict parameter describes one or more
37 update types (unique update_ids) and the periodicity(ies), in
38 seconds, at which it/they should be invoked.
40 Note that, when more than one update is overdue, they will be
41 invoked in order by their update_ids so care in choosing these
42 identifiers may be in order.
45 self.update_ids_to_update_secs = update_ids_to_update_secs
46 self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {}
47 self.now: Optional[datetime.datetime] = None
48 for x in update_ids_to_update_secs.keys():
49 self.last_reminder_ts[x] = None
55 now: datetime.datetime,
56 last_invocation: Optional[datetime.datetime],
58 """Put whatever you want here. The update_id will be the string
59 passed to the c'tor as a key in the Dict. It will only be
60 tapped on the shoulder, at most, every update_secs seconds.
61 The now param is the approximate current timestamp and the
62 last_invocation param is the last time you were invoked (or
63 None on the first invocation)
68 def heartbeat(self, *, force_all_updates_to_run: bool = False) -> None:
69 """Invoke this method to cause the StateTracker instance to identify
70 and invoke any overdue updates based on the schedule passed to
71 the c'tor. In the base StateTracker class, this method must
72 be invoked manually with a thread from external code.
74 If more than one type of update (update_id) are overdue,
75 they will be invoked in order based on their update_ids.
77 Setting force_all_updates_to_run will invoke all updates
78 (ordered by update_id) immediately ignoring whether or not
82 self.now = datetime.datetime.now(tz=pytz.timezone("US/Pacific"))
83 for update_id in sorted(self.last_reminder_ts.keys()):
84 if force_all_updates_to_run:
85 logger.debug('Forcing all updates to run')
86 self.update(update_id, self.now, self.last_reminder_ts[update_id])
87 self.last_reminder_ts[update_id] = self.now
90 refresh_secs = self.update_ids_to_update_secs[update_id]
91 last_run = self.last_reminder_ts[update_id]
92 if last_run is None: # Never run before
93 logger.debug('id %s has never been run; running it now', update_id)
94 self.update(update_id, self.now, self.last_reminder_ts[update_id])
95 self.last_reminder_ts[update_id] = self.now
97 delta = self.now - last_run
98 if delta.total_seconds() >= refresh_secs: # Is overdue?
99 logger.debug('id %s is overdue; running it now', update_id)
103 self.last_reminder_ts[update_id],
105 self.last_reminder_ts[update_id] = self.now
108 class AutomaticStateTracker(StateTracker):
109 """Just like HeartbeatCurrentState but you don't need to pump the
110 heartbeat; it runs on a background thread. Call .shutdown() to
111 terminate the updates.
116 def pace_maker(self, should_terminate: threading.Event) -> None:
117 """Entry point for a background thread to own calling heartbeat()
118 at regular intervals so that the main thread doesn't need to do
123 if should_terminate.is_set():
124 logger.debug('pace_maker noticed event; shutting down')
127 logger.debug('pace_maker is sleeping for %.1fs', self.sleep_delay)
128 time.sleep(self.sleep_delay)
132 update_ids_to_update_secs: Dict[str, float],
134 override_sleep_delay: Optional[float] = None,
138 super().__init__(update_ids_to_update_secs)
139 if override_sleep_delay is not None:
140 logger.debug('Overriding sleep delay to %.1f', override_sleep_delay)
141 self.sleep_delay = override_sleep_delay
143 periods_list = list(update_ids_to_update_secs.values())
144 self.sleep_delay = math_utils.gcd_float_sequence(periods_list)
145 logger.info('Computed sleep_delay=%.1f', self.sleep_delay)
146 (thread, stop_event) = self.pace_maker()
147 self.should_terminate = stop_event
148 self.updater_thread = thread
151 """Terminates the background thread and waits for it to tear down.
152 This may block for as long as self.sleep_delay.
155 logger.debug('Setting shutdown event and waiting for background thread.')
156 self.should_terminate.set()
157 self.updater_thread.join()
158 logger.debug('Background thread terminated.')
161 class WaitableAutomaticStateTracker(AutomaticStateTracker):
162 """This is an AutomaticStateTracker that exposes a wait method which
163 will block the calling thread until the state changes with an
164 optional timeout. The caller should check the return value of
165 wait; it will be true if something changed and false if the wait
166 simply timed out. If the return value is true, the instance
167 should be reset() before wait is called again.
171 detector = waitable_presence.WaitableAutomaticStateSubclass()
173 changed = detector.wait(timeout=60 * 5)
176 # Figure out what changed and react
178 # Just a timeout; no need to reset. Maybe do something
179 # else before looping up into wait again.
185 update_ids_to_update_secs: Dict[str, float],
187 override_sleep_delay: Optional[float] = None,
189 self._something_changed = threading.Event()
190 super().__init__(update_ids_to_update_secs, override_sleep_delay=override_sleep_delay)
192 def something_changed(self):
193 self._something_changed.set()
195 def did_something_change(self) -> bool:
196 return self._something_changed.is_set()
199 self._something_changed.clear()
201 def wait(self, *, timeout=None):
202 return self._something_changed.wait(timeout=timeout)