c69712480826253b95979013a527bad6741c5901
[python_utils.git] / presence.py
1 #!/usr/bin/env python3
2
3 import datetime
4 from collections import defaultdict
5 import enum
6 import logging
7 import re
8 from typing import Dict, List
9
10 import argparse_utils
11 import bootstrap
12 import config
13
# Module-level logger; handlers/levels are configured elsewhere.
logger = logging.getLogger(__name__)

# Register this module's command-line flags with the project's config module.
# NOTE(review): add_commandline_args presumably returns an argparse argument
# group -- confirm against config.py.
cfg = config.add_commandline_args(
    f"Presence Detection ({__file__})",
    "Args related to detection of human beings in locations.",
)
# Path of the locally persisted MAC address file; PresenceDetection.update()
# reads it and records its contents as sightings at Location.HOUSE.
cfg.add_argument(
    "--presence_macs_file",
    default="/home/scott/cron/persisted_mac_addresses.txt",
    type=argparse_utils.valid_filename,
    metavar="FILENAME",
    help="The location of persisted_mac_addresses.txt to use.",
)
27
28
class Person(enum.Enum):
    """People whose devices are tracked.

    This enum is intentionally not decorated with ``@enum.unique``: AARON and
    DANA are aliases of AARON_AND_DANA, so all three names refer to the same
    member and iterating over ``Person`` yields that member only once.
    """

    UNKNOWN = 0
    SCOTT = 1
    LYNN = 2
    ALEX = 3
    AARON_AND_DANA = 4
    # Inside the class body the name above is still the raw value (4), so
    # these two assignments create aliases of AARON_AND_DANA.
    AARON = AARON_AND_DANA
    DANA = AARON_AND_DANA
37
38
@enum.unique
class Location(enum.Enum):
    """Places where a device (and therefore a person) can be detected.

    UNKNOWN is the "nowhere / cannot tell" answer; unlike Person, values here
    must be unique, so no aliases are allowed.
    """

    UNKNOWN = 0
    HOUSE = 1
    CABIN = 2
43     CABIN = 2
44
45
46 class PresenceDetection(object):
47     def __init__(self) -> None:
48         # Note: list most important devices first.
49         self.devices_by_person: Dict[Person, List[str]] = {
50             Person.SCOTT: [
51                 "3C:28:6D:10:6D:41",
52                 "D4:61:2E:88:18:09",
53                 "6C:40:08:AE:DC:2E",
54                 "14:7D:DA:6A:20:D7",
55             ],
56             Person.LYNN: [
57                 "08:CC:27:63:26:14",
58                 "B8:31:B5:9A:4F:19",
59             ],
60             Person.ALEX: [
61                 "0C:CB:85:0C:8B:AE",
62                 "D0:C6:37:E3:36:9A",
63             ],
64             Person.AARON_AND_DANA: [
65                 "98:B6:E9:E5:5A:7C",
66                 "D6:2F:37:CA:B2:9B",
67                 "6C:E8:5C:ED:17:26",
68                 "90:E1:7B:13:7C:E5",
69                 "6E:DC:7C:75:02:1B",
70                 "B2:16:1A:93:7D:50",
71                 "18:65:90:DA:3A:35",
72                 "22:28:C8:7D:3C:85",
73                 "B2:95:23:69:91:F8",
74                 "96:69:2C:88:7A:C3",
75             ],
76         }
77         self.weird_mac_at_cabin = False
78         self.location_ts_by_mac: Dict[
79             Location, Dict[str, datetime.datetime]
80         ] = defaultdict(dict)
81         self.names_by_mac: Dict[str, str] = {}
82         self.update()
83
84     def update(self) -> None:
85         from exec_utils import cmd
86         persisted_macs = config.config['presence_macs_file']
87         self.read_persisted_macs_file(persisted_macs, Location.HOUSE)
88         raw = cmd(
89             "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'"
90         )
91         self.parse_raw_macs_file(raw, Location.CABIN)
92
93     def read_persisted_macs_file(
94         self, filename: str, location: Location
95     ) -> None:
96         if location is Location.UNKNOWN:
97             return
98         with open(filename, "r") as rf:
99             lines = rf.read()
100         self.parse_raw_macs_file(lines, location)
101
102     def parse_raw_macs_file(self, raw: str, location: Location) -> None:
103         lines = raw.split("\n")
104
105         # CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990
106         cabin_count = 0
107         for line in lines:
108             line = line.strip()
109             if len(line) == 0:
110                 continue
111             logger.debug(f'{location}> {line}')
112             if "cabin_" in line:
113                 continue
114             if location == Location.CABIN:
115                 logger.debug('Cabin count: {cabin_count}')
116                 cabin_count += 1
117             try:
118                 (mac, count, ip_name, mfg, ts) = line.split(",")
119             except Exception as e:
120                 logger.error(f'SKIPPED BAD LINE> {line}')
121                 logger.exception(e)
122                 continue
123             mac = mac.strip()
124             (self.location_ts_by_mac[location])[
125                 mac
126             ] = datetime.datetime.fromtimestamp(int(ts.strip()))
127             ip_name = ip_name.strip()
128             match = re.match(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name)
129             if match is not None:
130                 name = match.group(2)
131                 self.names_by_mac[mac] = name
132         if cabin_count > 0:
133             logger.debug('Weird MAC at the cabin')
134             self.weird_mac_at_cabin = True
135
136     def is_anyone_in_location_now(self, location: Location) -> bool:
137         for person in Person:
138             if person is not None:
139                 loc = self.where_is_person_now(person)
140                 if location == loc:
141                     return True
142         if location == location.CABIN and self.weird_mac_at_cabin:
143             return True
144         return False
145
146     def where_is_person_now(self, name: Person) -> Location:
147         import dict_utils
148
149         if name is Person.UNKNOWN:
150             if self.weird_mac_at_cabin:
151                 return Location.CABIN
152             else:
153                 return Location.UNKNOWN
154         votes: Dict[Location, int] = {}
155         tiebreaks: Dict[Location, datetime.datetime] = {}
156         credit = 10000
157         for mac in self.devices_by_person[name]:
158             logger.debug(f'Looking for {name}... check for mac {mac}')
159             if mac not in self.names_by_mac:
160                 continue
161             for location in self.location_ts_by_mac:
162                 if mac in self.location_ts_by_mac[location]:
163                     ts = (self.location_ts_by_mac[location])[mac]
164                     logger.debug(f'I saw {mac} at {location} at {ts}')
165                     tiebreaks[location] = ts
166             location = dict_utils.key_with_min_value(tiebreaks)
167             v = votes.get(location, 0)
168             votes[location] = v + credit
169             logger.debug('{name}: {location} gets {credit} votes.')
170             credit = int(
171                 credit * 0.667
172             )  # Note: list most important devices first
173             if credit <= 0:
174                 credit = 1
175         if len(votes) > 0:
176             item = dict_utils.item_with_max_value(votes)
177             return item[0]
178         return Location.UNKNOWN
179
180
@bootstrap.initialize
def main() -> None:
    """Print where each person is, then whether each location is occupied."""
    detector = PresenceDetection()
    for who in Person:
        whereabouts = detector.where_is_person_now(who)
        print(f'{who} => {whereabouts}')
    print()
    for place in Location:
        occupied = detector.is_anyone_in_location_now(place)
        print(f'{place} => {occupied}')
189
190
# Run the report only when executed as a script, not when imported.
if __name__ == "__main__":
    main()