d7db41676a03cff60172ed2b25936f51f3b2cfa0
[python_utils.git] / presence.py
1 #!/usr/bin/env python3
2
3 import datetime
4 from collections import defaultdict
5 import enum
6 import logging
7 import re
8 from typing import Dict, List
9
10 # Note: this module is fairly early loaded.  Be aware of dependencies.
11 import argparse_utils
12 import bootstrap
13 import config
14
15 logger = logging.getLogger(__name__)
16
17 cfg = config.add_commandline_args(
18     f"Presence Detection ({__file__})",
19     "Args related to detection of human beings in locations.",
20 )
21 cfg.add_argument(
22     "--presence_macs_file",
23     type=argparse_utils.valid_filename,
24     default = "/home/scott/cron/persisted_mac_addresses.txt",
25     metavar="FILENAME",
26     help="The location of persisted_mac_addresses.txt to use."
27 )
28
29
30 class Person(enum.Enum):
31     UNKNOWN = 0
32     SCOTT = 1
33     LYNN = 2
34     ALEX = 3
35     AARON_AND_DANA = 4
36     AARON = 4
37     DANA = 4
38
39
40 @enum.unique
41 class Location(enum.Enum):
42     UNKNOWN = 0
43     HOUSE = 1
44     CABIN = 2
45
46
47 class PresenceDetection(object):
48     def __init__(self) -> None:
49         # Note: list most important devices first.
50         self.devices_by_person: Dict[Person, List[str]] = {
51             Person.SCOTT: [
52                 "3C:28:6D:10:6D:41", # pixel3
53                 "6C:40:08:AE:DC:2E", # laptop
54             ],
55             Person.LYNN: [
56                 "08:CC:27:63:26:14", # motog7
57                 "B8:31:B5:9A:4F:19", # laptop
58             ],
59             Person.ALEX: [
60                 "0C:CB:85:0C:8B:AE", # phone
61                 "D0:C6:37:E3:36:9A", # laptop
62             ],
63             Person.AARON_AND_DANA: [
64                 "98:B6:E9:E5:5A:7C",
65                 "D6:2F:37:CA:B2:9B",
66                 "6C:E8:5C:ED:17:26",
67                 "90:E1:7B:13:7C:E5",
68                 "6E:DC:7C:75:02:1B",
69                 "B2:16:1A:93:7D:50",
70                 "18:65:90:DA:3A:35",
71                 "22:28:C8:7D:3C:85",
72                 "B2:95:23:69:91:F8",
73                 "96:69:2C:88:7A:C3",
74             ],
75         }
76         self.weird_mac_at_cabin = False
77         self.location_ts_by_mac: Dict[
78             Location, Dict[str, datetime.datetime]
79         ] = defaultdict(dict)
80         self.names_by_mac: Dict[str, str] = {}
81         self.update()
82
83     def update(self) -> None:
84         from exec_utils import cmd
85         try:
86             persisted_macs = config.config['presence_macs_file']
87         except KeyError:
88             persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
89         self.read_persisted_macs_file(persisted_macs, Location.HOUSE)
90         raw = cmd(
91             "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'"
92         )
93         self.parse_raw_macs_file(raw, Location.CABIN)
94
95     def read_persisted_macs_file(
96         self, filename: str, location: Location
97     ) -> None:
98         if location is Location.UNKNOWN:
99             return
100         with open(filename, "r") as rf:
101             lines = rf.read()
102         self.parse_raw_macs_file(lines, location)
103
104     def parse_raw_macs_file(self, raw: str, location: Location) -> None:
105         lines = raw.split("\n")
106
107         # CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990
108         cabin_count = 0
109         for line in lines:
110             line = line.strip()
111             if len(line) == 0:
112                 continue
113             logger.debug(f'{location}> {line}')
114             if "cabin_" in line:
115                 continue
116             if location == Location.CABIN:
117                 logger.debug('Cabin count: {cabin_count}')
118                 cabin_count += 1
119             try:
120                 (mac, count, ip_name, mfg, ts) = line.split(",")
121             except Exception as e:
122                 logger.error(f'SKIPPED BAD LINE> {line}')
123                 logger.exception(e)
124                 continue
125             mac = mac.strip()
126             (self.location_ts_by_mac[location])[
127                 mac
128             ] = datetime.datetime.fromtimestamp(int(ts.strip()))
129             ip_name = ip_name.strip()
130             match = re.match(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name)
131             if match is not None:
132                 name = match.group(2)
133                 self.names_by_mac[mac] = name
134         if cabin_count > 0:
135             logger.debug('Weird MAC at the cabin')
136             self.weird_mac_at_cabin = True
137
138     def is_anyone_in_location_now(self, location: Location) -> bool:
139         for person in Person:
140             if person is not None:
141                 loc = self.where_is_person_now(person)
142                 if location == loc:
143                     return True
144         if location == location.CABIN and self.weird_mac_at_cabin:
145             return True
146         return False
147
148     def where_is_person_now(self, name: Person) -> Location:
149         import dict_utils
150         logger.debug(f'Looking for {name}...')
151
152         if name is Person.UNKNOWN:
153             if self.weird_mac_at_cabin:
154                 return Location.CABIN
155             else:
156                 return Location.UNKNOWN
157         votes: Dict[Location, int] = {}
158         tiebreaks: Dict[Location, datetime.datetime] = {}
159         credit = 10000
160         for mac in self.devices_by_person[name]:
161             if mac not in self.names_by_mac:
162                 continue
163             mac_name = self.names_by_mac[mac]
164             logger.debug(f'Looking for {name}... check for mac {mac} ({mac_name})')
165             for location in self.location_ts_by_mac:
166                 if mac in self.location_ts_by_mac[location]:
167                     ts = (self.location_ts_by_mac[location])[mac]
168                     logger.debug(f'Seen {mac} ({mac_name}) at {location} since {ts}')
169                     tiebreaks[location] = ts
170
171             (most_recent_location, first_seen_ts) = dict_utils.item_with_max_value(tiebreaks)
172             bonus = credit
173             v = votes.get(most_recent_location, 0)
174             votes[most_recent_location] = v + bonus
175             logger.debug(f'{name}: {location} gets {bonus} votes.')
176             credit = int(
177                 credit * 0.2
178             )  # Note: list most important devices first
179             if credit <= 0:
180                 credit = 1
181         if len(votes) > 0:
182             (location, value) = dict_utils.item_with_max_value(votes)
183             if value > 2001:
184                 return location
185         return Location.UNKNOWN
186
187
188 @bootstrap.initialize
189 def main() -> None:
190     p = PresenceDetection()
191     for person in Person:
192         print(f'{person} => {p.where_is_person_now(person)}')
193     print()
194     for location in Location:
195         print(f'{location} => {p.is_anyone_in_location_now(location)}')
196
197
198 if __name__ == '__main__':
199     main()