f774dbce270dc0b9c13c409ca9019d3a8e921166
[python_utils.git] / base_presence.py
1 #!/usr/bin/env python3
2
3 import datetime
4 from collections import defaultdict
5 import logging
6 import re
7 from typing import Dict, List, Set
8 import warnings
9
10 # Note: this module is fairly early loaded.  Be aware of dependencies.
11 import argparse_utils
12 import bootstrap
13 import config
14 from type.locations import Location
15 from type.people import Person
16 import site_config
17
18
logger = logging.getLogger(__name__)

# Command-line flags for this module, registered with the shared config
# object at import time.
cfg = config.add_commandline_args(
    f"Presence Detection ({__file__})",
    "Args related to detection of human beings in locations.",
)

# Path of the locally persisted MAC address file.
cfg.add_argument(
    "--presence_macs_file",
    default="/home/scott/cron/persisted_mac_addresses.txt",
    type=argparse_utils.valid_filename,
    metavar="FILENAME",
    help="The location of persisted_mac_addresses.txt to use.",
)

# How old cached location data may get before it is refreshed; the default
# is five minutes.
cfg.add_argument(
    "--presence_tolerable_staleness_seconds",
    default=datetime.timedelta(minutes=5),
    type=argparse_utils.valid_duration,
    metavar="DURATION",
    help="Max acceptable age of location data before auto-refreshing",
)
39
40
class PresenceDetection(object):
    """Guess where people are from the MAC addresses recorded at each site.

    Each site has a "persisted MAC addresses" file with one device per
    line, for example::

        CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990

    The five comma-separated fields are the MAC address, a count, an
    "ip (hostname)" pair, a manufacturer string and an integer timestamp
    (seconds since the epoch).  The file of the site this code runs at is
    read from disk; the other site's file is fetched over ssh.  A site
    whose file cannot be fetched is recorded in ``dark_locations``.

    Cached data is refreshed lazily by the query methods once it is older
    than ``--presence_tolerable_staleness_seconds``.
    """

    # Matches the "ip (hostname)" field; group 2 is the hostname.
    _IP_NAME_RE = re.compile(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)")

    # Used when the --presence_macs_file flag is missing from the config.
    _FALLBACK_MACS_FILE = '/home/scott/cron/persisted_mac_addresses.txt'

    # Voting parameters used by where_is_person_now().
    _INITIAL_CREDIT = 10000  # votes cast by the first observed device
    _CREDIT_DECAY = 0.2  # each later observed device casts 20% of the last
    _WINNING_VOTE_THRESHOLD = 2001  # a location must score above this

    def __init__(self) -> None:
        # Note: list most important devices first.
        self.devices_by_person: Dict[Person, List[str]] = {
            Person.SCOTT: [
                "DC:E5:5B:0F:03:3D",  # pixel6
                "6C:40:08:AE:DC:2E",  # laptop
            ],
            Person.LYNN: [
                "08:CC:27:63:26:14",  # motog7
                "B8:31:B5:9A:4F:19",  # laptop
            ],
            Person.ALEX: [
                "0C:CB:85:0C:8B:AE",  # phone
                "D0:C6:37:E3:36:9A",  # laptop
            ],
            Person.AARON_AND_DANA: [
                "98:B6:E9:E5:5A:7C",
                "D6:2F:37:CA:B2:9B",
                "6C:E8:5C:ED:17:26",
                "90:E1:7B:13:7C:E5",
                "6E:DC:7C:75:02:1B",
                "B2:16:1A:93:7D:50",
                "18:65:90:DA:3A:35",
                "22:28:C8:7D:3C:85",
                "B2:95:23:69:91:F8",
                "96:69:2C:88:7A:C3",
            ],
        }
        self.run_location = site_config.get_location()
        logger.debug(f"run_location is {self.run_location}")
        # True when the most recent cabin parse contained at least one
        # device line without the "cabin_" marker.
        self.weird_mac_at_cabin = False
        # location -> (mac -> timestamp recorded for that mac there)
        self.location_ts_by_mac: Dict[
            Location, Dict[str, datetime.datetime]
        ] = defaultdict(dict)
        # mac -> hostname taken from the "ip (hostname)" field
        self.names_by_mac: Dict[str, str] = {}
        # Locations whose data could not be fetched during the last update.
        self.dark_locations: Set[Location] = set()
        # datetime of the last successful update(), or None before the first.
        self.last_update = None

    def maybe_update(self) -> None:
        """Refresh the cached data if it was never loaded or is too stale."""
        if self.last_update is None:
            self.update()
            return
        age = datetime.datetime.now() - self.last_update
        tolerable = config.config['presence_tolerable_staleness_seconds']
        if age.total_seconds() > tolerable.total_seconds():
            logger.debug(
                f"It's been {age.total_seconds()}s since last update; refreshing now."
            )
            self.update()

    def update(self) -> None:
        """Unconditionally reload the data for both sites.

        Raises:
            Exception: if the run location is neither HOUSE nor CABIN.
        """
        self.dark_locations = set()
        if self.run_location is Location.HOUSE:
            self.update_from_house()
        elif self.run_location is Location.CABIN:
            self.update_from_cabin()
        else:
            raise Exception("Where the hell is this running?!")
        self.last_update = datetime.datetime.now()

    def update_from_house(self) -> None:
        """Load the house data from disk and the cabin data over ssh."""
        # NOTE(review): the user@host in this command looks like it was
        # redacted by an e-mail obfuscation filter; confirm the real target.
        self._update_local_and_remote(
            Location.HOUSE,
            Location.CABIN,
            "cabin",
            "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'",
        )

    def update_from_cabin(self) -> None:
        """Load the cabin data from disk and the house data over ssh."""
        # NOTE(review): the user@host in this command looks like it was
        # redacted by an e-mail obfuscation filter; confirm the real target.
        self._update_local_and_remote(
            Location.CABIN,
            Location.HOUSE,
            "house",
            "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'",
        )

    def _local_macs_file(self) -> str:
        """Return the configured MAC file path, or the built-in fallback."""
        try:
            return config.config['presence_macs_file']
        except KeyError:
            return self._FALLBACK_MACS_FILE

    def _update_local_and_remote(
        self,
        local: Location,
        remote: Location,
        remote_label: str,
        fetch_remote_cmd: str,
    ) -> None:
        """Read the local site's file, then try to fetch the remote site's.

        Args:
            local: the location whose file is read from local disk.
            remote: the location whose file is fetched with fetch_remote_cmd.
            remote_label: human-readable name of the remote site, used in
                the warning text.
            fetch_remote_cmd: shell command that prints the remote file.

        A failure to fetch or parse the remote data is not fatal: it is
        logged, a warning is issued, and the remote location is added to
        dark_locations.
        """
        # Imported lazily; this module is loaded early and keeps its
        # import-time dependencies small.
        from exec_utils import cmd

        self.read_persisted_macs_file(self._local_macs_file(), local)
        try:
            raw = cmd(fetch_remote_cmd, timeout_seconds=10.0)
            self.parse_raw_macs_file(raw, remote)
        except Exception as e:
            logger.exception(e)
            msg = f"Can't see the {remote_label} right now; presence detection impared."
            logger.warning(msg)
            # stacklevel=3 attributes the warning to the caller of
            # update_from_house() / update_from_cabin().
            warnings.warn(msg, stacklevel=3)
            self.dark_locations.add(remote)

    def read_persisted_macs_file(
        self, filename: str, location: Location
    ) -> None:
        """Parse the MAC file at filename as data for location.

        Does nothing when location is Location.UNKNOWN.
        """
        if location is Location.UNKNOWN:
            return
        with open(filename, "r") as rf:
            contents = rf.read()
        self.parse_raw_macs_file(contents, location)

    def parse_raw_macs_file(self, raw: str, location: Location) -> None:
        """Parse the text of a persisted MAC file and record what it says.

        Args:
            raw: the full file contents; see the class docstring for the
                line format.
            location: the site the file describes.

        Blank lines and lines containing "cabin_" are ignored.  Lines that
        do not have exactly five comma-separated fields, or whose timestamp
        cannot be converted, are logged and skipped.  When location is the
        cabin, weird_mac_at_cabin is set to whether any non-"cabin_" line
        was present in this file.
        """
        cabin_count = 0
        for line in raw.split("\n"):
            line = line.strip()
            if not line:
                continue
            logger.debug(f'{location}> {line}')
            # presumably "cabin_" marks the cabin's own fixed devices --
            # TODO confirm against the hostname naming scheme.
            if "cabin_" in line:
                continue
            if location == Location.CABIN:
                logger.debug(f'Cabin count: {cabin_count}')
                cabin_count += 1
            try:
                (mac, _count, ip_name, _mfg, ts) = line.split(",")
                seen_at = datetime.datetime.fromtimestamp(int(ts.strip()))
            except (ValueError, OverflowError, OSError) as e:
                # Wrong field count, non-integer or out-of-range timestamp.
                logger.error(f'SKIPPED BAD LINE> {line}')
                logger.exception(e)
                continue
            mac = mac.strip()
            self.location_ts_by_mac[location][mac] = seen_at
            match = self._IP_NAME_RE.match(ip_name.strip())
            if match is not None:
                self.names_by_mac[mac] = match.group(2)
        if location == Location.CABIN:
            # Recompute on every cabin parse so the flag cannot go stale
            # once the unexpected device is no longer listed.
            self.weird_mac_at_cabin = cabin_count > 0
            if self.weird_mac_at_cabin:
                logger.debug('Weird MAC at the cabin')

    def is_anyone_in_location_now(self, location: Location) -> bool:
        """Return True if any person is currently placed at location.

        The cabin also counts as occupied when weird_mac_at_cabin is set.

        Raises:
            Exception: if location is currently dark, in which case the
                answer cannot be determined.
        """
        self.maybe_update()
        if location in self.dark_locations:
            raise Exception(
                f"Can't see {location} right now; answer undefined."
            )
        for person in Person:
            if person is not None:
                if self.where_is_person_now(person) == location:
                    return True
        return location == Location.CABIN and self.weird_mac_at_cabin

    def where_is_person_now(self, name: Person) -> Location:
        """Return the location where name most likely is right now.

        Every device of the person that has been observed votes for the
        location with the newest timestamp.  The first observed device
        casts 10000 votes and each later one 20% of the previous (never
        less than 1).  The top-scoring location is returned only if it
        scores more than 2001 votes; otherwise Location.UNKNOWN.

        Person.UNKNOWN maps to the cabin when weird_mac_at_cabin is set
        and to Location.UNKNOWN otherwise.  A person with no registered
        devices yields Location.UNKNOWN.

        A warning is issued (but an answer still returned) when any
        location is dark.
        """
        self.maybe_update()
        if self.dark_locations:
            msg = f"Can't see {self.dark_locations} right now; answer confidence impacted"
            logger.warning(msg)
            warnings.warn(msg, stacklevel=2)
        logger.debug(f'Looking for {name}...')

        if name is Person.UNKNOWN:
            if self.weird_mac_at_cabin:
                return Location.CABIN
            return Location.UNKNOWN

        # Imported lazily; this module is loaded early and keeps its
        # import-time dependencies small.
        import dict_utils

        votes: Dict[Location, int] = {}
        # NOTE(review): tiebreaks is shared by all of the person's devices,
        # so timestamps recorded for an earlier device can still decide a
        # later device's vote -- confirm this is intended.
        tiebreaks: Dict[Location, datetime.datetime] = {}
        credit = self._INITIAL_CREDIT
        for mac in self.devices_by_person.get(name, []):
            if mac not in self.names_by_mac:
                continue
            mac_name = self.names_by_mac[mac]
            logger.debug(
                f'Looking for {name}... check for mac {mac} ({mac_name})'
            )
            for seen_location, ts_by_mac in self.location_ts_by_mac.items():
                if mac in ts_by_mac:
                    ts = ts_by_mac[mac]
                    logger.debug(
                        f'Seen {mac} ({mac_name}) at {seen_location} since {ts}'
                    )
                    tiebreaks[seen_location] = ts

            most_recent_location, _ = dict_utils.item_with_max_value(tiebreaks)
            bonus = credit
            votes[most_recent_location] = (
                votes.get(most_recent_location, 0) + bonus
            )
            logger.debug(f'{name}: {most_recent_location} gets {bonus} votes.')
            # Note: list most important devices first
            credit = int(credit * self._CREDIT_DECAY)
            if credit <= 0:
                credit = 1
        if votes:
            (location, value) = dict_utils.item_with_max_value(votes)
            if value > self._WINNING_VOTE_THRESHOLD:
                return location
        return Location.UNKNOWN
259
260
@bootstrap.initialize
def main() -> None:
    """Print the detected location of every Person, then a blank line."""
    detector = PresenceDetection()
    for who in Person:
        where = detector.where_is_person_now(who)
        print(f'{who} => {where}')
    print()
267
268
269 #    for location in Location:
270 #        print(f'{location} => {p.is_anyone_in_location_now(location)}')
271
272
# Run the presence report only when executed as a script, not on import.
if __name__ == "__main__":
    main()