3 # © Copyright 2021-2022, Scott Gasch
5 """This is a module dealing with trying to guess a person's location
6 based on the location of certain devices (e.g. phones, laptops)
7 belonging to that person. It works with networks I run that log
8 device MAC addresses active.
16 from collections import defaultdict
17 from typing import Dict, List, Optional, Set
19 # Note: this module is fairly early loaded. Be aware of dependencies.
24 from type.locations import Location
25 from type.people import Person
27 logger = logging.getLogger(__name__)
29 cfg = config.add_commandline_args(
30 f"Presence Detection ({__file__})",
31 "Args related to detection of human beings in locations.",
34 "--presence_macs_file",
35 type=argparse_utils.valid_filename,
36 default="/home/scott/cron/persisted_mac_addresses.txt",
38 help="The location of persisted_mac_addresses.txt to use.",
41 '--presence_tolerable_staleness_seconds',
42 type=argparse_utils.valid_duration,
43 default=datetime.timedelta(seconds=60 * 5),
45 help='Max acceptable age of location data before auto-refreshing',
49 class PresenceDetection(object):
50 """This is a module dealing with trying to guess a person's location
51 based on the location of certain devices (e.g. phones, laptops)
52 belonging to that person. It works with networks I run that log
53 device MAC addresses active.
56 def __init__(self) -> None:
59 # Note: list most important devices first.
60 self.devices_by_person: Dict[Person, List[str]] = {
62 "DC:E5:5B:0F:03:3D", # pixel6
63 "6C:40:08:AE:DC:2E", # laptop
66 "08:CC:27:63:26:14", # motog7
67 "B8:31:B5:9A:4F:19", # laptop
70 "0C:CB:85:0C:8B:AE", # phone
71 "D0:C6:37:E3:36:9A", # laptop
73 Person.AARON_AND_DANA: [
86 self.run_location = site_config.get_location()
87 logger.debug("base_presence run_location is %s", self.run_location)
88 self.weird_mac_at_cabin = False
89 self.location_ts_by_mac: Dict[Location, Dict[str, datetime.datetime]] = defaultdict(dict)
90 self.names_by_mac: Dict[str, str] = {}
91 self.dark_locations: Set[Location] = set()
92 self.last_update: Optional[datetime.datetime] = None
94 def maybe_update(self) -> None:
95 """Determine if our state is stale and needs to be updated and do
99 if self.last_update is None:
102 now = datetime.datetime.now()
103 delta = now - self.last_update
105 delta.total_seconds()
106 > config.config['presence_tolerable_staleness_seconds'].total_seconds()
109 "It's been %ss since last update; refreshing now.", delta.total_seconds()
113 def update(self) -> None:
114 """Unconditionally update our state."""
116 self.dark_locations = set()
117 if self.run_location is Location.HOUSE:
118 self._update_from_house()
119 elif self.run_location is Location.CABIN:
120 self._update_from_cabin()
122 raise Exception("Where the hell is this running?!")
123 self.last_update = datetime.datetime.now()
125 def _update_from_house(self) -> None:
126 """Internal method for updating from code running on the house
129 from exec_utils import cmd
132 persisted_macs = config.config['presence_macs_file']
134 persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
135 self._read_persisted_macs_file(persisted_macs, Location.HOUSE)
139 timeout_seconds=20.0,
141 self._parse_raw_macs_file(raw, Location.CABIN)
142 except Exception as e:
144 msg = "Can't see the cabin right now; presence detection impared."
146 logger.warning(msg, stacklevel=2)
147 self.dark_locations.add(Location.CABIN)
149 def _update_from_cabin(self) -> None:
150 """Internal method for updating from code running on the cabing
153 from exec_utils import cmd
156 persisted_macs = config.config['presence_macs_file']
158 persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
159 self._read_persisted_macs_file(persisted_macs, Location.CABIN)
163 timeout_seconds=10.0,
165 self._parse_raw_macs_file(raw, Location.HOUSE)
166 except Exception as e:
168 msg = "Can't see the house right now; presence detection impared."
170 warnings.warn(msg, stacklevel=2)
171 self.dark_locations.add(Location.HOUSE)
173 def _read_persisted_macs_file(self, filename: str, location: Location) -> None:
174 """Internal method that, Given a filename that contains MAC addresses
175 seen on the network recently, reads it in and calls
176 _parse_raw_macs_file with the contents.
179 filename: The name of the file to read
180 location: The location we're reading from
183 if location is Location.UNKNOWN:
185 with open(filename, "r") as rf:
187 self._parse_raw_macs_file(lines, location)
189 def _parse_raw_macs_file(self, raw: str, location: Location) -> None:
190 """Internal method that parses the contents of the MACs file."""
192 lines = raw.split("\n")
194 # CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990
200 logger.debug('%s> %s', location, line)
203 if location == Location.CABIN:
204 logger.debug('Cabin count: %d', cabin_count)
207 (mac, _, ip_name, _, ts) = line.split(",") # type: ignore
208 except Exception as e:
210 logger.error('SKIPPED BAD LINE> %s', line)
213 (self.location_ts_by_mac[location])[mac] = datetime.datetime.fromtimestamp(
216 ip_name = ip_name.strip()
217 match = re.match(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name)
218 if match is not None:
219 name = match.group(2)
220 self.names_by_mac[mac] = name
222 logger.debug('Weird MAC at the cabin')
223 self.weird_mac_at_cabin = True
225 def is_anyone_in_location_now(self, location: Location) -> bool:
226 """Determine if anyone is in a given location based on the presence of
227 MAC files seen recently on the network.
230 location: the location in question
233 True if someone is detected or False otherwise.
237 if location in self.dark_locations:
238 raise Exception(f"Can't see {location} right now; answer undefined.")
239 for person in Person:
240 if person is not None:
241 loc = self.where_is_person_now(person)
244 if location == location.CABIN and self.weird_mac_at_cabin:
248 def where_is_person_now(self, name: Person) -> Location:
249 """Given a person, see if we can determine their location based on
250 network MAC addresses.
253 name: The person we're looking for.
256 The Location where we think they are (including UNKNOWN).
260 if len(self.dark_locations) > 0:
261 msg = f"Can't see {self.dark_locations} right now; answer confidence impacted"
263 warnings.warn(msg, stacklevel=2)
264 logger.debug('Looking for %s...', name)
266 if name is Person.UNKNOWN:
267 if self.weird_mac_at_cabin:
268 return Location.CABIN
270 return Location.UNKNOWN
274 votes: Dict[Location, int] = {}
275 tiebreaks: Dict[Location, datetime.datetime] = {}
278 for mac in self.devices_by_person[name]:
279 if mac not in self.names_by_mac:
281 mac_name = self.names_by_mac[mac]
282 logger.debug('Looking for %s... check for mac %s (%s)', name, mac, mac_name)
283 for location in self.location_ts_by_mac:
284 if mac in self.location_ts_by_mac[location]:
285 ts = (self.location_ts_by_mac[location])[mac]
286 logger.debug('Seen %s (%s) at %s since %s', mac, mac_name, location, ts)
287 tiebreaks[location] = ts
290 most_recent_location,
292 ) = dict_utils.item_with_max_value(tiebreaks)
294 v = votes.get(most_recent_location, 0)
295 votes[most_recent_location] = v + bonus
296 logger.debug('%s: %s gets %d votes.', name, most_recent_location, bonus)
297 credit = int(credit * 0.2) # Note: list most important devices first
301 (location, value) = dict_utils.item_with_max_value(votes)
305 return Location.UNKNOWN
308 @bootstrap.initialize
310 p = PresenceDetection()
311 for person in Person:
312 print(f'{person} => {p.where_is_person_now(person)}')
314 for location in Location:
315 print(f'{location} => {p.is_anyone_in_location_now(location)}')
318 if __name__ == '__main__':