9e0a9d0180663ad2337d7a6dc9ff42ccc677afb5
[python_utils.git] / waitable_presence.py
1 #!/usr/bin/env python3
2
3 """A PresenceDetector that is waitable.  This is not part of
4 base_presence.py because I do not want to bring these dependencies
5 into that lower-level module (especially state_tracker).
6
7 """
8
9 import datetime
10 import logging
11 from typing import Optional, Tuple
12
13 from overrides import overrides
14
15 import base_presence
16 from type.locations import Location
17 import site_config
18 import state_tracker
19
20 logger = logging.getLogger(__name__)
21
22
class WaitablePresenceDetectorWithMemory(state_tracker.WaitableAutomaticStateTracker):
    """
    This is a waitable class that keeps a PresenceDetector internally
    and periodically polls it to detect changes in presence in a
    particular location.  It remembers both the current presence state
    and the time at which that state began.  Example suggested usage
    pattern:

        detector = waitable_presence.WaitablePresenceDetectorWithMemory(60.0)
        while True:
            changed = detector.wait(timeout=60 * 5)  # or, None for "forever"
            (someone_is_home, since) = detector.is_someone_home()
            if changed:
                detector.reset()
            logger.debug(
                f'someone_is_home={someone_is_home}, since={since}, changed={changed}'
            )
    """

    def __init__(
        self,
        override_update_interval_sec: float = 60.0,
        override_location: Optional[Location] = None,
    ) -> None:
        """Set up the internal detector and register the periodic updates.

        Args:
            override_update_interval_sec: value registered for the
                'poll_presence' update; 'check_detector' is registered
                with five times this value.  (Presumably the base class
                treats these as intervals in seconds -- confirm against
                state_tracker.)
            override_location: the location to watch.  When None (the
                default), site_config.get_location() is consulted at
                construction time.
        """
        # Resolve the default here rather than in the signature: a call in
        # a default argument is evaluated once, when the class body is
        # executed, and would freeze whatever the site config reported at
        # import time.
        if override_location is None:
            override_location = site_config.get_location()

        # Presence as of the most recent poll (None until the first poll)
        # and the value we last signalled a change for.
        self.last_someone_is_home: Optional[bool] = None
        self.someone_is_home: Optional[bool] = None

        # Timestamps of the most recent transition into each state.
        self.everyone_gone_since: Optional[datetime.datetime] = None
        self.someone_home_since: Optional[datetime.datetime] = None

        self.location: Location = override_location
        self.detector: base_presence.PresenceDetection = base_presence.PresenceDetection()

        # All state is initialized before handing control to the base
        # class, in case it begins invoking update() right away.
        super().__init__(
            {
                'poll_presence': override_update_interval_sec,
                'check_detector': override_update_interval_sec * 5,
            }
        )

    @overrides
    def update(
        self,
        update_id: str,
        now: datetime.datetime,
        last_invocation: Optional[datetime.datetime],
    ) -> None:
        """Dispatch one of the update ids registered in __init__.

        Args:
            update_id: 'poll_presence' or 'check_detector'.
            now: timestamp forwarded to poll_presence.
            last_invocation: unused here.

        Raises:
            Exception: if update_id is not one of the registered ids.
        """
        if update_id == 'poll_presence':
            self.poll_presence(now)
        elif update_id == 'check_detector':
            self.check_detector()
        else:
            raise Exception(f'Unknown update type {update_id} in {__file__}')

    def poll_presence(self, now: datetime.datetime) -> None:
        """Refresh the detector and record the current presence state.

        The "since" timestamp for a state is only stamped when we
        transition into that state, so that is_someone_home() reports how
        long the current state has lasted rather than the time of the most
        recent poll.  On a transition (including the very first poll) we
        also call something_changed().

        Args:
            now: the time to record if this poll observes a transition.
        """
        logger.debug('Checking presence in %s now...', self.location)
        self.detector.update()
        someone_is_home = bool(
            self.detector.is_anyone_in_location_now(self.location)
        )

        changed = someone_is_home != self.last_someone_is_home
        if changed:
            # Stamp the start of the new state before publishing it.
            if someone_is_home:
                self.someone_home_since = now
            else:
                self.everyone_gone_since = now

        self.someone_is_home = someone_is_home

        if changed:
            self.something_changed()
            self.last_someone_is_home = someone_is_home

    def check_detector(self) -> None:
        """Replace the internal detector if it reports any dark locations."""
        if len(self.detector.dark_locations) > 0:
            logger.debug('PresenceDetector is incomplete; trying to reinitialize...')
            self.detector = base_presence.PresenceDetection()

    def is_someone_home(self) -> Tuple[bool, datetime.datetime]:
        """Returns a tuple of a bool that indicates whether someone is home
        and a datetime that indicates since when either someone has been
        home or no one has been home.

        Raises:
            Exception: if called before the first presence poll has run.
        """
        if self.someone_is_home is None:
            raise Exception("Too Soon!")
        if self.someone_is_home:
            return (True, self.someone_home_since)
        return (False, self.everyone_gone_since)