7 from collections import defaultdict
8 from typing import Dict, List, Optional, Set
10 # Note: this module is loaded fairly early. Be aware of dependencies.
15 from type.locations import Location
16 from type.people import Person
# Module-level logger, named after this module.
18 logger = logging.getLogger(__name__)
# Command-line flags for presence detection, registered through
# config.add_commandline_args under a title that includes this file's path.
# NOTE(review): the embedded listing numbers have gaps here (e.g. 23-24,
# 30-31, 37-39), so the calls that open and close each flag definition are
# not visible; presumably each flag is declared via cfg.add_argument(...) --
# confirm against the full file.
20 cfg = config.add_commandline_args(
21 f"Presence Detection ({__file__})",
22 "Args related to detection of human beings in locations.",
# Flag: path of the persisted MAC address file, parsed with
# argparse_utils.valid_filename.
25 "--presence_macs_file",
26 type=argparse_utils.valid_filename,
27 default="/home/scott/cron/persisted_mac_addresses.txt",
29 help="The location of persisted_mac_addresses.txt to use.",
# Flag: how old the location data may get before it is refreshed. The default
# is a timedelta of 5 minutes (60 * 5 seconds); maybe_update() compares
# against .total_seconds() of the configured value.
32 '--presence_tolerable_staleness_seconds',
33 type=argparse_utils.valid_duration,
34 default=datetime.timedelta(seconds=60 * 5),
36 help='Max acceptable age of location data before auto-refreshing',
# Tracks which known devices (identified by MAC address) were last seen at
# which Location, and uses that to answer where a Person currently is.
40 class PresenceDetection(object):
# Set up the static person -> devices table and the empty caches that
# update() later fills in. No location data is loaded here.
41 def __init__(self) -> None:
# Order within each device list matters: where_is_person_now() walks the list
# in order and shrinks the vote weight after every device.
42 # Note: list most important devices first.
# NOTE(review): the Person keys that open the first three lists (embedded
# lines 44, 48, 52) and the contents of the AARON_AND_DANA list (57-68) are
# not visible in this listing.
43 self.devices_by_person: Dict[Person, List[str]] = {
45 "DC:E5:5B:0F:03:3D", # pixel6
46 "6C:40:08:AE:DC:2E", # laptop
49 "08:CC:27:63:26:14", # motog7
50 "B8:31:B5:9A:4F:19", # laptop
53 "0C:CB:85:0C:8B:AE", # phone
54 "D0:C6:37:E3:36:9A", # laptop
56 Person.AARON_AND_DANA: [
# Where this process is running; update() branches on this value.
69 self.run_location = site_config.get_location()
70 logger.debug(f"run_location is {self.run_location}")
# Set to True by parse_raw_macs_file(); the condition that triggers it is on
# a line not visible in this listing.
71 self.weird_mac_at_cabin = False
# Per location: MAC address -> datetime built from the timestamp field of the
# MAC file (see parse_raw_macs_file).
72 self.location_ts_by_mac: Dict[Location, Dict[str, datetime.datetime]] = defaultdict(dict)
# MAC address -> device name taken from the "ip (name)" field of the MAC file.
73 self.names_by_mac: Dict[str, str] = {}
# Locations whose data could not be fetched during the latest update(); reset
# at the start of every update().
74 self.dark_locations: Set[Location] = set()
# Time update() last completed; None until the first update.
75 self.last_update: Optional[datetime.datetime] = None
# Refresh the presence data if it was never loaded or has become older than
# the --presence_tolerable_staleness_seconds setting.
# NOTE(review): the lines that actually trigger the refresh (embedded 79-80
# and 90) are not visible here; presumably they call self.update() -- confirm.
77 def maybe_update(self) -> None:
# No update has happened yet.
78 if self.last_update is None:
# Age of the current data. Both this timestamp and self.last_update come from
# datetime.datetime.now() with no tz argument, i.e. naive datetimes.
81 now = datetime.datetime.now()
82 delta = now - self.last_update
# Right-hand side of the staleness comparison: the configured tolerance in
# seconds (the left-hand side is on a line not visible here).
85 > config.config['presence_tolerable_staleness_seconds'].total_seconds()
# Text of a log message emitted when the data is considered stale.
88 f"It's been {delta.total_seconds()}s since last update; refreshing now."
# Reload presence data unconditionally, choosing the strategy by where this
# process runs, then record the completion time in self.last_update.
# Raises Exception for an unsupported run location (see note below).
92 def update(self) -> None:
# Forget which locations were unreachable last time; the update_from_*
# methods add them back if they fail again.
93 self.dark_locations = set()
94 if self.run_location is Location.HOUSE:
95 self.update_from_house()
96 elif self.run_location is Location.CABIN:
97 self.update_from_cabin()
# NOTE(review): embedded line 98 is not visible; presumably an `else:` so the
# raise below only fires when run_location is neither HOUSE nor CABIN.
99 raise Exception("Where the hell is this running?!")
100 self.last_update = datetime.datetime.now()
# Update path used when running at the house: read the local persisted MAC
# file as HOUSE data, then parse separately obtained raw data as CABIN data.
# If the cabin part fails, CABIN is recorded in self.dark_locations instead of
# propagating the error.
102 def update_from_house(self) -> None:
# Imported inside the method rather than at module level; presumably because
# this module is loaded early and wants few import-time dependencies.
103 from exec_utils import cmd
# Path of the local MAC file: the configured value, with a hard-coded path
# assigned on a later line. NOTE(review): embedded lines 105 and 107 are not
# visible; presumably a try/except makes the second assignment a fallback.
106 persisted_macs = config.config['presence_macs_file']
108 persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
109 self.read_persisted_macs_file(persisted_macs, Location.HOUSE)
# NOTE(review): the call that produces `raw` (embedded 110-112) is not
# visible; only its 10 second timeout argument is.
113 timeout_seconds=10.0,
115 self.parse_raw_macs_file(raw, Location.CABIN)
116 except Exception as e:
# NOTE(review): "impared" in the message below is a typo for "impaired"; it
# is a runtime string, so it is left untouched here.
118 msg = "Can't see the cabin right now; presence detection impared."
120 logger.warning(msg, stacklevel=2)
# Remember that cabin data is missing so that queries can warn or refuse.
121 self.dark_locations.add(Location.CABIN)
# Update path used when running at the cabin: read the local persisted MAC
# file as CABIN data, then parse separately obtained raw data as HOUSE data.
# If the house part fails, HOUSE is recorded in self.dark_locations instead of
# propagating the error.
123 def update_from_cabin(self) -> None:
# Imported inside the method rather than at module level; presumably because
# this module is loaded early and wants few import-time dependencies.
124 from exec_utils import cmd
# Path of the local MAC file: the configured value, with a hard-coded path
# assigned on a later line. NOTE(review): embedded lines 126 and 128 are not
# visible; presumably a try/except makes the second assignment a fallback.
127 persisted_macs = config.config['presence_macs_file']
129 persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
130 self.read_persisted_macs_file(persisted_macs, Location.CABIN)
# NOTE(review): the call that produces `raw` (embedded 131-133) is not
# visible; only its 10 second timeout argument is.
134 timeout_seconds=10.0,
136 self.parse_raw_macs_file(raw, Location.HOUSE)
137 except Exception as e:
# NOTE(review): "impared" in the message below is a typo for "impaired"; it
# is a runtime string, so it is left untouched here.
139 msg = "Can't see the house right now; presence detection impared."
141 warnings.warn(msg, stacklevel=2)
# Remember that house data is missing so that queries can warn or refuse.
142 self.dark_locations.add(Location.HOUSE)
# Open a persisted MAC address file and feed its contents to
# parse_raw_macs_file() for the given location.
#   filename: path of the file to open for reading.
#   location: the Location the file's entries are attributed to.
144 def read_persisted_macs_file(self, filename: str, location: Location) -> None:
# NOTE(review): the body of this guard (embedded line 146) is not visible;
# presumably Location.UNKNOWN is rejected or skipped -- confirm.
145 if location is Location.UNKNOWN:
147 with open(filename, "r") as rf:
# NOTE(review): the line that reads `lines` from rf (embedded 148) is not
# visible; parse_raw_macs_file() expects a single str, which it splits itself.
149 self.parse_raw_macs_file(lines, location)
# Parse the text of a MAC address file and record, for `location`, when each
# MAC was seen (self.location_ts_by_mac) and what the device is called
# (self.names_by_mac). May also set self.weird_mac_at_cabin.
#   raw: file contents as one string, one record per line.
#   location: the Location these records are attributed to.
# NOTE(review): the loop header, the cabin_count bookkeeping and several
# branch lines are not visible in this listing (embedded numbering has gaps).
151 def parse_raw_macs_file(self, raw: str, location: Location) -> None:
152 lines = raw.split("\n")
# Sample record; judging by the unpacking below, the five comma-separated
# fields are: mac, count, "ip (name)", manufacturer, timestamp.
154 # CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990
160 logger.debug(f'{location}> {line}')
163 if location == Location.CABIN:
164 logger.debug(f'Cabin count: {cabin_count}')
# Unpacking requires exactly five fields; any other count raises and lands in
# the handler below, which logs the offending line.
167 (mac, count, ip_name, mfg, ts) = line.split(",")
168 except Exception as e:
169 logger.error(f'SKIPPED BAD LINE> {line}')
# Store the time this MAC was seen at this location; the argument passed to
# fromtimestamp is on a line not visible here (presumably derived from ts).
173 (self.location_ts_by_mac[location])[mac] = datetime.datetime.fromtimestamp(
176 ip_name = ip_name.strip()
# Expect "<dotted IPv4> (<name>)"; group 2 is the text inside the parentheses.
177 match = re.match(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name)
178 if match is not None:
179 name = match.group(2)
180 self.names_by_mac[mac] = name
# NOTE(review): the condition guarding the next two lines (embedded 181) is
# not visible, so what counts as a "weird" MAC cannot be told from here.
182 logger.debug('Weird MAC at the cabin')
183 self.weird_mac_at_cabin = True
# Report whether any Person is currently at `location`.
#   location: the Location to ask about.
# Raises Exception when `location` is in self.dark_locations, because no data
# for it could be fetched during the latest update.
# NOTE(review): the return statements (embedded 192-193, 195-196) are not
# visible in this listing.
185 def is_anyone_in_location_now(self, location: Location) -> bool:
187 if location in self.dark_locations:
188 raise Exception(f"Can't see {location} right now; answer undefined.")
189 for person in Person:
# NOTE(review): presumably Person is an Enum, in which case iteration never
# yields None and this check is always true.
190 if person is not None:
191 loc = self.where_is_person_now(person)
# NOTE(review): `location.CABIN` reaches the CABIN member through the
# argument rather than the Location class; probably meant Location.CABIN.
# An unrecognized device at the cabin is taken into account here.
194 if location == location.CABIN and self.weird_mac_at_cabin:
# Decide where a person most likely is, based on where their devices were
# seen. Each device casts a weighted vote for the location where it was seen
# most recently; Location.UNKNOWN is the fallback result.
#   name: the Person to locate (a Person value, despite the parameter name).
# Emits a warning (warnings.warn) when some location's data is missing.
# NOTE(review): several lines are not visible in this listing, including the
# initial value of `credit`, the assignment of `bonus` and the conditions
# around the final returns.
198 def where_is_person_now(self, name: Person) -> Location:
200 if len(self.dark_locations) > 0:
201 msg = f"Can't see {self.dark_locations} right now; answer confidence impacted"
203 warnings.warn(msg, stacklevel=2)
204 logger.debug(f'Looking for {name}...')
# Person.UNKNOWN has no device list; it is placed at the cabin only when a
# weird MAC was flagged there.
206 if name is Person.UNKNOWN:
207 if self.weird_mac_at_cabin:
208 return Location.CABIN
210 return Location.UNKNOWN
# votes: location -> accumulated weight. tiebreaks: location -> timestamp at
# which a device was seen there.
214 votes: Dict[Location, int] = {}
215 tiebreaks: Dict[Location, datetime.datetime] = {}
# Devices are visited in listed order, so earlier (more important) devices
# vote with more weight.
217 for mac in self.devices_by_person[name]:
# MACs that never appeared in any parsed file have no name entry.
218 if mac not in self.names_by_mac:
220 mac_name = self.names_by_mac[mac]
221 logger.debug(f'Looking for {name}... check for mac {mac} ({mac_name})')
222 for location in self.location_ts_by_mac:
223 if mac in self.location_ts_by_mac[location]:
224 ts = (self.location_ts_by_mac[location])[mac]
225 logger.debug(f'Seen {mac} ({mac_name}) at {location} since {ts}')
226 tiebreaks[location] = ts
# Pick the location with the largest (most recent) timestamp.
# NOTE(review): tiebreaks is created once before the device loop and is not
# visibly reset per device, so entries from earlier devices may carry over.
229 most_recent_location,
231 ) = dict_utils.item_with_max_value(tiebreaks)
233 v = votes.get(most_recent_location, 0)
234 votes[most_recent_location] = v + bonus
# NOTE(review): this message logs `location`, the leftover variable of the
# inner loop above, not `most_recent_location`, which actually got the votes.
235 logger.debug(f'{name}: {location} gets {bonus} votes.')
# Each subsequent device votes with 20% of the previous weight.
236 credit = int(credit * 0.2) # Note: list most important devices first
# The location with the most votes is the candidate answer.
240 (location, value) = dict_utils.item_with_max_value(votes)
243 return Location.UNKNOWN
# Script entry point: build a PresenceDetection and print where each Person
# currently is.
# NOTE(review): the `def` line this decorator applies to (embedded 247) and
# the statement under the __main__ guard (embedded 259) are not visible in
# this listing.
246 @bootstrap.initialize
248 p = PresenceDetection()
249 for person in Person:
250 print(f'{person} => {p.where_is_person_now(person)}')
# Disabled per-location report, left in place as in the original.
254 # for location in Location:
255 # print(f'{location} => {p.is_anyone_in_location_now(location)}')
258 if __name__ == '__main__':