More cleanup, yey!
[python_utils.git] / base_presence.py
1 #!/usr/bin/env python3
2
3 import datetime
4 import logging
5 import re
6 import warnings
7 from collections import defaultdict
8 from typing import Dict, List, Optional, Set
9
10 # Note: this module is fairly early loaded.  Be aware of dependencies.
11 import argparse_utils
12 import bootstrap
13 import config
14 import site_config
15 from type.locations import Location
16 from type.people import Person
17
18 logger = logging.getLogger(__name__)
19
20 cfg = config.add_commandline_args(
21     f"Presence Detection ({__file__})",
22     "Args related to detection of human beings in locations.",
23 )
24 cfg.add_argument(
25     "--presence_macs_file",
26     type=argparse_utils.valid_filename,
27     default="/home/scott/cron/persisted_mac_addresses.txt",
28     metavar="FILENAME",
29     help="The location of persisted_mac_addresses.txt to use.",
30 )
31 cfg.add_argument(
32     '--presence_tolerable_staleness_seconds',
33     type=argparse_utils.valid_duration,
34     default=datetime.timedelta(seconds=60 * 5),
35     metavar='DURATION',
36     help='Max acceptable age of location data before auto-refreshing',
37 )
38
39
40 class PresenceDetection(object):
41     def __init__(self) -> None:
42         # Note: list most important devices first.
43         self.devices_by_person: Dict[Person, List[str]] = {
44             Person.SCOTT: [
45                 "DC:E5:5B:0F:03:3D",  # pixel6
46                 "6C:40:08:AE:DC:2E",  # laptop
47             ],
48             Person.LYNN: [
49                 "08:CC:27:63:26:14",  # motog7
50                 "B8:31:B5:9A:4F:19",  # laptop
51             ],
52             Person.ALEX: [
53                 "0C:CB:85:0C:8B:AE",  # phone
54                 "D0:C6:37:E3:36:9A",  # laptop
55             ],
56             Person.AARON_AND_DANA: [
57                 "98:B6:E9:E5:5A:7C",
58                 "D6:2F:37:CA:B2:9B",
59                 "6C:E8:5C:ED:17:26",
60                 "90:E1:7B:13:7C:E5",
61                 "6E:DC:7C:75:02:1B",
62                 "B2:16:1A:93:7D:50",
63                 "18:65:90:DA:3A:35",
64                 "22:28:C8:7D:3C:85",
65                 "B2:95:23:69:91:F8",
66                 "96:69:2C:88:7A:C3",
67             ],
68         }
69         self.run_location = site_config.get_location()
70         logger.debug("base_presence run_location is %s", self.run_location)
71         self.weird_mac_at_cabin = False
72         self.location_ts_by_mac: Dict[Location, Dict[str, datetime.datetime]] = defaultdict(dict)
73         self.names_by_mac: Dict[str, str] = {}
74         self.dark_locations: Set[Location] = set()
75         self.last_update: Optional[datetime.datetime] = None
76
77     def maybe_update(self) -> None:
78         if self.last_update is None:
79             self.update()
80         else:
81             now = datetime.datetime.now()
82             delta = now - self.last_update
83             if (
84                 delta.total_seconds()
85                 > config.config['presence_tolerable_staleness_seconds'].total_seconds()
86             ):
87                 logger.debug(
88                     "It's been %ss since last update; refreshing now.", delta.total_seconds()
89                 )
90                 self.update()
91
92     def update(self) -> None:
93         self.dark_locations = set()
94         if self.run_location is Location.HOUSE:
95             self.update_from_house()
96         elif self.run_location is Location.CABIN:
97             self.update_from_cabin()
98         else:
99             raise Exception("Where the hell is this running?!")
100         self.last_update = datetime.datetime.now()
101
102     def update_from_house(self) -> None:
103         from exec_utils import cmd
104
105         try:
106             persisted_macs = config.config['presence_macs_file']
107         except KeyError:
108             persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
109         self.read_persisted_macs_file(persisted_macs, Location.HOUSE)
110         try:
111             raw = cmd(
112                 "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'",
113                 timeout_seconds=10.0,
114             )
115             self.parse_raw_macs_file(raw, Location.CABIN)
116         except Exception as e:
117             logger.exception(e)
118             msg = "Can't see the cabin right now; presence detection impared."
119             warnings.warn(msg)
120             logger.warning(msg, stacklevel=2)
121             self.dark_locations.add(Location.CABIN)
122
123     def update_from_cabin(self) -> None:
124         from exec_utils import cmd
125
126         try:
127             persisted_macs = config.config['presence_macs_file']
128         except KeyError:
129             persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
130         self.read_persisted_macs_file(persisted_macs, Location.CABIN)
131         try:
132             raw = cmd(
133                 "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'",
134                 timeout_seconds=10.0,
135             )
136             self.parse_raw_macs_file(raw, Location.HOUSE)
137         except Exception as e:
138             logger.exception(e)
139             msg = "Can't see the house right now; presence detection impared."
140             logger.warning(msg)
141             warnings.warn(msg, stacklevel=2)
142             self.dark_locations.add(Location.HOUSE)
143
144     def read_persisted_macs_file(self, filename: str, location: Location) -> None:
145         if location is Location.UNKNOWN:
146             return
147         with open(filename, "r") as rf:
148             lines = rf.read()
149         self.parse_raw_macs_file(lines, location)
150
151     def parse_raw_macs_file(self, raw: str, location: Location) -> None:
152         lines = raw.split("\n")
153
154         # CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990
155         cabin_count = 0
156         for line in lines:
157             line = line.strip()
158             if len(line) == 0:
159                 continue
160             logger.debug('%s> %s', location, line)
161             if "cabin_" in line:
162                 continue
163             if location == Location.CABIN:
164                 logger.debug('Cabin count: %d', cabin_count)
165                 cabin_count += 1
166             try:
167                 (mac, _, ip_name, _, ts) = line.split(",")  # type: ignore
168             except Exception as e:
169                 logger.exception(e)
170                 logger.error('SKIPPED BAD LINE> %s', line)
171                 continue
172             mac = mac.strip()
173             (self.location_ts_by_mac[location])[mac] = datetime.datetime.fromtimestamp(
174                 int(ts.strip())
175             )
176             ip_name = ip_name.strip()
177             match = re.match(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name)
178             if match is not None:
179                 name = match.group(2)
180                 self.names_by_mac[mac] = name
181         if cabin_count > 0:
182             logger.debug('Weird MAC at the cabin')
183             self.weird_mac_at_cabin = True
184
185     def is_anyone_in_location_now(self, location: Location) -> bool:
186         self.maybe_update()
187         if location in self.dark_locations:
188             raise Exception(f"Can't see {location} right now; answer undefined.")
189         for person in Person:
190             if person is not None:
191                 loc = self.where_is_person_now(person)
192                 if location == loc:
193                     return True
194         if location == location.CABIN and self.weird_mac_at_cabin:
195             return True
196         return False
197
198     def where_is_person_now(self, name: Person) -> Location:
199         self.maybe_update()
200         if len(self.dark_locations) > 0:
201             msg = f"Can't see {self.dark_locations} right now; answer confidence impacted"
202             logger.warning(msg)
203             warnings.warn(msg, stacklevel=2)
204         logger.debug('Looking for %s...', name)
205
206         if name is Person.UNKNOWN:
207             if self.weird_mac_at_cabin:
208                 return Location.CABIN
209             else:
210                 return Location.UNKNOWN
211
212         import dict_utils
213
214         votes: Dict[Location, int] = {}
215         tiebreaks: Dict[Location, datetime.datetime] = {}
216         credit = 10000
217         location = None
218         for mac in self.devices_by_person[name]:
219             if mac not in self.names_by_mac:
220                 continue
221             mac_name = self.names_by_mac[mac]
222             logger.debug(
223                 'Looking for %s... check for mac %s (%s)',
224                 name, mac, mac_name
225             )
226             for location in self.location_ts_by_mac:
227                 if mac in self.location_ts_by_mac[location]:
228                     ts = (self.location_ts_by_mac[location])[mac]
229                     logger.debug(
230                         'Seen %s (%s) at %s since %s',
231                         mac, mac_name, location, ts
232                     )
233                     tiebreaks[location] = ts
234
235             (
236                 most_recent_location,
237                 _,
238             ) = dict_utils.item_with_max_value(tiebreaks)
239             bonus = credit
240             v = votes.get(most_recent_location, 0)
241             votes[most_recent_location] = v + bonus
242             logger.debug('%s: %s gets %d votes.', name, most_recent_location, bonus)
243             credit = int(credit * 0.2)  # Note: list most important devices first
244             if credit <= 0:
245                 credit = 1
246         if len(votes) > 0:
247             (location, value) = dict_utils.item_with_max_value(votes)
248             if value > 2001:
249                 assert location
250                 return location
251         return Location.UNKNOWN
252
253
254 @bootstrap.initialize
255 def main() -> None:
256     p = PresenceDetection()
257     for person in Person:
258         print(f'{person} => {p.where_is_person_now(person)}')
259     print()
260
261
262 #    for location in Location:
263 #        print(f'{location} => {p.is_anyone_in_location_now(location)}')
264
265
266 if __name__ == '__main__':
267     main()