Reduce import scopes, remove cycles.
[python_utils.git] / state_tracker.py
1 #!/usr/bin/env python3
2
3 from abc import ABC, abstractmethod
4 import datetime
5 import logging
6 import time
7 from typing import Dict, Optional
8
9 import pytz
10
11 from thread_utils import background_thread
12
13 logger = logging.getLogger(__name__)
14
15
16 class StateTracker(ABC):
17     """A base class that maintains and updates a global state via an
18     update routine.  Instances of this class should be periodically
19     invoked via the heartbeat() method.  This method, in turn, invokes
20     update() with update_ids according to a schedule / periodicity
21     provided to the c'tor.
22     """
23
24     def __init__(self, update_ids_to_update_secs: Dict[str, float]) -> None:
25         """The update_ids_to_update_secs dict parameter describes one or more
26         update types (unique update_ids) and the periodicity(ies), in
27         seconds, at which it/they should be invoked.
28
29         Note that, when more than one update is overdue, they will be
30         invoked in order by their update_ids so care in choosing these
31         identifiers may be in order.
32         """
33         self.update_ids_to_update_secs = update_ids_to_update_secs
34         self.last_reminder_ts: Dict[str, Optional[datetime.datetime]] = {}
35         for x in update_ids_to_update_secs.keys():
36             self.last_reminder_ts[x] = None
37
38     @abstractmethod
39     def update(
40         self,
41         update_id: str,
42         now: datetime.datetime,
43         last_invocation: Optional[datetime.datetime],
44     ) -> None:
45         """Put whatever you want here.  The update_id will be the string
46         passed to the c'tor as a key in the Dict.  It will only be
47         tapped on the shoulder, at most, every update_secs seconds.
48         The now param is the approximate current timestamp and the
49         last_invocation param is the last time you were invoked (or
50         None on the first invocation)
51         """
52         pass
53
54     def heartbeat(self, *, force_all_updates_to_run: bool = False) -> None:
55         """Invoke this method to cause the StateTracker instance to identify
56         and invoke any overdue updates based on the schedule passed to
57         the c'tor.  In the base StateTracker class, this method must
58         be invoked manually with a thread from external code.
59
60         If more than one type of update (update_id) are overdue,
61         they will be invoked in order based on their update_ids.
62
63         Setting force_all_updates_to_run will invoke all updates
64         (ordered by update_id) immediately ignoring whether or not
65         they are due.
66         """
67         self.now = datetime.datetime.now(tz=pytz.timezone("US/Pacific"))
68         for update_id in sorted(self.last_reminder_ts.keys()):
69             if force_all_updates_to_run:
70                 logger.debug('Forcing all updates to run')
71                 self.update(
72                     update_id, self.now, self.last_reminder_ts[update_id]
73                 )
74                 self.last_reminder_ts[update_id] = self.now
75                 return
76
77             refresh_secs = self.update_ids_to_update_secs[update_id]
78             last_run = self.last_reminder_ts[update_id]
79             if last_run is None:  # Never run before
80                 logger.debug(
81                     f'id {update_id} has never been run; running it now'
82                 )
83                 self.update(
84                     update_id, self.now, self.last_reminder_ts[update_id]
85                 )
86                 self.last_reminder_ts[update_id] = self.now
87             else:
88                 delta = self.now - last_run
89                 if delta.total_seconds() >= refresh_secs:  # Is overdue?
90                     logger.debug(f'id {update_id} is overdue; running it now')
91                     self.update(
92                         update_id,
93                         self.now,
94                         self.last_reminder_ts[update_id],
95                     )
96                     self.last_reminder_ts[update_id] = self.now
97
98
99 class AutomaticStateTracker(StateTracker):
100     """Just like HeartbeatCurrentState but you don't need to pump the
101     heartbeat; it runs on a background thread.  Call .shutdown() to
102     terminate the updates.
103     """
104
105     @background_thread
106     def pace_maker(self, should_terminate) -> None:
107         """Entry point for a background thread to own calling heartbeat()
108         at regular intervals so that the main thread doesn't need to do
109         so."""
110         while True:
111             if should_terminate.is_set():
112                 logger.debug('pace_maker noticed event; shutting down')
113                 return
114             self.heartbeat()
115             logger.debug(f'page_maker is sleeping for {self.sleep_delay}s')
116             time.sleep(self.sleep_delay)
117
118     def __init__(
119         self,
120         update_ids_to_update_secs: Dict[str, float],
121         *,
122         override_sleep_delay: Optional[float] = None,
123     ) -> None:
124         import math_utils
125         super().__init__(update_ids_to_update_secs)
126         if override_sleep_delay is not None:
127             logger.debug(f'Overriding sleep delay to {override_sleep_delay}')
128             self.sleep_delay = override_sleep_delay
129         else:
130             periods_list = list(update_ids_to_update_secs.values())
131             self.sleep_delay = math_utils.gcd_float_sequence(periods_list)
132             logger.info(f'Computed sleep_delay={self.sleep_delay}')
133         (thread, stop_event) = self.pace_maker()
134         self.should_terminate = stop_event
135         self.updater_thread = thread
136
137     def shutdown(self):
138         """Terminates the background thread and waits for it to tear down.
139         This may block for as long as self.sleep_delay.
140         """
141         logger.debug(
142             'Setting shutdown event and waiting for background thread.'
143         )
144         self.should_terminate.set()
145         self.updater_thread.join()
146         logger.debug('Background thread terminated.')