4 from collections import defaultdict
7 from typing import Dict, List, Set
9 # Note: this module is fairly early loaded. Be aware of dependencies.
13 from locations import Location
14 from people import Person
# Module-level logger named after this module.
18 logger = logging.getLogger(__name__)
# Register this module's command-line options with the shared config module.
# The first string is the option-group title (it embeds this file's path) and
# the second is the group's description.
20 cfg = config.add_commandline_args(
21 f"Presence Detection ({__file__})",
22 "Args related to detection of human beings in locations.",
# --presence_macs_file: path of the persisted MAC-address file.  The value is
# parsed with argparse_utils.valid_filename and is read back elsewhere in this
# module as config.config['presence_macs_file'].
25 "--presence_macs_file",
26 type=argparse_utils.valid_filename,
27 default = "/home/scott/cron/persisted_mac_addresses.txt",
29 help="The location of persisted_mac_addresses.txt to use."
# Tracks which devices (identified by MAC address) have been seen at which
# Location, and uses that to answer "where is this Person?" and "is anyone at
# this Location?".
33 class PresenceDetection(object):
# Build the static person -> devices table and the empty per-update state.
34 def __init__(self) -> None:
35 # Note: list most important devices first.
# Person -> list of MAC address strings.  where_is_person_now() walks each
# list in order, which is why the ordering of the entries matters.
36 self.devices_by_person: Dict[Person, List[str]] = {
38 "3C:28:6D:10:6D:41", # pixel3
39 "6C:40:08:AE:DC:2E", # laptop
42 "08:CC:27:63:26:14", # motog7
43 "B8:31:B5:9A:4F:19", # laptop
46 "0C:CB:85:0C:8B:AE", # phone
47 "D0:C6:37:E3:36:9A", # laptop
49 Person.AARON_AND_DANA: [
# The Location this process is running at; update() dispatches on it.
62 self.run_location = site_config.get_location()
63 logger.debug(f"run_location is {self.run_location}")
# Set to True by parse_raw_macs_file(); consulted when answering questions
# about the cabin and about Person.UNKNOWN.
64 self.weird_mac_at_cabin = False
# Location -> (MAC -> datetime built from the timestamp field of that MAC's
# record at that location).
65 self.location_ts_by_mac: Dict[
66 Location, Dict[str, datetime.datetime]
# MAC -> device name taken from the parenthesised part of the "ip (name)"
# field of a record.
68 self.names_by_mac: Dict[str, str] = {}
# Locations whose data could not be fetched during the most recent update();
# update() resets this to an empty set before refreshing.
69 self.dark_locations: Set[Location] = set()
def update(self) -> None:
    """Refresh presence data from the point of view of the running site.

    Clears the set of dark (unreachable) locations, then delegates to the
    updater matching ``self.run_location``.

    Raises:
        Exception: if ``self.run_location`` is neither the house nor the
            cabin.
    """
    # Start every refresh with no location marked unreachable.
    self.dark_locations = set()

    if self.run_location is Location.HOUSE:
        self.update_from_house()
        return

    if self.run_location is Location.CABIN:
        self.update_from_cabin()
        return

    raise Exception("Where the hell is this running?!")
# Refresh presence data when running at the house (see update()): house
# sightings come from the local persisted MAC file, cabin sightings from the
# output of a command run through cmd_with_timeout.
81 def update_from_house(self) -> None:
# Function-scope import of the helper used to fetch the cabin's data.
82 from exec_utils import cmd_with_timeout
# Path of the persisted MAC file as configured via --presence_macs_file.
84 persisted_macs = config.config['presence_macs_file']
# NOTE(review): as visible here, this assignment replaces the configured value
# with a hard-coded path -- presumably it is a fallback branch; confirm.
86 persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
87 self.read_persisted_macs_file(persisted_macs, Location.HOUSE)
# Fetch the cabin's raw MAC listing and parse it as Location.CABIN data.
89 raw = cmd_with_timeout(
93 self.parse_raw_macs_file(raw, Location.CABIN)
# Any failure while fetching or parsing the cabin data lands here.
94 except Exception as e:
# NOTE(review): "impared" in the message below is a typo for "impaired".
96 logger.warning("Can't see the cabin right now; presence detection impared.")
# Record that the cabin is unreadable so later queries can refuse or hedge.
97 self.dark_locations.add(Location.CABIN)
# Refresh presence data when running at the cabin (see update()): cabin
# sightings come from the local persisted MAC file, house sightings from the
# output of a command run through cmd_with_timeout.
99 def update_from_cabin(self) -> None:
# Function-scope import of the helper used to fetch the house's data.
100 from exec_utils import cmd_with_timeout
# Path of the persisted MAC file as configured via --presence_macs_file.
102 persisted_macs = config.config['presence_macs_file']
# NOTE(review): as visible here, this assignment replaces the configured value
# with a hard-coded path -- presumably it is a fallback branch; confirm.
104 persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
105 self.read_persisted_macs_file(persisted_macs, Location.CABIN)
# Fetch the house's raw MAC listing, giving the command at most 10 seconds,
# and parse it as Location.HOUSE data.
107 raw = cmd_with_timeout(
109 timeout_seconds=10.0,
111 self.parse_raw_macs_file(raw, Location.HOUSE)
# Any failure while fetching or parsing the house data lands here.
112 except Exception as e:
# NOTE(review): "impared" in the message below is a typo for "impaired", and
# the f prefix is unnecessary because the string has no placeholders.
114 logger.warning(f"Can't see the house right now; presence detection impared.")
# Record that the house is unreadable so later queries can refuse or hedge.
115 self.dark_locations.add(Location.HOUSE)
# Read a persisted MAC-address file and hand its contents to
# parse_raw_macs_file() for the given location.
#   filename: path of the file to read (opened in text mode).
#   location: the Location the file's records belong to; Location.UNKNOWN is
#             special-cased before the file is opened.
117 def read_persisted_macs_file(
118 self, filename: str, location: Location
120 if location is Location.UNKNOWN:
# The context manager closes the file even if reading fails.
122 with open(filename, "r") as rf:
124 self.parse_raw_macs_file(lines, location)
# Parse a raw MAC-sighting listing for one location and record what it says.
#   raw:      newline-separated text; the sample record below shows the five
#             comma-separated fields (MAC, count, "ip (name)", manufacturer,
#             timestamp).
#   location: the Location these sightings belong to.
# Updates self.location_ts_by_mac, self.names_by_mac and, for the cabin,
# self.weird_mac_at_cabin.
126 def parse_raw_macs_file(self, raw: str, location: Location) -> None:
127 lines = raw.split("\n")
129 # CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990
135 logger.debug(f'{location}> {line}')
138 if location == Location.CABIN:
# NOTE(review): this string has no f prefix, so the text "{cabin_count}" is
# logged literally rather than the counter's value.
139 logger.debug('Cabin count: {cabin_count}')
# A record must split into exactly five fields; anything else raises and is
# reported by the handler below.
142 (mac, count, ip_name, mfg, ts) = line.split(",")
143 except Exception as e:
144 logger.error(f'SKIPPED BAD LINE> {line}')
# Store the record's timestamp for this location; fromtimestamp() without a tz
# argument yields a naive local-time datetime.
148 (self.location_ts_by_mac[location])[
150 ] = datetime.datetime.fromtimestamp(int(ts.strip()))
151 ip_name = ip_name.strip()
# Pull the device name out of the "a.b.c.d (name)" field; group 2 is the text
# inside the parentheses.
152 match = re.match(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name)
153 if match is not None:
154 name = match.group(2)
155 self.names_by_mac[mac] = name
# This flag later makes the cabin count as occupied and places Person.UNKNOWN
# at the cabin.
157 logger.debug('Weird MAC at the cabin')
158 self.weird_mac_at_cabin = True
def is_anyone_in_location_now(self, location: Location) -> bool:
    """Report whether the current presence data places anyone at *location*.

    Args:
        location: the Location to check.

    Returns:
        True if where_is_person_now() puts at least one Person at
        *location*, or if *location* is the cabin and the
        ``weird_mac_at_cabin`` flag is set; False otherwise.

    Raises:
        Exception: if *location* is in ``self.dark_locations``, i.e. its
            data could not be read during the last update, so no
            trustworthy answer exists.
    """
    if location in self.dark_locations:
        # f-string so the message names the location instead of printing
        # the literal text "{location}".
        raise Exception(f"Can't see {location} right now; answer undefined.")

    for person in Person:
        # Presumably Person is an Enum, in which case this guard never
        # filters anything; it is kept from the original.
        if person is not None:
            loc = self.where_is_person_now(person)
            if location == loc:
                return True

    # Refer to the member through the Location class, as the rest of the
    # class does, rather than through the instance being tested.
    if location == Location.CABIN and self.weird_mac_at_cabin:
        return True
    return False
# Estimate where a Person is from where their known devices were seen.
#   name: the Person to locate.
# Each of the person's devices that has been seen adds `bonus` votes to one
# location; the location with the most votes is the candidate answer, and
# Location.UNKNOWN is the fallback result.
172 def where_is_person_now(self, name: Person) -> Location:
# If any location could not be read, the answer is still computed but may be
# based on incomplete data.
174 if len(self.dark_locations) > 0:
176 f"Can't see {self.dark_locations} right now; answer confidence impacted"
178 logger.debug(f'Looking for {name}...')
# Person.UNKNOWN has no device list: it is placed at the cabin exactly when
# parse_raw_macs_file() has set weird_mac_at_cabin.
180 if name is Person.UNKNOWN:
181 if self.weird_mac_at_cabin:
182 return Location.CABIN
184 return Location.UNKNOWN
# votes: Location -> accumulated score.
# tiebreaks: Location -> timestamp at which a device was recorded there.
185 votes: Dict[Location, int] = {}
# NOTE(review): tiebreaks is created once, outside the per-device loop, and is
# not reset in the visible code, so timestamps recorded for earlier devices
# still take part in the max() for later ones -- confirm this is intended.
186 tiebreaks: Dict[Location, datetime.datetime] = {}
# Devices are visited in list order (most important first).
188 for mac in self.devices_by_person[name]:
189 if mac not in self.names_by_mac:
191 mac_name = self.names_by_mac[mac]
192 logger.debug(f'Looking for {name}... check for mac {mac} ({mac_name})')
# Collect, per location, the timestamp recorded for this device.
193 for location in self.location_ts_by_mac:
194 if mac in self.location_ts_by_mac[location]:
195 ts = (self.location_ts_by_mac[location])[mac]
196 logger.debug(f'Seen {mac} ({mac_name}) at {location} since {ts}')
197 tiebreaks[location] = ts
# The location holding the largest (latest) timestamp receives this device's
# votes.
199 (most_recent_location, first_seen_ts) = dict_utils.item_with_max_value(tiebreaks)
201 v = votes.get(most_recent_location, 0)
202 votes[most_recent_location] = v + bonus
# NOTE(review): this logs `location`, which is whatever the inner loop above
# iterated last, not `most_recent_location`, which actually got the votes.
203 logger.debug(f'{name}: {location} gets {bonus} votes.')
206 ) # Note: list most important devices first
# Pick the location with the highest vote total.
210 (location, value) = dict_utils.item_with_max_value(votes)
213 return Location.UNKNOWN
# Script entry point, wrapped by bootstrap.initialize: builds a
# PresenceDetection and prints, for every Person, where they are believed to
# be, then, for every Location, whether anyone is there.
216 @bootstrap.initialize
218 p = PresenceDetection()
219 for person in Person:
220 print(f'{person} => {p.where_is_person_now(person)}')
# NOTE(review): is_anyone_in_location_now() raises for any location listed in
# dark_locations, which would abort this loop -- confirm that is acceptable.
222 for location in Location:
223 print(f'{location} => {p.is_anyone_in_location_now(location)}')
# Run only when this file is executed as a script, not when it is imported.
226 if __name__ == '__main__':