Easier and more self documenting patterns for loading/saving Persistent
[python_utils.git] / base_presence.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """This is a module dealing with trying to guess a person's location
6 based on the location of certain devices (e.g. phones, laptops)
7 belonging to that person.  It works with networks I run that log
8 the MAC addresses of active devices.
9
10 """
11
12 import datetime
13 import logging
14 import re
15 import warnings
16 from collections import defaultdict
17 from typing import Dict, List, Optional, Set
18
19 # Note: this module is fairly early loaded.  Be aware of dependencies.
20 import argparse_utils
21 import bootstrap
22 import config
23 import site_config
24 from type.locations import Location
25 from type.people import Person
26
# Module-wide logger, named after this module.
logger = logging.getLogger(__name__)

# Register this module's command line flags with the shared config
# machinery; parsed values are read back later via config.config[...].
cfg = config.add_commandline_args(
    f"Presence Detection ({__file__})",
    "Args related to detection of human beings in locations.",
)

# Where to find the MAC address log when running at the location itself.
cfg.add_argument(
    "--presence_macs_file",
    metavar="FILENAME",
    type=str,
    default="/home/scott/cron/persisted_mac_addresses.txt",
    help="The location of persisted_mac_addresses.txt to use.",
)

# How old cached presence data may get before it is re-read.
cfg.add_argument(
    "--presence_tolerable_staleness_seconds",
    metavar="DURATION",
    type=argparse_utils.valid_duration,
    default=datetime.timedelta(seconds=60 * 5),
    help="Max acceptable age of location data before auto-refreshing",
)
47
48
class PresenceDetection(object):
    """Guess where people are from the network devices they own.

    Each location's network logs the MAC addresses it has seen.  This
    class reads those logs (from a local file when running at that
    location, otherwise over ssh), remembers when each MAC was last seen
    at each location and lets known devices "vote" on where their owner
    currently is.
    """

    # Matches the "10.0.0.22 (side_deck_high_home)" field of a log line;
    # group 2 is the device name.  Compiled once rather than per line.
    _IP_NAME_RE = re.compile(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)")

    def __init__(self) -> None:
        """Set up the device table and empty state; reads no log data yet."""

        # Note: list most important devices first.
        self.devices_by_person: Dict[Person, List[str]] = {
            Person.SCOTT: [
                "DC:E5:5B:0F:03:3D",  # pixel6
                "6C:40:08:AE:DC:2E",  # laptop
            ],
            Person.LYNN: [
                "08:CC:27:63:26:14",  # motog7
                "B8:31:B5:9A:4F:19",  # laptop
            ],
            Person.ALEX: [
                "0C:CB:85:0C:8B:AE",  # phone
                "D0:C6:37:E3:36:9A",  # laptop
            ],
            Person.AARON_AND_DANA: [
                "98:B6:E9:E5:5A:7C",
                "D6:2F:37:CA:B2:9B",
                "6C:E8:5C:ED:17:26",
                "90:E1:7B:13:7C:E5",
                "6E:DC:7C:75:02:1B",
                "B2:16:1A:93:7D:50",
                "18:65:90:DA:3A:35",
                "22:28:C8:7D:3C:85",
                "B2:95:23:69:91:F8",
                "96:69:2C:88:7A:C3",
            ],
        }
        self.run_location = site_config.get_location()
        logger.debug("base_presence run_location is %s", self.run_location)
        # True when the latest cabin log held a device not named "cabin_*".
        self.weird_mac_at_cabin = False
        # location -> (MAC -> timestamp parsed from that location's log).
        self.location_ts_by_mac: Dict[Location, Dict[str, datetime.datetime]] = defaultdict(dict)
        # MAC -> device name parsed from the "ip (name)" log field.
        self.names_by_mac: Dict[str, str] = {}
        # Locations whose log could not be read during the last update.
        self.dark_locations: Set[Location] = set()
        # When update() last completed; None until the first update.
        self.last_update: Optional[datetime.datetime] = None

    def maybe_update(self) -> None:
        """Refresh our state if it has never been loaded or has grown
        older than --presence_tolerable_staleness_seconds.
        """

        if self.last_update is None:
            self.update()
            return

        age = datetime.datetime.now() - self.last_update
        limit = config.config['presence_tolerable_staleness_seconds']
        if age.total_seconds() > limit.total_seconds():
            logger.debug("It's been %ss since last update; refreshing now.", age.total_seconds())
            self.update()

    def update(self) -> None:
        """Unconditionally update our state."""

        self.dark_locations = set()
        self._update_house()
        self._update_cabin()
        self.last_update = datetime.datetime.now()

    @staticmethod
    def _get_raw_data_via_ssh(location: Location) -> Optional[str]:
        """Fetch a location's MAC log by running cat over ssh.

        Args:
            location: the location whose log is wanted.

        Returns:
            The raw log text, or None if the location has no ssh target
            or the command failed for any reason (best effort).
        """
        from exec_utils import cmd

        # NOTE(review): both targets read '[email protected]', which looks
        # like a redacted placeholder rather than a real user@host; confirm
        # the actual values before relying on this path.
        canonical = {
            Location.HOUSE: '[email protected]',
            Location.CABIN: '[email protected]',
        }
        target = canonical.get(location)
        if target is None:
            logger.debug('No ssh target known for %s', location)
            return None
        try:
            return cmd(
                f"ssh {target} 'cat /home/scott/cron/persisted_mac_addresses.txt'",
                timeout_seconds=30.0,
            )
        except Exception:
            # Deliberately best effort: the caller marks the location dark.
            # Keep the reason at debug level instead of dropping it.
            logger.debug('Fetching MAC data for %s via ssh failed', location, exc_info=True)
            return None

    def _get_raw_data(self, location: Location) -> Optional[str]:
        """Return the raw MAC log text for a location, or None.

        When we are running at the requested location the local file
        named by --presence_macs_file is preferred; if it can't be read,
        or we are running elsewhere, the log is fetched over ssh.

        Args:
            location: the location whose log is wanted.
        """

        if self.run_location == location:
            persisted_macs = config.config.get(
                'presence_macs_file', '/home/scott/cron/persisted_mac_addresses.txt'
            )
            # Just try to open it (no separate exists() check, which is
            # racy) and fall back to ssh if the file is missing/unreadable.
            try:
                with open(persisted_macs, 'r') as rf:
                    return rf.read()
            except OSError:
                logger.debug('Unable to read %s; falling back to ssh', persisted_macs)
        return PresenceDetection._get_raw_data_via_ssh(location)

    def _update_location(self, location: Location, label: str) -> None:
        """Reload one location's data, or mark the location as dark.

        Args:
            location: the location to refresh.
            label: human-readable name used in the warning text.
        """

        data = self._get_raw_data(location)
        if data:
            self._parse_raw_macs_file(data, location)
            return

        msg = f"Can't see the {label} right now; presence detection impared."
        # The stacklevels skip this helper so that both the warning and the
        # log record are attributed to the same frames as before the
        # house/cabin code was shared.
        warnings.warn(msg, stacklevel=2)
        logger.warning(msg, stacklevel=3)
        self.dark_locations.add(location)

    def _update_house(self) -> None:
        """Refresh what we know about devices at the house."""
        self._update_location(Location.HOUSE, 'house')

    def _update_cabin(self) -> None:
        """Refresh what we know about devices at the cabin."""
        self._update_location(Location.CABIN, 'cabin')

    def _parse_raw_macs_file(self, raw: str, location: Location) -> None:
        """Internal method that parses the contents of the MACs file.

        Lines must have exactly five comma separated fields; the first is
        the MAC, the third is "ip (name)" and the last is an integer
        timestamp.  Malformed lines (including ones with an unparseable
        timestamp) are logged and skipped rather than aborting the update.
        Lines containing "cabin_" are ignored.  Any other device that
        parses successfully in the cabin's log sets weird_mac_at_cabin;
        the flag is recomputed each time the cabin log is parsed so it
        clears again once such devices disappear.

        Args:
            raw: the complete text of the MACs file.
            location: the location the text was read from.
        """

        # CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990
        cabin_count = 0
        for line in raw.split("\n"):
            line = line.strip()
            if not line or 'using fake authentication data for X11' in line:
                continue
            logger.debug('%s> %s', location, line)
            if "cabin_" in line:
                continue
            try:
                (mac, _, ip_name, _, ts) = line.split(",")
                seen = datetime.datetime.fromtimestamp(int(ts.strip()))
            except (ValueError, OverflowError, OSError) as e:
                logger.exception(e)
                logger.error('SKIPPED BAD LINE> %s', line)
                continue

            mac = mac.strip()
            self.location_ts_by_mac[location][mac] = seen
            if location == Location.CABIN:
                # Only well-formed lines count as devices at the cabin.
                cabin_count += 1
                logger.debug('Cabin count: %d', cabin_count)

            match = self._IP_NAME_RE.match(ip_name.strip())
            if match is not None:
                self.names_by_mac[mac] = match.group(2)

        if location == Location.CABIN:
            self.weird_mac_at_cabin = cabin_count > 0
            if self.weird_mac_at_cabin:
                logger.debug('Weird MAC at the cabin')

    def is_anyone_in_location_now(self, location: Location) -> bool:
        """Determine if anyone is in a given location based on the MAC
        addresses seen on the network there.

        Args:
            location: the location in question

        Returns:
            True if someone is detected or False otherwise.

        Raises:
            Exception: if the location's data could not be read during
                the last update, so no answer can be given.
        """

        self.maybe_update()
        if location in self.dark_locations:
            raise Exception(f"Can't see {location} right now; answer undefined.")
        if any(self.where_is_person_now(person) == location for person in Person):
            return True
        return location == Location.CABIN and self.weird_mac_at_cabin

    def where_is_person_now(self, name: Person) -> Location:
        """Given a person, see if we can determine their location based on
        network MAC addresses.

        Each of the person's known devices votes for the location where
        that device has the latest timestamp; earlier devices in the
        person's list carry more weight.

        Args:
            name: The person we're looking for.

        Returns:
            The Location where we think they are (including UNKNOWN).
        """

        self.maybe_update()
        if self.dark_locations:
            msg = f"Can't see {self.dark_locations} right now; answer confidence impacted"
            logger.warning(msg)
            warnings.warn(msg, stacklevel=2)
        logger.debug('Looking for %s...', name)

        if name is Person.UNKNOWN:
            return Location.CABIN if self.weird_mac_at_cabin else Location.UNKNOWN

        import dict_utils

        votes: Dict[Location, int] = {}
        credit = 10000
        # People without a device list simply have no votes -> UNKNOWN.
        for mac in self.devices_by_person.get(name, []):
            if mac not in self.names_by_mac:
                continue
            mac_name = self.names_by_mac[mac]
            logger.debug('Looking for %s... check for mac %s (%s)', name, mac, mac_name)

            # Collect this device's own sightings.  A fresh dict per device
            # keeps one device's sightings from deciding another's vote.
            sightings: Dict[Location, datetime.datetime] = {}
            for location, ts_by_mac in self.location_ts_by_mac.items():
                ts = ts_by_mac.get(mac)
                if ts is not None:
                    logger.debug('Seen %s (%s) at %s since %s', mac, mac_name, location, ts)
                    sightings[location] = ts
            if not sightings:
                continue

            most_recent_location, _ = dict_utils.item_with_max_value(sightings)
            votes[most_recent_location] = votes.get(most_recent_location, 0) + credit
            logger.debug('%s: %s gets %d votes.', name, most_recent_location, credit)

            # Note: list most important devices first.
            # NOTE(review): devices skipped above do not decay the credit,
            # so the first device that is seen always votes with 10000 and
            # any vote at all clears the 2001 threshold below; confirm
            # whether unseen devices were meant to decay the credit too.
            credit = max(1, int(credit * 0.2))

        if votes:
            (location, value) = dict_utils.item_with_max_value(votes)
            if value > 2001:
                return location
        return Location.UNKNOWN
292
293
@bootstrap.initialize
def main() -> None:
    """Print the guessed location of every Person, a blank line, and
    then whether anyone is detected at each Location.
    """
    detector = PresenceDetection()

    for who in Person:
        whereabouts = detector.where_is_person_now(who)
        print(f'{who} => {whereabouts}')

    print()

    for place in Location:
        occupied = detector.is_anyone_in_location_now(place)
        print(f'{place} => {occupied}')


# Only run the report when executed as a script, not when imported.
if __name__ == '__main__':
    main()