3 # © Copyright 2021-2022, Scott Gasch
5 """This module defines several classes (:py:class:`StateTracker`,
6 :py:class:`AutomaticStateTracker`, and
7 :py:class:`WaitableAutomaticStateTracker`) that can be used as base
8 classes by your code. These class patterns are meant to encapsulate
9 and represent some state that dynamically changes and must be updated
10 periodically. These classes update their state (either automatically
11 or when invoked to poll) and allow their callers to wait on state
14 See also :class:`pyutils.parallelize.thread_utils.periodically_invoke`
21 from abc import ABC, abstractmethod
22 from typing import Dict, Optional
26 from pyutils.parallelize.thread_utils import background_thread
28 logger = logging.getLogger(__name__)
class StateTracker(ABC):
    """A base class that maintains and updates its state via an update
    routine called :meth:`heartbeat`.  This method is not automatic:
    instances of this class should be periodically invoked via their
    :meth:`heartbeat` method by some other thread.

    See also :class:`AutomaticStateTracker` if you'd rather not have
    to invoke your code regularly.
    """

    def __init__(self, update_ids_to_update_secs: Dict[str, float]) -> None:
        """The update_ids_to_update_secs dict parameter describes one
        or more update types (unique update_ids) and the
        periodicity(ies), in seconds, at which it/they should be
        invoked.

        .. note::
            When more than one update is overdue, they will be
            invoked in order by their update_ids so care in choosing these
            identifiers may be in order.

        Args:
            update_ids_to_update_secs: a dict mapping a user-defined
                update_id into a period (number of seconds) with which
                we would like this update performed.  e.g.::

                    update_ids_to_update_secs = {
                        'refresh_local_state': 10.0,
                        'refresh_remote_state': 60.0,
                    }

                This would indicate that every 10s we would like to
                refresh local state whereas every 60s we'd like to
                refresh remote state.
        """
        self.update_ids_to_update_secs = update_ids_to_update_secs
        # Timestamp of the most recent run of each update_id; None means
        # "never run", which heartbeat treats as immediately due.
        self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {
            update_id: None for update_id in update_ids_to_update_secs
        }
        # Timestamp captured at the start of the most recent heartbeat.
        self.now: Optional[datetime.datetime] = None

    @abstractmethod
    def update(
        self,
        update_id: str,
        now: datetime.datetime,
        last_invocation: Optional[datetime.datetime],
    ) -> None:
        """Put whatever you want here to perform your state updates.

        Args:
            update_id: the string you passed to the c'tor as a key in
                the update_ids_to_update_secs dict.  :meth:`update` will
                only be invoked, at most, every update_secs
                seconds.

            now: the approximate current timestamp at invocation time.

            last_invocation: the last time this operation was invoked
                (or None on the first invocation).
        """

    def heartbeat(self, *, force_all_updates_to_run: bool = False) -> None:
        """Invoke this method periodically to cause the :class:`StateTracker`
        instance to identify and invoke any overdue updates based on the
        schedule passed to the c'tor.  In the base :class:`StateTracker` class,
        this method must be invoked manually by a thread from external code.
        Other subclasses (e.g. :class:`AutomaticStateTracker`) are available
        that create their own updater threads (see below).

        If more than one type of update (`update_id`) is overdue,
        overdue updates will be invoked in order based on their `update_id`.

        Setting `force_all_updates_to_run` will invoke all updates
        (ordered by `update_id`) immediately ignoring whether or not
        they are due.

        Args:
            force_all_updates_to_run: when True, run every update once
                regardless of when it last ran.
        """
        now = datetime.datetime.now(tz=pytz.timezone("US/Pacific"))
        self.now = now
        for update_id in sorted(self.last_reminder_ts):
            last_run = self.last_reminder_ts[update_id]
            if force_all_updates_to_run:
                logger.debug('Forcing all updates to run')
            elif last_run is None:  # Never run before
                logger.debug('id %s has never been run; running it now', update_id)
            else:
                refresh_secs = self.update_ids_to_update_secs[update_id]
                if (now - last_run).total_seconds() < refresh_secs:
                    continue  # Not due yet; move on to the next update_id.
                logger.debug('id %s is overdue; running it now', update_id)

            # Single "run it" path shared by the forced / first-run / overdue
            # cases.  The timestamp is recorded only after update() returns,
            # so an update that raises is retried on the next heartbeat.
            self.update(update_id, now, last_run)
            self.last_reminder_ts[update_id] = now
class AutomaticStateTracker(StateTracker):
    """Just like :class:`StateTracker` but you don't need to pump the
    :meth:`heartbeat` method periodically because we create a background
    thread that manages periodic calling.  You must call :meth:`shutdown`,
    though, in order to terminate the update thread.
    """

    @background_thread
    def _pace_maker(self, should_terminate: threading.Event) -> None:
        """Entry point for a background thread to own calling :meth:`heartbeat`
        at regular intervals so that the main thread doesn't need to
        do so.

        Args:
            should_terminate: an event which, when set, indicates we should terminate.
        """
        while not should_terminate.is_set():
            self.heartbeat()
            logger.debug('_pace_maker is sleeping for %.1fs', self.sleep_delay)
            # Pace ourselves by waiting on the termination event instead of
            # an uninterruptible sleep: the delay between heartbeats is the
            # same, but shutdown() wakes this thread up immediately.
            should_terminate.wait(timeout=self.sleep_delay)
        logger.debug('_pace_maker noticed event; shutting down')

    def __init__(
        self,
        update_ids_to_update_secs: Dict[str, float],
        *,
        override_sleep_delay: Optional[float] = None,
    ) -> None:
        """Construct an AutomaticStateTracker.

        Args:
            update_ids_to_update_secs: a dict mapping a user-defined
                update_id into a period (number of seconds) with which
                we would like this update performed.  e.g.::

                    update_ids_to_update_secs = {
                        'refresh_local_state': 10.0,
                        'refresh_remote_state': 60.0,
                    }

                This would indicate that every 10s we would like to
                refresh local state whereas every 60s we'd like to
                refresh remote state.

            override_sleep_delay: By default, this class determines
                how long the background thread should sleep between
                automatic invocations to :meth:`heartbeat` based on the
                period of each update type in update_ids_to_update_secs.
                If this argument is non-None, it overrides this computation
                and uses this period as the sleep in the background thread.
        """
        from pyutils import math_utils

        super().__init__(update_ids_to_update_secs)
        if override_sleep_delay is not None:
            logger.debug('Overriding sleep delay to %.1f', override_sleep_delay)
            self.sleep_delay = override_sleep_delay
        else:
            periods_list = list(update_ids_to_update_secs.values())
            self.sleep_delay = math_utils.gcd_float_sequence(periods_list)
            logger.info('Computed sleep_delay=%.1f', self.sleep_delay)

        # sleep_delay must be set before this call because the pacing loop
        # reads it.  NOTE(review): relies on background_thread returning a
        # (thread, stop_event) pair when the decorated method is called.
        (thread, stop_event) = self._pace_maker()
        self.should_terminate = stop_event
        self.updater_thread = thread

    def shutdown(self) -> None:
        """Terminates the background thread and waits for it to tear down.
        This blocks until the background thread exits, which includes any
        :meth:`heartbeat` call that is already in progress.
        """
        logger.debug('Setting shutdown event and waiting for background thread.')
        self.should_terminate.set()
        self.updater_thread.join()
        logger.debug('Background thread terminated.')
class WaitableAutomaticStateTracker(AutomaticStateTracker):
    """An :class:`AutomaticStateTracker` that additionally exposes a
    :meth:`wait` method.  :meth:`wait` blocks the calling thread, with an
    optional timeout, until the state is flagged as changed.  Callers
    should check the return value of :meth:`wait`: it is true when
    something changed and false when the wait simply timed out.  After a
    true return, call :meth:`reset` before waiting again.

    Example usage::

        detector = waitable_presence.WaitableAutomaticStateSubclass()
        while True:
            changed = detector.wait(timeout=60)
            if changed:
                detector.reset()
                # Figure out what changed and react somehow
            else:
                # Just a timeout; no need to reset.  Maybe do something
                # else before looping up into wait again.
    """

    def __init__(
        self,
        update_ids_to_update_secs: Dict[str, float],
        *,
        override_sleep_delay: Optional[float] = None,
    ) -> None:
        """Construct an WaitableAutomaticStateTracker.

        Args:
            update_ids_to_update_secs: a dict mapping a user-defined
                update_id into a period (number of seconds) with which
                we would like this update performed.  e.g.::

                    update_ids_to_update_secs = {
                        'refresh_local_state': 10.0,
                        'refresh_remote_state': 60.0,
                    }

                This would indicate that every 10s we would like to
                refresh local state whereas every 60s we'd like to
                refresh remote state.

            override_sleep_delay: By default, this class determines
                how long the background thread should sleep between
                automatic invocations to :meth:`heartbeat` based on the
                period of each update type in update_ids_to_update_secs.
                If this argument is non-None, it overrides this computation
                and uses this period as the sleep in the background thread.
        """
        # Create the event before delegating to the parent constructor so
        # the attribute already exists while the rest of setup runs.
        self._something_changed = threading.Event()
        super().__init__(
            update_ids_to_update_secs,
            override_sleep_delay=override_sleep_delay,
        )

    def something_changed(self) -> None:
        """Flag that some state has changed, releasing any :meth:`wait` callers."""
        self._something_changed.set()

    def did_something_change(self) -> bool:
        """Report, without blocking, whether the 'something changed' flag is set."""
        return self._something_changed.is_set()

    def reset(self) -> None:
        """Clear the 'something changed' flag so :meth:`wait` can block again."""
        self._something_changed.clear()

    def wait(self, *, timeout: Optional[float] = None) -> bool:
        """Block until something changes or the timeout lapses.

        Args:
            timeout: maximum amount of time to wait.  If None, wait
                forever (until something changes or shutdown).

        Returns:
            True if the 'something changed' flag is set, False if the
            wait timed out first.
        """
        return self._something_changed.wait(timeout=timeout)