8be4d936d7a5e1e9be2027e8b3ad5b7ea47b98de
[python_utils.git] / base_presence.py
1 #!/usr/bin/env python3
2
3 """This is a module dealing with trying to guess a person's location
4 based on the location of certain devices (e.g. phones, laptops)
5 belonging to that person.  It works with networks I run that log
6 device MAC addresses active."""
7
8 import datetime
9 import logging
10 import re
11 import warnings
12 from collections import defaultdict
13 from typing import Dict, List, Optional, Set
14
15 # Note: this module is fairly early loaded.  Be aware of dependencies.
16 import argparse_utils
17 import bootstrap
18 import config
19 import site_config
20 from type.locations import Location
21 from type.people import Person
22
logger = logging.getLogger(__name__)

# Register this module's command line flags with the shared config parser.
cfg = config.add_commandline_args(
    f'Presence Detection ({__file__})',
    'Args related to detection of human beings in locations.',
)

# Where to find the locally persisted list of recently seen MAC addresses.
cfg.add_argument(
    '--presence_macs_file',
    metavar='FILENAME',
    type=argparse_utils.valid_filename,
    default='/home/scott/cron/persisted_mac_addresses.txt',
    help='The location of persisted_mac_addresses.txt to use.',
)

# How old cached presence data may get before it is refreshed (five minutes
# unless overridden).
cfg.add_argument(
    '--presence_tolerable_staleness_seconds',
    metavar='DURATION',
    type=argparse_utils.valid_duration,
    default=datetime.timedelta(minutes=5),
    help='Max acceptable age of location data before auto-refreshing',
)
43
44
class PresenceDetection(object):
    """See above.  This is a base class for determining a person's
    location on networks I administer.

    Each known person has an ordered list of device MAC addresses.  The
    class reads "persisted MAC address" listings for the house and the
    cabin (one from a local file, the other fetched with ssh), remembers
    when each MAC was last seen at each location and lets the devices
    vote on where their owner is.
    """

    # Matches the "10.0.0.22 (side_deck_high_home)" field of a persisted
    # MAC line; group 1 is the IP address and group 2 is the device name.
    _IP_NAME_RE = re.compile(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)")

    def __init__(self) -> None:
        # Note: list most important devices first.
        self.devices_by_person: Dict[Person, List[str]] = {
            Person.SCOTT: [
                "DC:E5:5B:0F:03:3D",  # pixel6
                "6C:40:08:AE:DC:2E",  # laptop
            ],
            Person.LYNN: [
                "08:CC:27:63:26:14",  # motog7
                "B8:31:B5:9A:4F:19",  # laptop
            ],
            Person.ALEX: [
                "0C:CB:85:0C:8B:AE",  # phone
                "D0:C6:37:E3:36:9A",  # laptop
            ],
            Person.AARON_AND_DANA: [
                "98:B6:E9:E5:5A:7C",
                "D6:2F:37:CA:B2:9B",
                "6C:E8:5C:ED:17:26",
                "90:E1:7B:13:7C:E5",
                "6E:DC:7C:75:02:1B",
                "B2:16:1A:93:7D:50",
                "18:65:90:DA:3A:35",
                "22:28:C8:7D:3C:85",
                "B2:95:23:69:91:F8",
                "96:69:2C:88:7A:C3",
            ],
        }
        self.run_location = site_config.get_location()
        logger.debug("base_presence run_location is %s", self.run_location)
        # True when the most recently parsed cabin listing contained a
        # device whose line does not mention "cabin_".
        self.weird_mac_at_cabin = False
        # location -> (MAC -> time that MAC was last seen at that location).
        self.location_ts_by_mac: Dict[Location, Dict[str, datetime.datetime]] = defaultdict(dict)
        # MAC -> device name taken from the "ip (name)" field of a listing.
        self.names_by_mac: Dict[str, str] = {}
        # Locations whose listing could not be fetched by the last update.
        self.dark_locations: Set[Location] = set()
        # Time of the last successful update(); None until the first one.
        self.last_update: Optional[datetime.datetime] = None

    def maybe_update(self) -> None:
        """Refresh the data if it was never loaded or has become stale.

        Data is stale once it is older than the
        presence_tolerable_staleness_seconds config value.
        """
        if self.last_update is None:
            self.update()
            return
        delta = datetime.datetime.now() - self.last_update
        if (
            delta.total_seconds()
            > config.config['presence_tolerable_staleness_seconds'].total_seconds()
        ):
            logger.debug("It's been %ss since last update; refreshing now.", delta.total_seconds())
            self.update()

    def update(self) -> None:
        """Unconditionally reload presence data for both locations.

        Raises:
            Exception: if this process is running neither at the house
                nor at the cabin.
        """
        self.dark_locations = set()
        if self.run_location is Location.HOUSE:
            self.update_from_house()
        elif self.run_location is Location.CABIN:
            self.update_from_cabin()
        else:
            raise Exception("Where the hell is this running?!")
        self.last_update = datetime.datetime.now()

    @staticmethod
    def _local_macs_filename() -> str:
        """Return the configured local MAC listing path, or the default."""
        try:
            return config.config['presence_macs_file']
        except KeyError:
            return '/home/scott/cron/persisted_mac_addresses.txt'

    def update_from_house(self) -> None:
        """Load house data from the local file and cabin data over ssh.

        If the cabin listing cannot be fetched or parsed, the cabin is
        added to dark_locations and a warning is issued instead of
        raising.
        """
        from exec_utils import cmd

        self.read_persisted_macs_file(self._local_macs_filename(), Location.HOUSE)
        try:
            # NOTE(review): the user@host part of this command reads
            # "[email protected]", which looks like an e-mail obfuscation
            # artifact rather than a real ssh target -- confirm the host.
            raw = cmd(
                "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'",
                timeout_seconds=10.0,
            )
            self.parse_raw_macs_file(raw, Location.CABIN)
        except Exception as e:
            logger.exception(e)
            msg = "Can't see the cabin right now; presence detection impared."
            logger.warning(msg)
            # stacklevel=2 attributes the warning to our caller, matching
            # update_from_cabin.
            warnings.warn(msg, stacklevel=2)
            self.dark_locations.add(Location.CABIN)

    def update_from_cabin(self) -> None:
        """Load cabin data from the local file and house data over ssh.

        If the house listing cannot be fetched or parsed, the house is
        added to dark_locations and a warning is issued instead of
        raising.
        """
        from exec_utils import cmd

        self.read_persisted_macs_file(self._local_macs_filename(), Location.CABIN)
        try:
            # NOTE(review): the user@host part of this command reads
            # "[email protected]", which looks like an e-mail obfuscation
            # artifact rather than a real ssh target -- confirm the host.
            raw = cmd(
                "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'",
                timeout_seconds=10.0,
            )
            self.parse_raw_macs_file(raw, Location.HOUSE)
        except Exception as e:
            logger.exception(e)
            msg = "Can't see the house right now; presence detection impared."
            logger.warning(msg)
            warnings.warn(msg, stacklevel=2)
            self.dark_locations.add(Location.HOUSE)

    def read_persisted_macs_file(self, filename: str, location: Location) -> None:
        """Read a MAC listing from filename and record it for location.

        Does nothing when location is Location.UNKNOWN.  Errors opening
        or reading the file propagate to the caller.
        """
        if location is Location.UNKNOWN:
            return
        with open(filename, "r") as rf:
            contents = rf.read()
        self.parse_raw_macs_file(contents, location)

    def parse_raw_macs_file(self, raw: str, location: Location) -> None:
        """Parse the text of a MAC listing and record what it says.

        Each non-blank line is expected to hold five comma separated
        fields, e.g.:

            CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990

        The first field is the MAC, the third is "ip (name)" and the last
        is an integer timestamp.  Lines containing "cabin_" are ignored
        and lines that do not parse are logged and skipped.

        Args:
            raw: the full text of the listing.
            location: the location the listing describes.
        """
        cabin_count = 0
        for line in raw.split("\n"):
            line = line.strip()
            if not line:
                continue
            logger.debug('%s> %s', location, line)
            if "cabin_" in line:
                continue
            try:
                (mac, _, ip_name, _, ts) = line.split(",")
                # Convert inside the try so that one bad timestamp skips
                # the line instead of aborting the whole parse.
                seen = datetime.datetime.fromtimestamp(int(ts.strip()))
            except (ValueError, OverflowError, OSError) as e:
                logger.exception(e)
                logger.error('SKIPPED BAD LINE> %s', line)
                continue
            # Only well-formed lines count as a device at the cabin.
            if location == Location.CABIN:
                cabin_count += 1
                logger.debug('Cabin count: %d', cabin_count)
            mac = mac.strip()
            self.location_ts_by_mac[location][mac] = seen
            match = self._IP_NAME_RE.match(ip_name.strip())
            if match is not None:
                self.names_by_mac[mac] = match.group(2)
        # Recompute the flag whenever fresh cabin data is parsed so that
        # it does not stay set forever once a device has been seen.
        if location == Location.CABIN:
            self.weird_mac_at_cabin = cabin_count > 0
            if self.weird_mac_at_cabin:
                logger.debug('Weird MAC at the cabin')

    def is_anyone_in_location_now(self, location: Location) -> bool:
        """Return True if any person (or weird cabin MAC) is at location.

        Raises:
            Exception: if location is currently dark (its data could not
                be fetched), since the answer would be undefined.
        """
        self.maybe_update()
        if location in self.dark_locations:
            raise Exception(f"Can't see {location} right now; answer undefined.")
        if any(self.where_is_person_now(person) == location for person in Person):
            return True
        return location == Location.CABIN and self.weird_mac_at_cabin

    def where_is_person_now(self, name: Person) -> Location:
        """Guess the current location of a person from their devices.

        Every known device of the person votes for the location where it
        was seen most recently.  The first known device votes with 10000
        credit and each later one with a fifth of the previous credit
        (never less than 1).  The location with the most votes wins if it
        has more than 2001 votes.

        Args:
            name: the person to look for.  Person.UNKNOWN maps to the
                cabin if a weird MAC was seen there.

        Returns:
            The guessed location, or Location.UNKNOWN when no device of
            the person has been seen or the vote was too weak.
        """
        self.maybe_update()
        if len(self.dark_locations) > 0:
            msg = f"Can't see {self.dark_locations} right now; answer confidence impacted"
            logger.warning(msg)
            warnings.warn(msg, stacklevel=2)
        logger.debug('Looking for %s...', name)

        if name is Person.UNKNOWN:
            if self.weird_mac_at_cabin:
                return Location.CABIN
            return Location.UNKNOWN

        import dict_utils

        votes: Dict[Location, int] = {}
        credit = 10000
        # People without a registered device list simply have no votes.
        for mac in self.devices_by_person.get(name, []):
            if mac not in self.names_by_mac:
                continue
            mac_name = self.names_by_mac[mac]
            logger.debug('Looking for %s... check for mac %s (%s)', name, mac, mac_name)

            # Sightings are collected per device so that a device never
            # votes for a location where only another device was seen.
            sightings: Dict[Location, datetime.datetime] = {}
            for location, ts_by_mac in self.location_ts_by_mac.items():
                if mac in ts_by_mac:
                    ts = ts_by_mac[mac]
                    logger.debug('Seen %s (%s) at %s since %s', mac, mac_name, location, ts)
                    sightings[location] = ts
            if not sightings:
                continue

            (most_recent_location, _) = dict_utils.item_with_max_value(sightings)
            votes[most_recent_location] = votes.get(most_recent_location, 0) + credit
            logger.debug('%s: %s gets %d votes.', name, most_recent_location, credit)
            credit = int(credit * 0.2)  # Note: list most important devices first
            if credit <= 0:
                credit = 1

        if votes:
            (winner, value) = dict_utils.item_with_max_value(votes)
            if value > 2001:
                return winner
        return Location.UNKNOWN
254
255
@bootstrap.initialize
def main() -> None:
    """Print where each person seems to be, then whether each location
    is occupied."""
    detector = PresenceDetection()
    for who in Person:
        whereabouts = detector.where_is_person_now(who)
        print(f'{who} => {whereabouts}')
    print()
    for place in Location:
        occupied = detector.is_anyone_in_location_now(place)
        print(f'{place} => {occupied}')


if __name__ == '__main__':
    main()