3 # © Copyright 2021-2022, Scott Gasch
5 """This module defines several classes (:py:class:`StateTracker`,
6 :py:class:`AutomaticStateTracker`, and
7 :py:class:`WaitableAutomaticStateTracker`) that can be used as base
8 classes by your code. These class patterns are meant to encapsulate
9 and represent some state that dynamically changes and must be updated
10 periodically. These classes update their state (either automatically
11 or when invoked to poll) and allow their callers to wait on state
19 from abc import ABC, abstractmethod
20 from typing import Dict, Optional
24 from pyutils.parallelize.thread_utils import background_thread
26 logger = logging.getLogger(__name__)
29 class StateTracker(ABC):
30 """A base class that maintains and updates its state via an update
31 routine called :meth:`heartbeat`. This method is not automatic:
32 instances of this class should be periodically invoked via their
33 :meth:`heartbeat` method by some other thread.
35 See also :class:`AutomaticStateTracker` if you'd rather not have
36 to invoke your code regularly.
39 def __init__(self, update_ids_to_update_secs: Dict[str, float]) -> None:
40 """The update_ids_to_update_secs dict parameter describes one
41 or more update types (unique update_ids) and the
42 periodicity(ies), in seconds, at which it/they should be
46 When more than one update is overdue, they will be
47 invoked in order by their update_ids so care in choosing these
48 identifiers may be in order.
51 update_ids_to_update_secs: a dict mapping a user-defined
52 update_id into a period (number of seconds) with which
53 we would like this update performed. e.g.::
55 update_ids_to_update_secs = {
56 'refresh_local_state': 10.0,
57 'refresh_remote_state': 60.0,
60 This would indicate that every 10s we would like to
61 refresh local state whereas every 60s we'd like to
65 self.update_ids_to_update_secs = update_ids_to_update_secs
66 self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {}
67 self.now: Optional[datetime.datetime] = None
68 for x in update_ids_to_update_secs.keys():
69 self.last_reminder_ts[x] = None
75 now: datetime.datetime,
76 last_invocation: Optional[datetime.datetime],
78 """Put whatever you want here to perform your state updates.
81 update_id: the string you passed to the c'tor as a key in
82 the update_ids_to_update_secs dict. :meth:`update` will
83 only be invoked, at most, every update_secs
86 now: the approximate current timestamp at invocation time.
88 last_invocation: the last time this operation was invoked
89 (or None on the first invocation).
93 def heartbeat(self, *, force_all_updates_to_run: bool = False) -> None:
94 """Invoke this method periodically to cause the :class:`StateTracker`
95 instance to identify and invoke any overdue updates based on the
96 schedule passed to the c'tor. In the base :class:`StateTracker` class,
97 this method must be invoked manually by a thread from external code.
98 Other subclasses (e.g. :class:`AutomaticStateTracker`) are available
99 that create their own updater threads (see below).
101 If more than one type of update (`update_id`) is overdue,
102 overdue updates will be invoked in order based on their `update_id`.
104 Setting `force_all_updates_to_run` will invoke all updates
105 (ordered by `update_id`) immediately ignoring whether or not
109 self.now = datetime.datetime.now(tz=pytz.timezone("US/Pacific"))
110 for update_id in sorted(self.last_reminder_ts.keys()):
111 if force_all_updates_to_run:
112 logger.debug('Forcing all updates to run')
113 self.update(update_id, self.now, self.last_reminder_ts[update_id])
114 self.last_reminder_ts[update_id] = self.now
117 refresh_secs = self.update_ids_to_update_secs[update_id]
118 last_run = self.last_reminder_ts[update_id]
119 if last_run is None: # Never run before
120 logger.debug('id %s has never been run; running it now', update_id)
121 self.update(update_id, self.now, self.last_reminder_ts[update_id])
122 self.last_reminder_ts[update_id] = self.now
124 delta = self.now - last_run
125 if delta.total_seconds() >= refresh_secs: # Is overdue?
126 logger.debug('id %s is overdue; running it now', update_id)
130 self.last_reminder_ts[update_id],
132 self.last_reminder_ts[update_id] = self.now
135 class AutomaticStateTracker(StateTracker):
136 """Just like :class:`StateTracker` but you don't need to pump the
137 :meth:`heartbeat` method periodically because we create a background
138 thread that manages periodic calling. You must call :meth:`shutdown`,
139 though, in order to terminate the update thread.
143 def _pace_maker(self, should_terminate: threading.Event) -> None:
144 """Entry point for a background thread to own calling :meth:`heartbeat`
145 at regular intervals so that the main thread doesn't need to
149 should_terminate: an event which, when set, indicates we should terminate.
152 if should_terminate.is_set():
153 logger.debug('_pace_maker noticed event; shutting down')
156 logger.debug('_pace_maker is sleeping for %.1fs', self.sleep_delay)
157 time.sleep(self.sleep_delay)
161 update_ids_to_update_secs: Dict[str, float],
163 override_sleep_delay: Optional[float] = None,
165 """Construct an AutomaticStateTracker.
168 update_ids_to_update_secs: a dict mapping a user-defined
169 update_id into a period (number of seconds) with which
170 we would like this update performed. e.g.::
172 update_ids_to_update_secs = {
173 'refresh_local_state': 10.0,
174 'refresh_remote_state': 60.0,
177 This would indicate that every 10s we would like to
178 refresh local state whereas every 60s we'd like to
179 refresh remote state.
181 override_sleep_delay: By default, this class determines
182 how long the background thread should sleep between
183 automatic invocations to :meth:`heartbeat` based on the
184 period of each update type in update_ids_to_update_secs.
185 If this argument is non-None, it overrides this computation
186 and uses this period as the sleep in the background thread.
188 from pyutils import math_utils
190 super().__init__(update_ids_to_update_secs)
191 if override_sleep_delay is not None:
192 logger.debug('Overriding sleep delay to %.1f', override_sleep_delay)
193 self.sleep_delay = override_sleep_delay
195 periods_list = list(update_ids_to_update_secs.values())
196 self.sleep_delay = math_utils.gcd_float_sequence(periods_list)
197 logger.info('Computed sleep_delay=%.1f', self.sleep_delay)
198 (thread, stop_event) = self._pace_maker()
199 self.should_terminate = stop_event
200 self.updater_thread = thread
203 """Terminates the background thread and waits for it to tear down.
204 This may block for as long as `self.sleep_delay`.
206 logger.debug('Setting shutdown event and waiting for background thread.')
207 self.should_terminate.set()
208 self.updater_thread.join()
209 logger.debug('Background thread terminated.')
212 class WaitableAutomaticStateTracker(AutomaticStateTracker):
213 """This is an AutomaticStateTracker that exposes a wait method which
214 will block the calling thread until the state changes with an
215 optional timeout. The caller should check the return value of
216 wait; it will be true if something changed and false if the wait
217 simply timed out. If the return value is true, the instance
218 should be reset() before wait is called again.
222 detector = waitable_presence.WaitableAutomaticStateSubclass()
224 changed = detector.wait(timeout=60)
227 # Figure out what changed and react somehow
229 # Just a timeout; no need to reset. Maybe do something
230 # else before looping up into wait again.
235 update_ids_to_update_secs: Dict[str, float],
237 override_sleep_delay: Optional[float] = None,
239 """Construct an WaitableAutomaticStateTracker.
242 update_ids_to_update_secs: a dict mapping a user-defined
243 update_id into a period (number of seconds) with which
244 we would like this update performed. e.g.::
246 update_ids_to_update_secs = {
247 'refresh_local_state': 10.0,
248 'refresh_remote_state': 60.0,
251 This would indicate that every 10s we would like to
252 refresh local state whereas every 60s we'd like to
253 refresh remote state.
255 override_sleep_delay: By default, this class determines
256 how long the background thread should sleep between
257 automatic invocations to :meth:`heartbeat` based on the
258 period of each update type in update_ids_to_update_secs.
259 If this argument is non-None, it overrides this computation
260 and uses this period as the sleep in the background thread.
262 self._something_changed = threading.Event()
264 update_ids_to_update_secs, override_sleep_delay=override_sleep_delay
267 def something_changed(self):
268 """Indicate that something has changed."""
269 self._something_changed.set()
271 def did_something_change(self) -> bool:
272 """Indicate whether some state has changed in the background."""
273 return self._something_changed.is_set()
276 """Call to clear the 'something changed' bit. See usage above."""
277 self._something_changed.clear()
279 def wait(self, *, timeout=None):
280 """Blocking wait for something to change or a timeout to lapse.
283 timeout: maximum amount of time to wait. If None, wait
284 forever (until something changes or shutdown).
286 return self._something_changed.wait(timeout=timeout)