Detect unknown persons at the cabin.
[python_utils.git] / presence.py
1 #!/usr/bin/env python3
2
3 import datetime
4 from collections import defaultdict
5 import enum
6 import logging
7 import re
8 import sys
9 from typing import Dict, List
10
11 import argparse_utils
12 import bootstrap
13 import config
14 import dict_utils
15 import exec_utils
16
17 logger = logging.getLogger(__name__)
18
19 cfg = config.add_commandline_args(
20     f"Presence Detection ({__file__})",
21     "Args related to detection of human beings in locations.",
22 )
23 cfg.add_argument(
24     "--presence_macs_file",
25     type=argparse_utils.valid_filename,
26     default = "/home/scott/cron/persisted_mac_addresses.txt",
27     metavar="FILENAME",
28     help="The location of persisted_mac_addresses.txt to use."
29 )
30
31
32 class Person(enum.Enum):
33     UNKNOWN = 0
34     SCOTT = 1
35     LYNN = 2
36     ALEX = 3
37     AARON_AND_DANA = 4
38     AARON = 4
39     DANA = 4
40
41
42 @enum.unique
43 class Location(enum.Enum):
44     UNKNOWN = 0
45     HOUSE = 1
46     CABIN = 2
47
48
49 class PresenceDetection(object):
50     def __init__(self) -> None:
51         # Note: list most important devices first.
52         self.devices_by_person: Dict[Person, List[str]] = {
53             Person.SCOTT: [
54                 "3C:28:6D:10:6D:41",
55                 "D4:61:2E:88:18:09",
56                 "6C:40:08:AE:DC:2E",
57                 "14:7D:DA:6A:20:D7",
58             ],
59             Person.LYNN: [
60                 "08:CC:27:63:26:14",
61                 "B8:31:B5:9A:4F:19",
62             ],
63             Person.ALEX: [
64                 "0C:CB:85:0C:8B:AE",
65                 "D0:C6:37:E3:36:9A",
66             ],
67             Person.AARON_AND_DANA: [
68                 "98:B6:E9:E5:5A:7C",
69                 "D6:2F:37:CA:B2:9B",
70                 "6C:E8:5C:ED:17:26",
71                 "90:E1:7B:13:7C:E5",
72                 "6E:DC:7C:75:02:1B",
73                 "B2:16:1A:93:7D:50",
74                 "18:65:90:DA:3A:35",
75                 "22:28:C8:7D:3C:85",
76                 "B2:95:23:69:91:F8",
77                 "96:69:2C:88:7A:C3",
78             ],
79         }
80         self.weird_mac_at_cabin = False
81         self.location_ts_by_mac: Dict[
82             Location, Dict[str, datetime.datetime]
83         ] = defaultdict(dict)
84         self.names_by_mac: Dict[str, str] = {}
85         persisted_macs = config.config['presence_macs_file']
86         self.read_persisted_macs_file(persisted_macs, Location.HOUSE)
87         raw = exec_utils.cmd(
88             "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'"
89         )
90         self.parse_raw_macs_file(raw, Location.CABIN)
91
92     def read_persisted_macs_file(
93         self, filename: str, location: Location
94     ) -> None:
95         if location is Location.UNKNOWN:
96             return
97         with open(filename, "r") as rf:
98             lines = rf.read()
99         self.parse_raw_macs_file(lines, location)
100
101     def parse_raw_macs_file(self, raw: str, location: Location) -> None:
102         lines = raw.split("\n")
103
104         # CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990
105         cabin_count = 0
106         for line in lines:
107             line = line.strip()
108             if len(line) == 0:
109                 continue
110             logger.debug(f'{location}> {line}')
111             if "cabin_" in line:
112                 continue
113             if location == Location.CABIN:
114                 cabin_count += 1
115             try:
116                 (mac, count, ip_name, mfg, ts) = line.split(",")
117             except Exception as e:
118                 logger.error(f'SKIPPED BAD LINE> {line}')
119                 logger.exception(e)
120                 continue
121             mac = mac.strip()
122             (self.location_ts_by_mac[location])[
123                 mac
124             ] = datetime.datetime.fromtimestamp(int(ts.strip()))
125             ip_name = ip_name.strip()
126             match = re.match(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name)
127             if match is not None:
128                 name = match.group(2)
129                 self.names_by_mac[mac] = name
130         if cabin_count > 0:
131             self.weird_mac_at_cabin = True
132
133     def is_anyone_in_location_now(self, location: Location) -> bool:
134         for person in Person:
135             if person is not None:
136                 loc = self.where_is_person_now(person)
137                 if location == loc:
138                     return True
139         if location == location.CABIN and self.weird_mac_at_cabin:
140             return True
141         return False
142
143     def where_is_person_now(self, name: Person) -> Location:
144         if name is Person.UNKNOWN:
145             if self.weird_mac_at_cabin:
146                 return Location.CABIN
147             else:
148                 return Location.UNKNOWN
149         votes: Dict[Location, int] = {}
150         tiebreaks: Dict[Location, datetime.datetime] = {}
151         credit = 10000
152         for mac in self.devices_by_person[name]:
153             if mac not in self.names_by_mac:
154                 continue
155             for location in self.location_ts_by_mac:
156                 if mac in self.location_ts_by_mac[location]:
157                     ts = (self.location_ts_by_mac[location])[mac]
158                     tiebreaks[location] = ts
159             location = dict_utils.key_with_min_value(tiebreaks)
160             v = votes.get(location, 0)
161             votes[location] = v + credit
162             credit = int(
163                 credit * 0.667
164             )  # Note: list most important devices first
165             if credit <= 0:
166                 credit = 1
167         if len(votes) > 0:
168             item = dict_utils.item_with_max_value(votes)
169             return item[0]
170         return Location.UNKNOWN
171
172
173 @bootstrap.initialize
174 def main() -> None:
175     config.parse()
176     p = PresenceDetection()
177
178     for loc in Location:
179         print(f'{loc}: {p.is_anyone_in_location_now(loc)}')
180
181     for u in Person:
182         print(f'{u}: {p.where_is_person_now(u)}')
183     sys.exit(0)
184
185
186 if __name__ == '__main__':
187     main()