Add a simple test for waitable presence.
[python_utils.git] / waitable_presence.py
1 #!/usr/bin/env python3
2
3 """A PresenceDetector that is waitable.  This is not part of
4 base_presence.py because I do not want to bring these dependencies
5 into that lower-level module (especially state_tracker).
6
7 """
8
9 import datetime
10 import logging
11 from typing import Optional, Tuple
12
13 from overrides import overrides
14
15 import base_presence
16 import site_config
17 import state_tracker
18 from type.locations import Location
19
20 logger = logging.getLogger(__name__)
21
22
class WaitablePresenceDetectorWithMemory(state_tracker.WaitableAutomaticStateTracker):
    """
    This is a waitable class that keeps a PresenceDetector internally
    and periodically polls it to detect changes in presence in a
    particular location.  It also remembers when the current state
    (someone home / everyone gone) was first observed.  Example
    suggested usage pattern:

        detector = waitable_presence.WaitablePresenceDetectorWithMemory(60.0)
        while True:
            changed = detector.wait(timeout=60 * 5)  # or, None for "forever"
            result = detector.is_someone_home()
            if result is None:
                continue  # no presence poll has completed yet
            (someone_is_home, since) = result
            if changed:
                detector.reset()
            logger.debug(
                f'someone_is_home={someone_is_home}, since={since}, changed={changed}'
            )
    """

    def __init__(
        self,
        override_update_interval_sec: float = 60.0,
        override_location: Optional[Location] = None,
        injected_presence_detector: Optional[base_presence.PresenceDetection] = None,
    ) -> None:
        """
        Args:
            override_update_interval_sec: value registered with the base
                class for the 'poll_presence' update; the 'check_detector'
                update is registered with five times this value.
            override_location: the location to watch.  When None (the
                default), site_config.get_location() is consulted at
                construction time rather than once at import time.
            injected_presence_detector: an already-built detector to use
                instead of constructing a new base_presence.PresenceDetection.
        """
        self.last_someone_is_home: Optional[bool] = None
        self.someone_is_home: Optional[bool] = None
        self.everyone_gone_since: Optional[datetime.datetime] = None
        self.someone_home_since: Optional[datetime.datetime] = None
        if override_location is None:
            # Resolve lazily: a call in the signature default would run only
            # once, when the class body is executed at import time.
            override_location = site_config.get_location()
        self.location: Location = override_location
        if injected_presence_detector is not None:
            self.detector: base_presence.PresenceDetection = injected_presence_detector
        else:
            self.detector = base_presence.PresenceDetection()
        # State above is set up before handing control to the base class,
        # exactly as in the original ordering.
        super().__init__(
            {
                'poll_presence': override_update_interval_sec,
                'check_detector': override_update_interval_sec * 5,
            }
        )

    @overrides
    def update(
        self,
        update_id: str,
        now: datetime.datetime,
        last_invocation: Optional[datetime.datetime],
    ) -> None:
        """Dispatch a scheduled update to the matching handler.

        Args:
            update_id: one of the keys registered in __init__
                ('poll_presence' or 'check_detector').
            now: timestamp handed to the presence poll.
            last_invocation: unused by this class.

        Raises:
            Exception: if update_id is not one of the registered keys.
        """
        if update_id == 'poll_presence':
            self.poll_presence(now)
        elif update_id == 'check_detector':
            self.check_detector()
        else:
            raise Exception(f'Unknown update type {update_id} in {__file__}')

    def poll_presence(self, now: datetime.datetime) -> None:
        """Refresh the detector and record whether anyone is in self.location.

        The "since" timestamps are only moved when the observed state
        differs from the previously recorded one (including the very first
        poll), so they mark the beginning of the current state rather than
        the time of the most recent poll.  Calls something_changed() when
        the state differs from the last one reported.
        """
        logger.debug('Checking presence in %s now...', self.location)
        self.detector.update()
        someone_home = bool(self.detector.is_anyone_in_location_now(self.location))
        if someone_home != self.someone_is_home:
            # Transition (or first observation): start a new "since" clock.
            if someone_home:
                self.someone_home_since = now
            else:
                self.everyone_gone_since = now
        self.someone_is_home = someone_home
        if self.someone_is_home != self.last_someone_is_home:
            self.something_changed()
            self.last_someone_is_home = self.someone_is_home

    def check_detector(self) -> None:
        """Rebuild the detector if it reports any dark locations.

        NOTE(review): this also replaces a detector that was injected via
        the constructor with a freshly constructed PresenceDetection --
        confirm that is the intent.
        """
        if len(self.detector.dark_locations) > 0:
            logger.debug('PresenceDetector is incomplete; trying to reinitialize...')
            self.detector = base_presence.PresenceDetection()

    def is_someone_home(self) -> Optional[Tuple[bool, datetime.datetime]]:
        """Returns a tuple of a bool that indicates whether someone is home
        and a datetime that indicates since when either someone has been
        home or no one has been home.  Returns None if no presence poll
        has completed yet.

        """
        if self.someone_is_home is None:
            return None  # checked too soon, wait a bit.
        if self.someone_is_home:
            assert self.someone_home_since is not None
            return (True, self.someone_home_since)
        else:
            assert self.everyone_gone_since is not None
            return (False, self.everyone_gone_since)