9ce61e3ac7a51c7cacdc2cc7e34dea02985078de
[python_utils.git] / state_tracker.py
1 #!/usr/bin/env python3
2
3 from abc import ABC, abstractmethod
4 import datetime
5 import logging
6 import time
7 import threading
8 from typing import Dict, Optional
9
10 import pytz
11
12 from thread_utils import background_thread
13
14 logger = logging.getLogger(__name__)
15
16
17 class StateTracker(ABC):
18     """A base class that maintains and updates a global state via an
19     update routine.  Instances of this class should be periodically
20     invoked via the heartbeat() method.  This method, in turn, invokes
21     update() with update_ids according to a schedule / periodicity
22     provided to the c'tor.
23
24     """
25     def __init__(self, update_ids_to_update_secs: Dict[str, float]) -> None:
26         """The update_ids_to_update_secs dict parameter describes one or more
27         update types (unique update_ids) and the periodicity(ies), in
28         seconds, at which it/they should be invoked.
29
30         Note that, when more than one update is overdue, they will be
31         invoked in order by their update_ids so care in choosing these
32         identifiers may be in order.
33
34         """
35         self.update_ids_to_update_secs = update_ids_to_update_secs
36         self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {}
37         for x in update_ids_to_update_secs.keys():
38             self.last_reminder_ts[x] = None
39
40     @abstractmethod
41     def update(
42         self,
43         update_id: str,
44         now: datetime.datetime,
45         last_invocation: Optional[datetime.datetime],
46     ) -> None:
47         """Put whatever you want here.  The update_id will be the string
48         passed to the c'tor as a key in the Dict.  It will only be
49         tapped on the shoulder, at most, every update_secs seconds.
50         The now param is the approximate current timestamp and the
51         last_invocation param is the last time you were invoked (or
52         None on the first invocation)
53
54         """
55         pass
56
57     def heartbeat(self, *, force_all_updates_to_run: bool = False) -> None:
58         """Invoke this method to cause the StateTracker instance to identify
59         and invoke any overdue updates based on the schedule passed to
60         the c'tor.  In the base StateTracker class, this method must
61         be invoked manually with a thread from external code.
62
63         If more than one type of update (update_id) are overdue,
64         they will be invoked in order based on their update_ids.
65
66         Setting force_all_updates_to_run will invoke all updates
67         (ordered by update_id) immediately ignoring whether or not
68         they are due.
69
70         """
71         self.now = datetime.datetime.now(tz=pytz.timezone("US/Pacific"))
72         for update_id in sorted(self.last_reminder_ts.keys()):
73             if force_all_updates_to_run:
74                 logger.debug('Forcing all updates to run')
75                 self.update(
76                     update_id, self.now, self.last_reminder_ts[update_id]
77                 )
78                 self.last_reminder_ts[update_id] = self.now
79                 return
80
81             refresh_secs = self.update_ids_to_update_secs[update_id]
82             last_run = self.last_reminder_ts[update_id]
83             if last_run is None:  # Never run before
84                 logger.debug(
85                     f'id {update_id} has never been run; running it now'
86                 )
87                 self.update(
88                     update_id, self.now, self.last_reminder_ts[update_id]
89                 )
90                 self.last_reminder_ts[update_id] = self.now
91             else:
92                 delta = self.now - last_run
93                 if delta.total_seconds() >= refresh_secs:  # Is overdue?
94                     logger.debug(f'id {update_id} is overdue; running it now')
95                     self.update(
96                         update_id,
97                         self.now,
98                         self.last_reminder_ts[update_id],
99                     )
100                     self.last_reminder_ts[update_id] = self.now
101
102
103 class AutomaticStateTracker(StateTracker):
104     """Just like HeartbeatCurrentState but you don't need to pump the
105     heartbeat; it runs on a background thread.  Call .shutdown() to
106     terminate the updates.
107
108     """
109     @background_thread
110     def pace_maker(self, should_terminate) -> None:
111         """Entry point for a background thread to own calling heartbeat()
112         at regular intervals so that the main thread doesn't need to do
113         so.
114
115         """
116         while True:
117             if should_terminate.is_set():
118                 logger.debug('pace_maker noticed event; shutting down')
119                 return
120             self.heartbeat()
121             logger.debug(f'pace_maker is sleeping for {self.sleep_delay}s')
122             time.sleep(self.sleep_delay)
123
124     def __init__(
125         self,
126         update_ids_to_update_secs: Dict[str, float],
127         *,
128         override_sleep_delay: Optional[float] = None,
129     ) -> None:
130         import math_utils
131         super().__init__(update_ids_to_update_secs)
132         if override_sleep_delay is not None:
133             logger.debug(f'Overriding sleep delay to {override_sleep_delay}')
134             self.sleep_delay = override_sleep_delay
135         else:
136             periods_list = list(update_ids_to_update_secs.values())
137             self.sleep_delay = math_utils.gcd_float_sequence(periods_list)
138             logger.info(f'Computed sleep_delay={self.sleep_delay}')
139         (thread, stop_event) = self.pace_maker()
140         self.should_terminate = stop_event
141         self.updater_thread = thread
142
143     def shutdown(self):
144         """Terminates the background thread and waits for it to tear down.
145         This may block for as long as self.sleep_delay.
146
147         """
148         logger.debug(
149             'Setting shutdown event and waiting for background thread.'
150         )
151         self.should_terminate.set()
152         self.updater_thread.join()
153         logger.debug('Background thread terminated.')
154
155
156 class WaitableAutomaticStateTracker(AutomaticStateTracker):
157     """This is an AutomaticStateTracker that exposes a wait method which
158     will block the calling thread until the state changes with an
159     optional timeout.  The caller should check the return value of
160     wait; it will be true if something changed and false if the wait
161     simply timed out.  If the return value is true, the instance
162     should be reset() before wait is called again.
163
164     Example usage:
165
166         detector = waitable_presence.WaitableAutomaticStateSubclass()
167         while True:
168             changed = detector.wait(timeout=60 * 5)
169             if changed:
170                 detector.reset()
171                 # Figure out what changed and react
172             else:
173                 # Just a timeout; no need to reset.  Maybe do something
174                 # else before looping up into wait again.
175
176     """
177     def __init__(
178             self,
179             update_ids_to_update_secs: Dict[str, float],
180             *,
181             override_sleep_delay: Optional[float] = None,
182     ) -> None:
183         self._something_changed = threading.Event()
184         super().__init__(update_ids_to_update_secs,
185                          override_sleep_delay=override_sleep_delay)
186
187     def something_changed(self):
188         self._something_changed.set()
189
190     def did_something_change(self) -> bool:
191         return self._something_changed.is_set()
192
193     def reset(self):
194         self._something_changed.clear()
195
196     def wait(self,
197              *,
198              timeout=None):
199         return self._something_changed.wait(
200             timeout=timeout
201         )