Code cleanup for run_test.py
[python_utils.git] / waitable_presence.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """A PresenceDetector that is waitable.  This is not part of
6 base_presence.py because I do not want to bring these dependencies
7 into that lower-level module (especially state_tracker).
8 """
9
10 import datetime
11 import logging
12 from typing import Optional, Tuple
13
14 from overrides import overrides
15
16 import base_presence
17 import site_config
18 import state_tracker
19 from type.locations import Location
20
21 logger = logging.getLogger(__name__)
22
23
24 class WaitablePresenceDetectorWithMemory(state_tracker.WaitableAutomaticStateTracker):
25     """
26     This is a waitable class that keeps a PresenceDetector internally
27     and periodically polls it to detect changes in presence in a
28     particular location.  Example suggested usage pattern::
29
30         detector = waitable_presence.WaitablePresenceDetectorWithMemory(60.0)
31         while True:
32             changed = detector.wait(timeout=60 * 5)  # or, None for "forever"
33             (someone_is_home, since) = detector.is_someone_home()
34             if changed:
35                 detector.reset()
36             logger.debug(
37                 f'someone_is_home={someone_is_home}, since={since}, changed={changed}'
38             )
39     """
40
41     def __init__(
42         self,
43         override_update_interval_sec: float = 60.0,
44         override_location: Location = site_config.get_location(),
45         injected_presence_detector: Optional[base_presence.PresenceDetection] = None,
46     ) -> None:
47         self.last_someone_is_home: Optional[bool] = None
48         self.someone_is_home: Optional[bool] = None
49         self.everyone_gone_since: Optional[datetime.datetime] = None
50         self.someone_home_since: Optional[datetime.datetime] = None
51         self.location = override_location
52         if injected_presence_detector is not None:
53             self.detector: base_presence.PresenceDetection = injected_presence_detector
54         else:
55             self.detector = base_presence.PresenceDetection()
56         super().__init__(
57             {
58                 'poll_presence': override_update_interval_sec,
59                 'check_detector': override_update_interval_sec * 5,
60             }
61         )
62
63     @overrides
64     def update(
65         self,
66         update_id: str,
67         now: datetime.datetime,
68         last_invocation: Optional[datetime.datetime],
69     ) -> None:
70         if update_id == 'poll_presence':
71             self.poll_presence(now)
72         elif update_id == 'check_detector':
73             self.check_detector()
74         else:
75             raise Exception(f'Unknown update type {update_id} in {__file__}')
76
77     def poll_presence(self, now: datetime.datetime) -> None:
78         logger.debug('Checking presence in %s now...', self.location)
79         self.detector.update()
80         if self.detector.is_anyone_in_location_now(self.location):
81             self.someone_is_home = True
82             self.someone_home_since = now
83         else:
84             self.someone_is_home = False
85             self.everyone_gone_since = now
86         if self.someone_is_home != self.last_someone_is_home:
87             self.something_changed()
88             self.last_someone_is_home = self.someone_is_home
89
90     def check_detector(self) -> None:
91         if len(self.detector.dark_locations) > 0:
92             logger.debug('PresenceDetector is incomplete; trying to reinitialize...')
93             self.detector = base_presence.PresenceDetection()
94
95     def is_someone_home(self) -> Optional[Tuple[bool, datetime.datetime]]:
96         """Returns a tuple of a bool that indicates whether someone is home
97         and a datetime that indicates how long either someone has been
98         home or no one has been home.
99
100         """
101         if self.someone_is_home is None:
102             return None  # checked too soon, wait a bit.
103         if self.someone_is_home:
104             assert self.someone_home_since is not None
105             return (True, self.someone_home_since)
106         else:
107             assert self.everyone_gone_since is not None
108             return (False, self.everyone_gone_since)