Various changes.
[python_utils.git] / presence.py
1 #!/usr/bin/env python3
2
3 import datetime
4 from collections import defaultdict
5 import enum
6 import logging
7 import re
8 import sys
9 from typing import Dict, List
10
11 # Note: this module is fairly early loaded.  Be aware of dependencies.
12 import argparse_utils
13 import bootstrap
14 import config
15
16 logger = logging.getLogger(__name__)
17
18 cfg = config.add_commandline_args(
19     f"Presence Detection ({__file__})",
20     "Args related to detection of human beings in locations.",
21 )
22 cfg.add_argument(
23     "--presence_macs_file",
24     type=argparse_utils.valid_filename,
25     default = "/home/scott/cron/persisted_mac_addresses.txt",
26     metavar="FILENAME",
27     help="The location of persisted_mac_addresses.txt to use."
28 )
29
30
31 class Person(enum.Enum):
32     UNKNOWN = 0
33     SCOTT = 1
34     LYNN = 2
35     ALEX = 3
36     AARON_AND_DANA = 4
37     AARON = 4
38     DANA = 4
39
40
41 @enum.unique
42 class Location(enum.Enum):
43     UNKNOWN = 0
44     HOUSE = 1
45     CABIN = 2
46
47
48 class PresenceDetection(object):
49     def __init__(self) -> None:
50         # Note: list most important devices first.
51         self.devices_by_person: Dict[Person, List[str]] = {
52             Person.SCOTT: [
53                 "3C:28:6D:10:6D:41", # pixel3
54                 "6C:40:08:AE:DC:2E", # laptop
55             ],
56             Person.LYNN: [
57                 "08:CC:27:63:26:14", # motog7
58                 "B8:31:B5:9A:4F:19", # laptop
59             ],
60             Person.ALEX: [
61                 "0C:CB:85:0C:8B:AE", # phone
62                 "D0:C6:37:E3:36:9A", # laptop
63             ],
64             Person.AARON_AND_DANA: [
65                 "98:B6:E9:E5:5A:7C",
66                 "D6:2F:37:CA:B2:9B",
67                 "6C:E8:5C:ED:17:26",
68                 "90:E1:7B:13:7C:E5",
69                 "6E:DC:7C:75:02:1B",
70                 "B2:16:1A:93:7D:50",
71                 "18:65:90:DA:3A:35",
72                 "22:28:C8:7D:3C:85",
73                 "B2:95:23:69:91:F8",
74                 "96:69:2C:88:7A:C3",
75             ],
76         }
77         self.weird_mac_at_cabin = False
78         self.location_ts_by_mac: Dict[
79             Location, Dict[str, datetime.datetime]
80         ] = defaultdict(dict)
81         self.names_by_mac: Dict[str, str] = {}
82         self.update()
83
84     def update(self) -> None:
85         from exec_utils import cmd_with_timeout
86         try:
87             persisted_macs = config.config['presence_macs_file']
88         except KeyError:
89             persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
90         self.read_persisted_macs_file(persisted_macs, Location.HOUSE)
91         try:
92             raw = cmd_with_timeout(
93                 "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'",
94                 timeout_seconds=10.0,
95             )
96             self.parse_raw_macs_file(raw, Location.CABIN)
97         except Exception as e:
98             logger.exception(e)
99             logger.error(
100                 'Unable to fetch MAC Addresses from meerkat; can\'t do proper presence detection.'
101             )
102             sys.exit(1)
103
104     def read_persisted_macs_file(
105         self, filename: str, location: Location
106     ) -> None:
107         if location is Location.UNKNOWN:
108             return
109         with open(filename, "r") as rf:
110             lines = rf.read()
111         self.parse_raw_macs_file(lines, location)
112
113     def parse_raw_macs_file(self, raw: str, location: Location) -> None:
114         lines = raw.split("\n")
115
116         # CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990
117         cabin_count = 0
118         for line in lines:
119             line = line.strip()
120             if len(line) == 0:
121                 continue
122             logger.debug(f'{location}> {line}')
123             if "cabin_" in line:
124                 continue
125             if location == Location.CABIN:
126                 logger.debug('Cabin count: {cabin_count}')
127                 cabin_count += 1
128             try:
129                 (mac, count, ip_name, mfg, ts) = line.split(",")
130             except Exception as e:
131                 logger.error(f'SKIPPED BAD LINE> {line}')
132                 logger.exception(e)
133                 continue
134             mac = mac.strip()
135             (self.location_ts_by_mac[location])[
136                 mac
137             ] = datetime.datetime.fromtimestamp(int(ts.strip()))
138             ip_name = ip_name.strip()
139             match = re.match(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name)
140             if match is not None:
141                 name = match.group(2)
142                 self.names_by_mac[mac] = name
143         if cabin_count > 0:
144             logger.debug('Weird MAC at the cabin')
145             self.weird_mac_at_cabin = True
146
147     def is_anyone_in_location_now(self, location: Location) -> bool:
148         for person in Person:
149             if person is not None:
150                 loc = self.where_is_person_now(person)
151                 if location == loc:
152                     return True
153         if location == location.CABIN and self.weird_mac_at_cabin:
154             return True
155         return False
156
157     def where_is_person_now(self, name: Person) -> Location:
158         import dict_utils
159         logger.debug(f'Looking for {name}...')
160
161         if name is Person.UNKNOWN:
162             if self.weird_mac_at_cabin:
163                 return Location.CABIN
164             else:
165                 return Location.UNKNOWN
166         votes: Dict[Location, int] = {}
167         tiebreaks: Dict[Location, datetime.datetime] = {}
168         credit = 10000
169         for mac in self.devices_by_person[name]:
170             if mac not in self.names_by_mac:
171                 continue
172             mac_name = self.names_by_mac[mac]
173             logger.debug(f'Looking for {name}... check for mac {mac} ({mac_name})')
174             for location in self.location_ts_by_mac:
175                 if mac in self.location_ts_by_mac[location]:
176                     ts = (self.location_ts_by_mac[location])[mac]
177                     logger.debug(f'Seen {mac} ({mac_name}) at {location} since {ts}')
178                     tiebreaks[location] = ts
179
180             (most_recent_location, first_seen_ts) = dict_utils.item_with_max_value(tiebreaks)
181             bonus = credit
182             v = votes.get(most_recent_location, 0)
183             votes[most_recent_location] = v + bonus
184             logger.debug(f'{name}: {location} gets {bonus} votes.')
185             credit = int(
186                 credit * 0.2
187             )  # Note: list most important devices first
188             if credit <= 0:
189                 credit = 1
190         if len(votes) > 0:
191             (location, value) = dict_utils.item_with_max_value(votes)
192             if value > 2001:
193                 return location
194         return Location.UNKNOWN
195
196
197 @bootstrap.initialize
198 def main() -> None:
199     p = PresenceDetection()
200     for person in Person:
201         print(f'{person} => {p.where_is_person_now(person)}')
202     print()
203     for location in Location:
204         print(f'{location} => {p.is_anyone_in_location_now(location)}')
205
206
207 if __name__ == '__main__':
208     main()