Adding files needed to run sphinx as a pre-push hook.
[python_utils.git] / base_presence.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """This is a module dealing with trying to guess a person's location
6 based on the location of certain devices (e.g. phones, laptops)
7 belonging to that person.  It works with networks I run that log
8 device MAC addresses active.
9
10 """
11
12 import datetime
13 import logging
14 import re
15 import warnings
16 from collections import defaultdict
17 from typing import Dict, List, Optional, Set
18
19 # Note: this module is fairly early loaded.  Be aware of dependencies.
20 import argparse_utils
21 import bootstrap
22 import config
23 import site_config
24 from type.locations import Location
25 from type.people import Person
26
27 logger = logging.getLogger(__name__)
28
29 cfg = config.add_commandline_args(
30     f"Presence Detection ({__file__})",
31     "Args related to detection of human beings in locations.",
32 )
33 cfg.add_argument(
34     "--presence_macs_file",
35     type=argparse_utils.valid_filename,
36     default="/home/scott/cron/persisted_mac_addresses.txt",
37     metavar="FILENAME",
38     help="The location of persisted_mac_addresses.txt to use.",
39 )
40 cfg.add_argument(
41     '--presence_tolerable_staleness_seconds',
42     type=argparse_utils.valid_duration,
43     default=datetime.timedelta(seconds=60 * 5),
44     metavar='DURATION',
45     help='Max acceptable age of location data before auto-refreshing',
46 )
47
48
49 class PresenceDetection(object):
50     """See above.  This is a base class for determining a person's
51     location on networks I administer."""
52
53     def __init__(self) -> None:
54         # Note: list most important devices first.
55         self.devices_by_person: Dict[Person, List[str]] = {
56             Person.SCOTT: [
57                 "DC:E5:5B:0F:03:3D",  # pixel6
58                 "6C:40:08:AE:DC:2E",  # laptop
59             ],
60             Person.LYNN: [
61                 "08:CC:27:63:26:14",  # motog7
62                 "B8:31:B5:9A:4F:19",  # laptop
63             ],
64             Person.ALEX: [
65                 "0C:CB:85:0C:8B:AE",  # phone
66                 "D0:C6:37:E3:36:9A",  # laptop
67             ],
68             Person.AARON_AND_DANA: [
69                 "98:B6:E9:E5:5A:7C",
70                 "D6:2F:37:CA:B2:9B",
71                 "6C:E8:5C:ED:17:26",
72                 "90:E1:7B:13:7C:E5",
73                 "6E:DC:7C:75:02:1B",
74                 "B2:16:1A:93:7D:50",
75                 "18:65:90:DA:3A:35",
76                 "22:28:C8:7D:3C:85",
77                 "B2:95:23:69:91:F8",
78                 "96:69:2C:88:7A:C3",
79             ],
80         }
81         self.run_location = site_config.get_location()
82         logger.debug("base_presence run_location is %s", self.run_location)
83         self.weird_mac_at_cabin = False
84         self.location_ts_by_mac: Dict[Location, Dict[str, datetime.datetime]] = defaultdict(dict)
85         self.names_by_mac: Dict[str, str] = {}
86         self.dark_locations: Set[Location] = set()
87         self.last_update: Optional[datetime.datetime] = None
88
89     def maybe_update(self) -> None:
90         if self.last_update is None:
91             self.update()
92         else:
93             now = datetime.datetime.now()
94             delta = now - self.last_update
95             if (
96                 delta.total_seconds()
97                 > config.config['presence_tolerable_staleness_seconds'].total_seconds()
98             ):
99                 logger.debug(
100                     "It's been %ss since last update; refreshing now.", delta.total_seconds()
101                 )
102                 self.update()
103
104     def update(self) -> None:
105         self.dark_locations = set()
106         if self.run_location is Location.HOUSE:
107             self.update_from_house()
108         elif self.run_location is Location.CABIN:
109             self.update_from_cabin()
110         else:
111             raise Exception("Where the hell is this running?!")
112         self.last_update = datetime.datetime.now()
113
114     def update_from_house(self) -> None:
115         from exec_utils import cmd
116
117         try:
118             persisted_macs = config.config['presence_macs_file']
119         except KeyError:
120             persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
121         self.read_persisted_macs_file(persisted_macs, Location.HOUSE)
122         try:
123             raw = cmd(
124                 "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'",
125                 timeout_seconds=20.0,
126             )
127             self.parse_raw_macs_file(raw, Location.CABIN)
128         except Exception as e:
129             logger.exception(e)
130             msg = "Can't see the cabin right now; presence detection impared."
131             warnings.warn(msg)
132             logger.warning(msg, stacklevel=2)
133             self.dark_locations.add(Location.CABIN)
134
135     def update_from_cabin(self) -> None:
136         from exec_utils import cmd
137
138         try:
139             persisted_macs = config.config['presence_macs_file']
140         except KeyError:
141             persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
142         self.read_persisted_macs_file(persisted_macs, Location.CABIN)
143         try:
144             raw = cmd(
145                 "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'",
146                 timeout_seconds=10.0,
147             )
148             self.parse_raw_macs_file(raw, Location.HOUSE)
149         except Exception as e:
150             logger.exception(e)
151             msg = "Can't see the house right now; presence detection impared."
152             logger.warning(msg)
153             warnings.warn(msg, stacklevel=2)
154             self.dark_locations.add(Location.HOUSE)
155
156     def read_persisted_macs_file(self, filename: str, location: Location) -> None:
157         if location is Location.UNKNOWN:
158             return
159         with open(filename, "r") as rf:
160             lines = rf.read()
161         self.parse_raw_macs_file(lines, location)
162
163     def parse_raw_macs_file(self, raw: str, location: Location) -> None:
164         lines = raw.split("\n")
165
166         # CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990
167         cabin_count = 0
168         for line in lines:
169             line = line.strip()
170             if len(line) == 0:
171                 continue
172             logger.debug('%s> %s', location, line)
173             if "cabin_" in line:
174                 continue
175             if location == Location.CABIN:
176                 logger.debug('Cabin count: %d', cabin_count)
177                 cabin_count += 1
178             try:
179                 (mac, _, ip_name, _, ts) = line.split(",")  # type: ignore
180             except Exception as e:
181                 logger.exception(e)
182                 logger.error('SKIPPED BAD LINE> %s', line)
183                 continue
184             mac = mac.strip()
185             (self.location_ts_by_mac[location])[mac] = datetime.datetime.fromtimestamp(
186                 int(ts.strip())
187             )
188             ip_name = ip_name.strip()
189             match = re.match(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name)
190             if match is not None:
191                 name = match.group(2)
192                 self.names_by_mac[mac] = name
193         if cabin_count > 0:
194             logger.debug('Weird MAC at the cabin')
195             self.weird_mac_at_cabin = True
196
197     def is_anyone_in_location_now(self, location: Location) -> bool:
198         self.maybe_update()
199         if location in self.dark_locations:
200             raise Exception(f"Can't see {location} right now; answer undefined.")
201         for person in Person:
202             if person is not None:
203                 loc = self.where_is_person_now(person)
204                 if location == loc:
205                     return True
206         if location == location.CABIN and self.weird_mac_at_cabin:
207             return True
208         return False
209
210     def where_is_person_now(self, name: Person) -> Location:
211         self.maybe_update()
212         if len(self.dark_locations) > 0:
213             msg = f"Can't see {self.dark_locations} right now; answer confidence impacted"
214             logger.warning(msg)
215             warnings.warn(msg, stacklevel=2)
216         logger.debug('Looking for %s...', name)
217
218         if name is Person.UNKNOWN:
219             if self.weird_mac_at_cabin:
220                 return Location.CABIN
221             else:
222                 return Location.UNKNOWN
223
224         import dict_utils
225
226         votes: Dict[Location, int] = {}
227         tiebreaks: Dict[Location, datetime.datetime] = {}
228         credit = 10000
229         location = None
230         for mac in self.devices_by_person[name]:
231             if mac not in self.names_by_mac:
232                 continue
233             mac_name = self.names_by_mac[mac]
234             logger.debug('Looking for %s... check for mac %s (%s)', name, mac, mac_name)
235             for location in self.location_ts_by_mac:
236                 if mac in self.location_ts_by_mac[location]:
237                     ts = (self.location_ts_by_mac[location])[mac]
238                     logger.debug('Seen %s (%s) at %s since %s', mac, mac_name, location, ts)
239                     tiebreaks[location] = ts
240
241             (
242                 most_recent_location,
243                 _,
244             ) = dict_utils.item_with_max_value(tiebreaks)
245             bonus = credit
246             v = votes.get(most_recent_location, 0)
247             votes[most_recent_location] = v + bonus
248             logger.debug('%s: %s gets %d votes.', name, most_recent_location, bonus)
249             credit = int(credit * 0.2)  # Note: list most important devices first
250             if credit <= 0:
251                 credit = 1
252         if len(votes) > 0:
253             (location, value) = dict_utils.item_with_max_value(votes)
254             if value > 2001:
255                 assert location
256                 return location
257         return Location.UNKNOWN
258
259
260 @bootstrap.initialize
261 def main() -> None:
262     p = PresenceDetection()
263     for person in Person:
264         print(f'{person} => {p.where_is_person_now(person)}')
265     print()
266     for location in Location:
267         print(f'{location} => {p.is_anyone_in_location_now(location)}')
268
269
270 if __name__ == '__main__':
271     main()