Update docs.
[python_utils.git] / base_presence.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """This is a module dealing with trying to guess a person's location
6 based on the location of certain devices (e.g. phones, laptops)
7 belonging to that person.  It works with networks I run that log
8 the MAC addresses of active devices.
9
10 """
11
12 import datetime
13 import logging
14 import re
15 import warnings
16 from collections import defaultdict
17 from typing import Dict, List, Optional, Set
18
19 # Note: this module is fairly early loaded.  Be aware of dependencies.
20 import argparse_utils
21 import bootstrap
22 import config
23 import site_config
24 from type.locations import Location
25 from type.people import Person
26
logger = logging.getLogger(__name__)

# Register this module's command-line flags with the shared config module.
# The values are read back later via config.config[<flag name>].
cfg = config.add_commandline_args(
    f"Presence Detection ({__file__})",
    "Args related to detection of human beings in locations.",
)

# Path of the local file listing MAC addresses recently seen on this network.
cfg.add_argument(
    "--presence_macs_file",
    metavar="FILENAME",
    type=argparse_utils.valid_filename,
    default="/home/scott/cron/persisted_mac_addresses.txt",
    help="The location of persisted_mac_addresses.txt to use.",
)

# How old cached presence data may get before it is re-read (five minutes
# by default).
cfg.add_argument(
    "--presence_tolerable_staleness_seconds",
    metavar="DURATION",
    type=argparse_utils.valid_duration,
    default=datetime.timedelta(minutes=5),
    help="Max acceptable age of location data before auto-refreshing",
)
47
48
class PresenceDetection(object):
    """Guess where people are based on where their devices were last seen.

    Each known person owns a list of device MAC addresses (phones,
    laptops).  Each network I run persists a file of the MAC addresses
    it has seen, with a timestamp per device.  This class reads the
    local network's file directly and fetches the other network's file
    over ssh, then answers "where is this person?" and "is anyone at
    this location?" from the combined data.
    """

    def __init__(self) -> None:
        """Build the static person -> devices table and empty cached state.

        No MAC data is read here; that happens lazily on the first
        query via :meth:`maybe_update`.
        """

        # Note: list most important devices first.  The order determines
        # how much weight each device gets in where_is_person_now.
        self.devices_by_person: Dict[Person, List[str]] = {
            Person.SCOTT: [
                "DC:E5:5B:0F:03:3D",  # pixel6
                "6C:40:08:AE:DC:2E",  # laptop
            ],
            Person.LYNN: [
                "08:CC:27:63:26:14",  # motog7
                "B8:31:B5:9A:4F:19",  # laptop
            ],
            Person.ALEX: [
                "0C:CB:85:0C:8B:AE",  # phone
                "D0:C6:37:E3:36:9A",  # laptop
            ],
            Person.AARON_AND_DANA: [
                "98:B6:E9:E5:5A:7C",
                "D6:2F:37:CA:B2:9B",
                "6C:E8:5C:ED:17:26",
                "90:E1:7B:13:7C:E5",
                "6E:DC:7C:75:02:1B",
                "B2:16:1A:93:7D:50",
                "18:65:90:DA:3A:35",
                "22:28:C8:7D:3C:85",
                "B2:95:23:69:91:F8",
                "96:69:2C:88:7A:C3",
            ],
        }
        # Which network this code is running on; decides which MACs file
        # is local and which must be fetched remotely.
        self.run_location = site_config.get_location()
        logger.debug("base_presence run_location is %s", self.run_location)
        # True when the latest cabin data contained a line that does not
        # mention "cabin_" (see _parse_raw_macs_file).
        self.weird_mac_at_cabin = False
        # location -> (MAC -> time that MAC was last seen there).
        self.location_ts_by_mac: Dict[Location, Dict[str, datetime.datetime]] = defaultdict(dict)
        # MAC -> device name parsed from the "ip (name)" field.
        self.names_by_mac: Dict[str, str] = {}
        # Locations whose data could not be fetched on the last update.
        self.dark_locations: Set[Location] = set()
        # When update() last completed; None until the first update.
        self.last_update: Optional[datetime.datetime] = None

    def maybe_update(self) -> None:
        """Refresh our state if it has never been loaded or has gone stale.

        Staleness is governed by the --presence_tolerable_staleness_seconds
        flag.
        """

        if self.last_update is None:
            self.update()
            return

        age = datetime.datetime.now() - self.last_update
        max_age = config.config['presence_tolerable_staleness_seconds']
        if age.total_seconds() > max_age.total_seconds():
            logger.debug("It's been %ss since last update; refreshing now.", age.total_seconds())
            self.update()

    def update(self) -> None:
        """Unconditionally update our state.

        Raises:
            Exception: if the run location is neither the house nor the
                cabin.
        """

        self.dark_locations = set()
        if self.run_location is Location.HOUSE:
            self._update_from_house()
        elif self.run_location is Location.CABIN:
            self._update_from_cabin()
        else:
            raise Exception("Where the hell is this running?!")
        self.last_update = datetime.datetime.now()

    def _persisted_macs_filename(self) -> str:
        """Return the local MACs file path from config, or the built-in
        default when the flag is not present in the config."""

        try:
            return config.config['presence_macs_file']
        except KeyError:
            return '/home/scott/cron/persisted_mac_addresses.txt'

    def _update_from_house(self) -> None:
        """Internal method for updating from code running on the house
        network.

        Reads the local file for the house, then fetches the cabin's
        file over ssh.  If the fetch or parse fails the cabin is marked
        dark and a warning is issued; no exception propagates.
        """

        # Imported lazily: this module is loaded early, so keep
        # import-time dependencies small.
        from exec_utils import cmd

        self._read_persisted_macs_file(self._persisted_macs_filename(), Location.HOUSE)
        try:
            # NOTE(review): the user@host part of this command reads
            # "[email protected]", which looks redacted; confirm the real
            # target before relying on this.
            raw = cmd(
                "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'",
                timeout_seconds=20.0,
            )
            self._parse_raw_macs_file(raw, Location.CABIN)
        except Exception as e:
            logger.exception(e)
            msg = "Can't see the cabin right now; presence detection impared."
            # Same reporting as _update_from_cabin: plain log record, and
            # the Python warning attributed to our caller.
            logger.warning(msg)
            warnings.warn(msg, stacklevel=2)
            self.dark_locations.add(Location.CABIN)

    def _update_from_cabin(self) -> None:
        """Internal method for updating from code running on the cabin
        network.

        Reads the local file for the cabin, then fetches the house's
        file over ssh.  If the fetch or parse fails the house is marked
        dark and a warning is issued; no exception propagates.
        """

        # Imported lazily: this module is loaded early, so keep
        # import-time dependencies small.
        from exec_utils import cmd

        self._read_persisted_macs_file(self._persisted_macs_filename(), Location.CABIN)
        try:
            # NOTE(review): the user@host part of this command reads
            # "[email protected]", which looks redacted; confirm the real
            # target before relying on this.
            raw = cmd(
                "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'",
                timeout_seconds=10.0,
            )
            self._parse_raw_macs_file(raw, Location.HOUSE)
        except Exception as e:
            logger.exception(e)
            msg = "Can't see the house right now; presence detection impared."
            logger.warning(msg)
            warnings.warn(msg, stacklevel=2)
            self.dark_locations.add(Location.HOUSE)

    def _read_persisted_macs_file(self, filename: str, location: Location) -> None:
        """Internal method that, given a filename that contains MAC addresses
        seen on the network recently, reads it in and calls
        _parse_raw_macs_file with the contents.

        Args:
            filename: The name of the file to read
            location: The location we're reading from; nothing is read
                when this is Location.UNKNOWN.

        """
        if location is Location.UNKNOWN:
            return
        with open(filename, "r") as rf:
            contents = rf.read()
        self._parse_raw_macs_file(contents, location)

    def _parse_raw_macs_file(self, raw: str, location: Location) -> None:
        """Internal method that parses the contents of the MACs file.

        Each useful line has five comma-separated fields, e.g.::

            CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990

        The first field is the MAC, the third is "ip (device name)" and
        the last is an integer timestamp.  Blank lines and lines
        containing "cabin_" are ignored; lines that cannot be parsed are
        logged and skipped.

        Args:
            raw: The full text of a MACs file.
            location: The location whose network produced the text.
        """

        # Number of lines seen for the cabin that don't mention "cabin_".
        cabin_count = 0
        for line in raw.split("\n"):
            line = line.strip()
            if not line:
                continue
            logger.debug('%s> %s', location, line)
            if "cabin_" in line:
                continue
            if location == Location.CABIN:
                logger.debug('Cabin count: %d', cabin_count)
                cabin_count += 1
            try:
                (mac, _, ip_name, _, ts) = line.split(",")  # type: ignore
                # Convert inside the try so a malformed timestamp skips
                # just this line instead of aborting the whole parse.
                seen_at = datetime.datetime.fromtimestamp(int(ts.strip()))
            except (ValueError, OverflowError, OSError) as e:
                logger.exception(e)
                logger.error('SKIPPED BAD LINE> %s', line)
                continue
            mac = mac.strip()
            self.location_ts_by_mac[location][mac] = seen_at
            match = re.match(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name.strip())
            if match is not None:
                self.names_by_mac[mac] = match.group(2)

        # Recompute the flag whenever cabin data is parsed so that a
        # previous True does not stick after the device goes away.  Data
        # from other locations never touches it.
        if location == Location.CABIN:
            if cabin_count > 0:
                logger.debug('Weird MAC at the cabin')
            self.weird_mac_at_cabin = cabin_count > 0

    def is_anyone_in_location_now(self, location: Location) -> bool:
        """Determine if anyone is in a given location based on the MAC
        addresses seen recently on the network.

        Args:
            location: the location in question

        Returns:
            True if someone is detected or False otherwise.

        Raises:
            Exception: if the location's data could not be fetched on
                the last update, so no answer can be given.
        """

        self.maybe_update()
        if location in self.dark_locations:
            raise Exception(f"Can't see {location} right now; answer undefined.")
        for person in Person:
            if self.where_is_person_now(person) == location:
                return True
        # Name the member via the class rather than via the argument.
        if location == Location.CABIN and self.weird_mac_at_cabin:
            return True
        return False

    def where_is_person_now(self, name: Person) -> Location:
        """Given a person, see if we can determine their location based on
        network MAC addresses.

        Every device of the person that has been seen votes for the
        location where it was seen most recently.  The first device that
        votes is worth 10000 and each later voter a fifth of the
        previous one (never less than 1), so devices listed first in
        devices_by_person dominate.

        Args:
            name: The person we're looking for.

        Returns:
            The Location where we think they are (including UNKNOWN).
        """

        self.maybe_update()
        if len(self.dark_locations) > 0:
            msg = f"Can't see {self.dark_locations} right now; answer confidence impacted"
            logger.warning(msg)
            warnings.warn(msg, stacklevel=2)
        logger.debug('Looking for %s...', name)

        if name is Person.UNKNOWN:
            if self.weird_mac_at_cabin:
                return Location.CABIN
            return Location.UNKNOWN

        # Imported lazily: this module is loaded early, so keep
        # import-time dependencies small.
        import dict_utils

        votes: Dict[Location, int] = {}
        credit = 10000
        # A person with no registered devices simply gets no votes.
        for mac in self.devices_by_person.get(name, []):
            if mac not in self.names_by_mac:
                continue
            mac_name = self.names_by_mac[mac]
            logger.debug('Looking for %s... check for mac %s (%s)', name, mac, mac_name)

            # Where *this* device has been seen and when.  Kept per
            # device so one device's sightings cannot steer another
            # device's vote.
            sightings: Dict[Location, datetime.datetime] = {}
            for location, ts_by_mac in self.location_ts_by_mac.items():
                if mac in ts_by_mac:
                    ts = ts_by_mac[mac]
                    logger.debug('Seen %s (%s) at %s since %s', mac, mac_name, location, ts)
                    sightings[location] = ts
            if not sightings:
                continue

            most_recent_location, _ = dict_utils.item_with_max_value(sightings)
            votes[most_recent_location] = votes.get(most_recent_location, 0) + credit
            logger.debug('%s: %s gets %d votes.', name, most_recent_location, credit)
            # Note: list most important devices first
            credit = max(1, int(credit * 0.2))

        if votes:
            (winner, value) = dict_utils.item_with_max_value(votes)
            # NOTE(review): the first voting device always contributes
            # 10000, so this threshold appears to pass whenever there
            # are any votes at all; confirm the intent.
            if value > 2001:
                return winner
        return Location.UNKNOWN
306
307
@bootstrap.initialize
def main() -> None:
    """Print the guessed location of every Person, a blank line, and then
    whether anyone is detected at every Location."""
    detector = PresenceDetection()

    for who in Person:
        where = detector.where_is_person_now(who)
        print(f'{who} => {where}')

    print()

    for place in Location:
        occupied = detector.is_anyone_in_location_now(place)
        print(f'{place} => {occupied}')


# Run the report only when executed as a script, not when imported.
if __name__ == '__main__':
    main()