477b14ffa0a83c2a0cf8c18a90ab6537953cfeda
[python_utils.git] / presence.py
1 #!/usr/bin/env python3
2
3 import datetime
4 from collections import defaultdict
5 import logging
6 import re
7 from typing import Dict, List, Set
8
9 # Note: this module is fairly early loaded.  Be aware of dependencies.
10 import argparse_utils
11 import bootstrap
12 import config
13 from locations import Location
14 from people import Person
15 import site_config
16
17
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Register this module's command-line flag group via config.add_commandline_args
# and add the one flag this module reads (config.config['presence_macs_file']).
cfg = config.add_commandline_args(
    f"Presence Detection ({__file__})",
    "Args related to detection of human beings in locations.",
)
cfg.add_argument(
    "--presence_macs_file",
    metavar="FILENAME",
    type=argparse_utils.valid_filename,
    default="/home/scott/cron/persisted_mac_addresses.txt",
    help="The location of persisted_mac_addresses.txt to use.",
)
31
32
class PresenceDetection(object):
    """Guess where people are from the MAC addresses seen at each location.

    The constructor (and every later call to update()) reads the persisted
    MAC address file for the location this code is running in and fetches the
    other location's file over ssh.  A location whose file could not be
    fetched is recorded in self.dark_locations.
    """

    def __init__(self) -> None:
        """Build the device table and load MAC data via update().

        Note that update() reads a local file and runs an ssh command, so
        constructing this object performs I/O.
        """
        # Note: list most important devices first.  where_is_person_now()
        # gives the largest vote to the first listed device that was seen.
        self.devices_by_person: Dict[Person, List[str]] = {
            Person.SCOTT: [
                "3C:28:6D:10:6D:41",  # pixel3
                "6C:40:08:AE:DC:2E",  # laptop
            ],
            Person.LYNN: [
                "08:CC:27:63:26:14",  # motog7
                "B8:31:B5:9A:4F:19",  # laptop
            ],
            Person.ALEX: [
                "0C:CB:85:0C:8B:AE",  # phone
                "D0:C6:37:E3:36:9A",  # laptop
            ],
            Person.AARON_AND_DANA: [
                "98:B6:E9:E5:5A:7C",
                "D6:2F:37:CA:B2:9B",
                "6C:E8:5C:ED:17:26",
                "90:E1:7B:13:7C:E5",
                "6E:DC:7C:75:02:1B",
                "B2:16:1A:93:7D:50",
                "18:65:90:DA:3A:35",
                "22:28:C8:7D:3C:85",
                "B2:95:23:69:91:F8",
                "96:69:2C:88:7A:C3",
            ],
        }
        self.run_location = site_config.get_location()
        logger.debug(f"run_location is {self.run_location}")
        # Set by parse_raw_macs_file() when the cabin's file contains at
        # least one line that is not a "cabin_" line.
        self.weird_mac_at_cabin = False
        # location -> (MAC -> timestamp parsed from that location's file).
        self.location_ts_by_mac: Dict[
            Location, Dict[str, datetime.datetime]
        ] = defaultdict(dict)
        # MAC -> the parenthesized device name from the file's ip/name field.
        self.names_by_mac: Dict[str, str] = {}
        # Locations whose MAC file could not be fetched on the last update.
        self.dark_locations: Set[Location] = set()
        self.update()

    def update(self) -> None:
        """Re-read the MAC address data for the local and remote location.

        Raises:
            Exception: if the run location is neither HOUSE nor CABIN.
        """
        # NOTE(review): only dark_locations is reset here; weird_mac_at_cabin,
        # location_ts_by_mac and names_by_mac carry over from earlier updates.
        # Confirm that keeping that older state is intended.
        self.dark_locations = set()
        if self.run_location is Location.HOUSE:
            self.update_from_house()
        elif self.run_location is Location.CABIN:
            self.update_from_cabin()
        else:
            raise Exception("Where the hell is this running?!")

    @staticmethod
    def _persisted_macs_path() -> str:
        """Return the configured local MAC file path, else the built-in default."""
        try:
            return config.config['presence_macs_file']
        except KeyError:
            return '/home/scott/cron/persisted_mac_addresses.txt'

    def _update_from(
        self,
        local: Location,
        remote: Location,
        remote_cmd: str,
        remote_label: str,
    ) -> None:
        """Load the local MAC file, then try to fetch and parse the remote one.

        Args:
            local: location whose data comes from the local persisted file.
            remote: location whose data comes from running remote_cmd.
            remote_cmd: shell command whose output is the remote MAC file.
            remote_label: name of the remote location for the warning message.
        """
        # Imported here rather than at module level; the module header warns
        # that this module is loaded early and to be aware of dependencies.
        from exec_utils import cmd_with_timeout

        self.read_persisted_macs_file(self._persisted_macs_path(), local)
        try:
            raw = cmd_with_timeout(remote_cmd, timeout_seconds=10.0)
            self.parse_raw_macs_file(raw, remote)
        except Exception as e:
            # Deliberately best-effort: an unreachable remote site must not
            # stop local detection, so log it and mark the site as dark.
            logger.exception(e)
            logger.warning(
                f"Can't see the {remote_label} right now; presence detection impared."
            )
            self.dark_locations.add(remote)

    def update_from_house(self) -> None:
        """Read the house's MACs from the local file and the cabin's over ssh."""
        # NOTE(review): the ssh target below looks redacted/garbled
        # ("[email protected]"); confirm the real user@host.
        self._update_from(
            Location.HOUSE,
            Location.CABIN,
            "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'",
            "cabin",
        )

    def update_from_cabin(self) -> None:
        """Read the cabin's MACs from the local file and the house's over ssh."""
        # NOTE(review): the ssh target below looks redacted/garbled
        # ("[email protected]"); confirm the real user@host.
        self._update_from(
            Location.CABIN,
            Location.HOUSE,
            "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'",
            "house",
        )

    def read_persisted_macs_file(
        self, filename: str, location: Location
    ) -> None:
        """Read filename and parse it as the MAC file for location.

        Does nothing when location is Location.UNKNOWN.
        """
        if location is Location.UNKNOWN:
            return
        with open(filename, "r") as rf:
            lines = rf.read()
        self.parse_raw_macs_file(lines, location)

    def parse_raw_macs_file(self, raw: str, location: Location) -> None:
        """Parse the text of a persisted MAC file and record what it lists.

        Each non-empty line is expected to hold five comma-separated fields,
        for example:

            CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990

        The first field is the MAC, the third is "ip (name)" and the last is
        an integer timestamp handed to datetime.fromtimestamp().  Lines that
        cannot be parsed are logged and skipped.

        Args:
            raw: the full contents of the file.
            location: the location the file describes.
        """
        cabin_count = 0
        for line in raw.split("\n"):
            line = line.strip()
            if not line:
                continue
            logger.debug(f'{location}> {line}')
            # Lines mentioning "cabin_" are ignored entirely (presumably the
            # cabin's own fixed devices -- TODO confirm).
            if "cabin_" in line:
                continue
            if location == Location.CABIN:
                logger.debug(f'Cabin count: {cabin_count}')
                cabin_count += 1
            try:
                (mac, _count, ip_name, _mfg, ts) = line.split(",")
                # Parse the timestamp inside the try so that a bad value
                # skips just this line instead of aborting the whole file.
                seen_ts = datetime.datetime.fromtimestamp(int(ts.strip()))
            except (ValueError, OverflowError, OSError) as e:
                logger.error(f'SKIPPED BAD LINE> {line}')
                logger.exception(e)
                continue
            mac = mac.strip()
            self.location_ts_by_mac[location][mac] = seen_ts
            match = re.match(
                r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name.strip()
            )
            if match is not None:
                self.names_by_mac[mac] = match.group(2)
        if cabin_count > 0:
            logger.debug('Weird MAC at the cabin')
            self.weird_mac_at_cabin = True

    def is_anyone_in_location_now(self, location: Location) -> bool:
        """Return True if any Person is currently placed at location.

        For Location.CABIN the answer is also True when weird_mac_at_cabin
        is set.

        Raises:
            Exception: if location is currently in self.dark_locations.
        """
        if location in self.dark_locations:
            raise Exception(f"Can't see {location} right now; answer undefined.")
        for person in Person:
            if self.where_is_person_now(person) == location:
                return True
        if location == Location.CABIN and self.weird_mac_at_cabin:
            return True
        return False

    def where_is_person_now(self, name: Person) -> Location:
        """Return the location that name's devices vote for.

        Person.UNKNOWN maps to Location.CABIN when weird_mac_at_cabin is set
        and to Location.UNKNOWN otherwise.  For everyone else, each of their
        devices that has a recorded name casts votes for a location; the
        location with the most votes is returned if its total exceeds 2001,
        otherwise Location.UNKNOWN.

        Raises:
            KeyError: if name is not Person.UNKNOWN and has no entry in
                self.devices_by_person.
        """
        import dict_utils

        if self.dark_locations:
            logger.warning(
                f"Can't see {self.dark_locations} right now; answer confidence impacted"
            )
        logger.debug(f'Looking for {name}...')

        if name is Person.UNKNOWN:
            if self.weird_mac_at_cabin:
                return Location.CABIN
            return Location.UNKNOWN

        votes: Dict[Location, int] = {}
        # NOTE(review): tiebreaks is not cleared between devices, so the
        # timestamps recorded for earlier devices also take part in the
        # comparison for later ones -- confirm this is intended.
        tiebreaks: Dict[Location, datetime.datetime] = {}
        credit = 10000
        for mac in self.devices_by_person[name]:
            if mac not in self.names_by_mac:
                continue
            mac_name = self.names_by_mac[mac]
            logger.debug(f'Looking for {name}... check for mac {mac} ({mac_name})')
            for location, ts_by_mac in self.location_ts_by_mac.items():
                if mac in ts_by_mac:
                    ts = ts_by_mac[mac]
                    logger.debug(f'Seen {mac} ({mac_name}) at {location} since {ts}')
                    tiebreaks[location] = ts

            # item_with_max_value presumably returns the (key, value) pair
            # holding the largest value -- verify against dict_utils.
            (most_recent_location, _) = dict_utils.item_with_max_value(tiebreaks)
            bonus = credit
            votes[most_recent_location] = votes.get(most_recent_location, 0) + bonus
            logger.debug(f'{name}: {most_recent_location} gets {bonus} votes.')
            # Note: list most important devices first.  Each device that was
            # seen leaves 20% of its credit for the next, never less than 1.
            credit = max(1, int(credit * 0.2))
        if votes:
            (location, value) = dict_utils.item_with_max_value(votes)
            if value > 2001:
                return location
        return Location.UNKNOWN
214
215
@bootstrap.initialize
def main() -> None:
    """Print where each Person is, then whether each Location is occupied."""
    detector = PresenceDetection()
    for who in Person:
        whereabouts = detector.where_is_person_now(who)
        print(f'{who} => {whereabouts}')
    print()
    for place in Location:
        occupied = detector.is_anyone_in_location_now(place)
        print(f'{place} => {occupied}')
224
225
# Run the report only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()