More documentation improvements.
[pyutils.git] / src / pyutils / state_tracker.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """This module defines several classes (:py:class:`StateTracker`,
6 :py:class:`AutomaticStateTracker`, and
7 :py:class:`WaitableAutomaticStateTracker`) that can be used as base
8 classes by your code.  These class patterns are meant to encapsulate
9 and represent some state that dynamically changes and must be updated
10 periodically.  These classes update their state (either automatically
11 or when invoked to poll) and allow their callers to wait on state
12 changes.
13
14 See also :class:`pyutils.parallelize.thread_utils.periodically_invoke`
15 """
16
17 import datetime
18 import logging
19 import threading
20 import time
21 from abc import ABC, abstractmethod
22 from typing import Dict, Optional
23
24 import pytz
25
26 from pyutils.parallelize.thread_utils import background_thread
27
28 logger = logging.getLogger(__name__)
29
30
31 class StateTracker(ABC):
32     """A base class that maintains and updates its state via an update
33     routine called :meth:`heartbeat`.  This method is not automatic:
34     instances of this class should be periodically invoked via their
35     :meth:`heartbeat` method by some other thread.
36
37     See also :class:`AutomaticStateTracker` if you'd rather not have
38     to invoke your code regularly.
39     """
40
41     def __init__(self, update_ids_to_update_secs: Dict[str, float]) -> None:
42         """The update_ids_to_update_secs dict parameter describes one
43         or more update types (unique update_ids) and the
44         periodicity(ies), in seconds, at which it/they should be
45         invoked.
46
47         .. note::
48             When more than one update is overdue, they will be
49             invoked in order by their update_ids so care in choosing these
50             identifiers may be in order.
51
52         Args:
53             update_ids_to_update_secs: a dict mapping a user-defined
54                 update_id into a period (number of seconds) with which
55                 we would like this update performed.  e.g.::
56
57                     update_ids_to_update_secs = {
58                         'refresh_local_state': 10.0,
59                         'refresh_remote_state': 60.0,
60                     }
61
62                 This would indicate that every 10s we would like to
63                 refresh local state whereas every 60s we'd like to
64                 refresh remote state.
65
66         """
67         self.update_ids_to_update_secs = update_ids_to_update_secs
68         self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {}
69         self.now: Optional[datetime.datetime] = None
70         for x in update_ids_to_update_secs.keys():
71             self.last_reminder_ts[x] = None
72
73     @abstractmethod
74     def update(
75         self,
76         update_id: str,
77         now: datetime.datetime,
78         last_invocation: Optional[datetime.datetime],
79     ) -> None:
80         """Put whatever you want here to perform your state updates.
81
82         Args:
83             update_id: the string you passed to the c'tor as a key in
84                 the update_ids_to_update_secs dict.  :meth:`update` will
85                 only be invoked, at most, every update_secs
86                 seconds.
87
88             now: the approximate current timestamp at invocation time.
89
90             last_invocation: the last time this operation was invoked
91                 (or None on the first invocation).
92         """
93         pass
94
95     def heartbeat(self, *, force_all_updates_to_run: bool = False) -> None:
96         """Invoke this method periodically to cause the :class:`StateTracker`
97         instance to identify and invoke any overdue updates based on the
98         schedule passed to the c'tor.  In the base :class:`StateTracker` class,
99         this method must be invoked manually by a thread from external code.
100         Other subclasses (e.g. :class:`AutomaticStateTracker`) are available
101         that create their own updater threads (see below).
102
103         If more than one type of update (`update_id`) is overdue,
104         overdue updates will be invoked in order based on their `update_id`.
105
106         Setting `force_all_updates_to_run` will invoke all updates
107         (ordered by `update_id`) immediately ignoring whether or not
108         they are due.
109         """
110
111         self.now = datetime.datetime.now(tz=pytz.timezone("US/Pacific"))
112         for update_id in sorted(self.last_reminder_ts.keys()):
113             if force_all_updates_to_run:
114                 logger.debug('Forcing all updates to run')
115                 self.update(update_id, self.now, self.last_reminder_ts[update_id])
116                 self.last_reminder_ts[update_id] = self.now
117                 return
118
119             refresh_secs = self.update_ids_to_update_secs[update_id]
120             last_run = self.last_reminder_ts[update_id]
121             if last_run is None:  # Never run before
122                 logger.debug('id %s has never been run; running it now', update_id)
123                 self.update(update_id, self.now, self.last_reminder_ts[update_id])
124                 self.last_reminder_ts[update_id] = self.now
125             else:
126                 delta = self.now - last_run
127                 if delta.total_seconds() >= refresh_secs:  # Is overdue?
128                     logger.debug('id %s is overdue; running it now', update_id)
129                     self.update(
130                         update_id,
131                         self.now,
132                         self.last_reminder_ts[update_id],
133                     )
134                     self.last_reminder_ts[update_id] = self.now
135
136
137 class AutomaticStateTracker(StateTracker):
138     """Just like :class:`StateTracker` but you don't need to pump the
139     :meth:`heartbeat` method periodically because we create a background
140     thread that manages periodic calling.  You must call :meth:`shutdown`,
141     though, in order to terminate the update thread.
142     """
143
144     @background_thread
145     def _pace_maker(self, should_terminate: threading.Event) -> None:
146         """Entry point for a background thread to own calling :meth:`heartbeat`
147         at regular intervals so that the main thread doesn't need to
148         do so.
149
150         Args:
151             should_terminate: an event which, when set, indicates we should terminate.
152         """
153         while True:
154             if should_terminate.is_set():
155                 logger.debug('_pace_maker noticed event; shutting down')
156                 return
157             self.heartbeat()
158             logger.debug('_pace_maker is sleeping for %.1fs', self.sleep_delay)
159             time.sleep(self.sleep_delay)
160
161     def __init__(
162         self,
163         update_ids_to_update_secs: Dict[str, float],
164         *,
165         override_sleep_delay: Optional[float] = None,
166     ) -> None:
167         """Construct an AutomaticStateTracker.
168
169         Args:
170             update_ids_to_update_secs: a dict mapping a user-defined
171                 update_id into a period (number of seconds) with which
172                 we would like this update performed.  e.g.::
173
174                     update_ids_to_update_secs = {
175                         'refresh_local_state': 10.0,
176                         'refresh_remote_state': 60.0,
177                     }
178
179                 This would indicate that every 10s we would like to
180                 refresh local state whereas every 60s we'd like to
181                 refresh remote state.
182
183             override_sleep_delay: By default, this class determines
184                 how long the background thread should sleep between
185                 automatic invocations to :meth:`heartbeat` based on the
186                 period of each update type in update_ids_to_update_secs.
187                 If this argument is non-None, it overrides this computation
188                 and uses this period as the sleep in the background thread.
189         """
190         from pyutils import math_utils
191
192         super().__init__(update_ids_to_update_secs)
193         if override_sleep_delay is not None:
194             logger.debug('Overriding sleep delay to %.1f', override_sleep_delay)
195             self.sleep_delay = override_sleep_delay
196         else:
197             periods_list = list(update_ids_to_update_secs.values())
198             self.sleep_delay = math_utils.gcd_float_sequence(periods_list)
199             logger.info('Computed sleep_delay=%.1f', self.sleep_delay)
200         (thread, stop_event) = self._pace_maker()
201         self.should_terminate = stop_event
202         self.updater_thread = thread
203
204     def shutdown(self):
205         """Terminates the background thread and waits for it to tear down.
206         This may block for as long as `self.sleep_delay`.
207         """
208         logger.debug('Setting shutdown event and waiting for background thread.')
209         self.should_terminate.set()
210         self.updater_thread.join()
211         logger.debug('Background thread terminated.')
212
213
214 class WaitableAutomaticStateTracker(AutomaticStateTracker):
215     """This is an AutomaticStateTracker that exposes a wait method which
216     will block the calling thread until the state changes with an
217     optional timeout.  The caller should check the return value of
218     wait; it will be true if something changed and false if the wait
219     simply timed out.  If the return value is true, the instance
220     should be reset() before wait is called again.
221
222     Example usage::
223
224         detector = waitable_presence.WaitableAutomaticStateSubclass()
225         while True:
226             changed = detector.wait(timeout=60)
227             if changed:
228                 detector.reset()
229                 # Figure out what changed and react somehow
230             else:
231                 # Just a timeout; no need to reset.  Maybe do something
232                 # else before looping up into wait again.
233     """
234
235     def __init__(
236         self,
237         update_ids_to_update_secs: Dict[str, float],
238         *,
239         override_sleep_delay: Optional[float] = None,
240     ) -> None:
241         """Construct an WaitableAutomaticStateTracker.
242
243         Args:
244             update_ids_to_update_secs: a dict mapping a user-defined
245                 update_id into a period (number of seconds) with which
246                 we would like this update performed.  e.g.::
247
248                     update_ids_to_update_secs = {
249                         'refresh_local_state': 10.0,
250                         'refresh_remote_state': 60.0,
251                     }
252
253                 This would indicate that every 10s we would like to
254                 refresh local state whereas every 60s we'd like to
255                 refresh remote state.
256
257             override_sleep_delay: By default, this class determines
258                 how long the background thread should sleep between
259                 automatic invocations to :meth:`heartbeat` based on the
260                 period of each update type in update_ids_to_update_secs.
261                 If this argument is non-None, it overrides this computation
262                 and uses this period as the sleep in the background thread.
263         """
264         self._something_changed = threading.Event()
265         super().__init__(
266             update_ids_to_update_secs, override_sleep_delay=override_sleep_delay
267         )
268
269     def something_changed(self):
270         """Indicate that something has changed."""
271         self._something_changed.set()
272
273     def did_something_change(self) -> bool:
274         """Indicate whether some state has changed in the background."""
275         return self._something_changed.is_set()
276
277     def reset(self):
278         """Call to clear the 'something changed' bit.  See usage above."""
279         self._something_changed.clear()
280
281     def wait(self, *, timeout=None):
282         """Blocking wait for something to change or a timeout to lapse.
283
284         Args:
285             timeout: maximum amount of time to wait.  If None, wait
286                 forever (until something changes or shutdown).
287         """
288         return self._something_changed.wait(timeout=timeout)