3 # © Copyright 2021-2022, Scott Gasch
5 """Several helpers to keep track of internal state via periodic
6 polling. StateTracker expects to be invoked periodically to maintain
7 state whereas the others automatically update themselves and,
8 optionally, expose an event for client code to wait on state changes.
15 from abc import ABC, abstractmethod
16 from typing import Dict, Optional
20 from thread_utils import background_thread
22 logger = logging.getLogger(__name__)
class StateTracker(ABC):
    """A base class that maintains and updates a global state via an
    update routine.  Instances of this class should be periodically
    invoked via the heartbeat() method.  This method, in turn, invokes
    update() with update_ids according to a schedule / periodicity
    provided to the c'tor.
    """

    def __init__(self, update_ids_to_update_secs: Dict[str, float]) -> None:
        """The update_ids_to_update_secs dict parameter describes one or more
        update types (unique update_ids) and the periodicity(ies), in
        seconds, at which it/they should be invoked.

        Note that, when more than one update is overdue, they will be
        invoked in order by their update_ids so care in choosing these
        identifiers may be in order.
        """
        self.update_ids_to_update_secs = update_ids_to_update_secs
        # Timestamp of the most recent update() call for each update_id;
        # None means that update has never been invoked.
        self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = dict.fromkeys(
            update_ids_to_update_secs
        )
        # Timestamp captured at the start of the most recent heartbeat();
        # None until heartbeat() has been called at least once.
        self.now: Optional[datetime.datetime] = None

    # NOTE(review): the decorator and the first lines of this signature were
    # not visible in the reviewed text; they are reconstructed from the
    # `abstractmethod` import, the docstring below and the way heartbeat()
    # calls update().  Confirm against the original file.
    @abstractmethod
    def update(
        self,
        update_id: str,
        now: datetime.datetime,
        last_invocation: Optional[datetime.datetime],
    ) -> None:
        """Put whatever you want here.  The update_id will be the string
        passed to the c'tor as a key in the Dict.  It will only be
        tapped on the shoulder, at most, every update_secs seconds.
        The now param is the approximate current timestamp and the
        last_invocation param is the last time you were invoked (or
        None on the first invocation)
        """

    def heartbeat(self, *, force_all_updates_to_run: bool = False) -> None:
        """Invoke this method to cause the StateTracker instance to identify
        and invoke any overdue updates based on the schedule passed to
        the c'tor.  In the base StateTracker class, this method must
        be invoked manually with a thread from external code.

        If more than one type of update (update_id) are overdue,
        they will be invoked in order based on their update_ids.

        Setting force_all_updates_to_run will invoke all updates
        (ordered by update_id) immediately ignoring whether or not
        they are due.
        """
        # One timestamp is shared by every update run during this heartbeat.
        self.now = datetime.datetime.now(tz=pytz.timezone("US/Pacific"))
        for update_id in sorted(self.last_reminder_ts):
            last_run = self.last_reminder_ts[update_id]

            # Decide whether this update runs now.  Exactly one branch is
            # taken per update_id, so a forced run can neither stop the loop
            # early nor fall through into the schedule check.
            if force_all_updates_to_run:
                logger.debug('Forcing all updates to run')
            elif last_run is None:  # Never run before
                logger.debug('id %s has never been run; running it now', update_id)
            elif (self.now - last_run).total_seconds() >= self.update_ids_to_update_secs[
                update_id
            ]:  # Is overdue?
                logger.debug('id %s is overdue; running it now', update_id)
            else:
                continue  # Not due yet.

            self.update(update_id, self.now, last_run)
            self.last_reminder_ts[update_id] = self.now
class AutomaticStateTracker(StateTracker):
    """Just like StateTracker but you don't need to pump the
    heartbeat; it runs on a background thread.  Call .shutdown() to
    terminate the updates.
    """

    # NOTE(review): the decorator line was not visible in the reviewed text.
    # It is reconstructed from the `background_thread` import and from
    # __init__, which calls self.pace_maker() with no arguments and unpacks
    # a (thread, event) pair from the result.  Confirm against the original.
    @background_thread
    def pace_maker(self, should_terminate: threading.Event) -> None:
        """Entry point for a background thread to own calling heartbeat()
        at regular intervals so that the main thread doesn't need to do
        so.

        Runs until should_terminate is set.
        """
        while not should_terminate.is_set():
            self.heartbeat()
            logger.debug('pace_maker is sleeping for %.1fs', self.sleep_delay)
            # Sleep on the event rather than with time.sleep() so that a
            # shutdown request wakes this thread immediately instead of
            # after the remainder of sleep_delay.
            should_terminate.wait(timeout=self.sleep_delay)
        logger.debug('pace_maker noticed event; shutting down')

    def __init__(
        self,
        update_ids_to_update_secs: Dict[str, float],
        *,
        override_sleep_delay: Optional[float] = None,
    ) -> None:
        """Set up the update schedule and start the background thread.

        update_ids_to_update_secs is passed through to the StateTracker
        c'tor.  override_sleep_delay, when given, is the number of seconds
        the background thread sleeps between heartbeats; otherwise the
        delay is derived from the update periods.
        """
        # Imported here so the class does not rely on a module-level import
        # that was not visible in the reviewed text; harmless if one exists.
        import math_utils

        super().__init__(update_ids_to_update_secs)
        if override_sleep_delay is not None:
            logger.debug('Overriding sleep delay to %.1f', override_sleep_delay)
            self.sleep_delay = override_sleep_delay
        else:
            # presumably the greatest common divisor of the periods, so that
            # every update period is a whole number of sleeps -- confirm in
            # math_utils.
            periods_list = list(update_ids_to_update_secs.values())
            self.sleep_delay = math_utils.gcd_float_sequence(periods_list)
            logger.info('Computed sleep_delay=%.1f', self.sleep_delay)
        # sleep_delay must be set before this call: the thread starts running
        # pace_maker, which reads it.
        (thread, stop_event) = self.pace_maker()
        self.should_terminate = stop_event
        self.updater_thread = thread

    def shutdown(self) -> None:
        """Terminates the background thread and waits for it to tear down.
        This blocks until the thread exits, which can take as long as a
        heartbeat() that is already in progress.
        """
        logger.debug('Setting shutdown event and waiting for background thread.')
        self.should_terminate.set()
        self.updater_thread.join()
        logger.debug('Background thread terminated.')
class WaitableAutomaticStateTracker(AutomaticStateTracker):
    """This is an AutomaticStateTracker that exposes a wait method which
    will block the calling thread until the state changes with an
    optional timeout.  The caller should check the return value of
    wait; it will be true if something changed and false if the wait
    simply timed out.  If the return value is true, the instance
    should be reset() before wait is called again.

    Example usage:

        detector = waitable_presence.WaitableAutomaticStateSubclass()
        while True:
            changed = detector.wait(timeout=60 * 5)
            if changed:
                detector.reset()
                # Figure out what changed and react
            else:
                # Just a timeout; no need to reset.  Maybe do something
                # else before looping up into wait again.
                pass
    """

    def __init__(
        self,
        update_ids_to_update_secs: Dict[str, float],
        *,
        override_sleep_delay: Optional[float] = None,
    ) -> None:
        """Create the change event, then hand both arguments unchanged to
        the AutomaticStateTracker c'tor.
        """
        # Created before the parent c'tor runs so the attribute already
        # exists by the time any code triggered from the parent's
        # initialization calls something_changed().
        self._something_changed = threading.Event()
        super().__init__(update_ids_to_update_secs, override_sleep_delay=override_sleep_delay)

    def something_changed(self) -> None:
        """Signal that the state changed; wakes any thread blocked in wait()."""
        self._something_changed.set()

    def did_something_change(self) -> bool:
        """Return True if a change has been signalled and not yet reset()."""
        return self._something_changed.is_set()

    def reset(self) -> None:
        """Clear the change signal so that the next wait() blocks again."""
        self._something_changed.clear()

    def wait(self, *, timeout: Optional[float] = None) -> bool:
        """Block until a change is signalled or timeout seconds elapse.

        With timeout=None this blocks indefinitely.  Returns True if a
        change was signalled and False if the wait timed out.
        """
        return self._something_changed.wait(timeout=timeout)