#!/usr/bin/env python3 # © Copyright 2021-2022, Scott Gasch """This is a module dealing with trying to guess a person's location based on the location of certain devices (e.g. phones, laptops) belonging to that person. It works with networks I run that log device MAC addresses active. """ import datetime import logging import re import warnings from collections import defaultdict from typing import Dict, List, Optional, Set # Note: this module is fairly early loaded. Be aware of dependencies. import argparse_utils import bootstrap import config import site_config from type.locations import Location from type.people import Person logger = logging.getLogger(__name__) cfg = config.add_commandline_args( f"Presence Detection ({__file__})", "Args related to detection of human beings in locations.", ) cfg.add_argument( "--presence_macs_file", type=str, default="/home/scott/cron/persisted_mac_addresses.txt", metavar="FILENAME", help="The location of persisted_mac_addresses.txt to use.", ) cfg.add_argument( '--presence_tolerable_staleness_seconds', type=argparse_utils.valid_duration, default=datetime.timedelta(seconds=60 * 5), metavar='DURATION', help='Max acceptable age of location data before auto-refreshing', ) class PresenceDetection(object): """This is a module dealing with trying to guess a person's location based on the location of certain devices (e.g. phones, laptops) belonging to that person. It works with networks I run that log device MAC addresses active. """ def __init__(self) -> None: """C'tor""" # Note: list most important devices first. self.devices_by_person: Dict[Person, List[str]] = { Person.SCOTT: [ "DC:E5:5B:0F:03:3D", # pixel6 "6C:40:08:AE:DC:2E", # laptop ], Person.LYNN: [ "08:CC:27:63:26:14", # motog7 "B8:31:B5:9A:4F:19", # laptop ], Person.ALEX: [ "0C:CB:85:0C:8B:AE", # phone "D0:C6:37:E3:36:9A", # laptop ], Person.AARON_AND_DANA: [ "98:B6:E9:E5:5A:7C", "D6:2F:37:CA:B2:9B", "6C:E8:5C:ED:17:26", "90:E1:7B:13:7C:E5", "6E:DC:7C:75:02:1B", "B2:16:1A:93:7D:50", "18:65:90:DA:3A:35", "22:28:C8:7D:3C:85", "B2:95:23:69:91:F8", "96:69:2C:88:7A:C3", ], } self.run_location = site_config.get_location() logger.debug("base_presence run_location is %s", self.run_location) self.weird_mac_at_cabin = False self.location_ts_by_mac: Dict[Location, Dict[str, datetime.datetime]] = defaultdict(dict) self.names_by_mac: Dict[str, str] = {} self.dark_locations: Set[Location] = set() self.last_update: Optional[datetime.datetime] = None def maybe_update(self) -> None: """Determine if our state is stale and needs to be updated and do it, if so. """ if self.last_update is None: self.update() else: now = datetime.datetime.now() delta = now - self.last_update if ( delta.total_seconds() > config.config['presence_tolerable_staleness_seconds'].total_seconds() ): logger.debug( "It's been %ss since last update; refreshing now.", delta.total_seconds() ) self.update() def update(self) -> None: """Unconditionally update our state.""" self.dark_locations = set() self._update_house() self._update_cabin() self.last_update = datetime.datetime.now() @staticmethod def _get_raw_data_via_ssh(location: Location) -> Optional[str]: from exec_utils import cmd canonical = { Location.HOUSE: 'scott@wannabe.house', Location.CABIN: 'scott@meerkat.cabin', } try: return cmd( f"ssh {canonical[location]} 'cat /home/scott/cron/persisted_mac_addresses.txt'", timeout_seconds=30.0, ) except Exception: return None def _get_raw_data(self, location: Location) -> Optional[str]: from os.path import exists if self.run_location == location: persisted_macs = config.config.get( 'presence_macs_file', '/home/scott/cron/persisted_mac_addresses.txt' ) if exists(persisted_macs): with open(persisted_macs, 'r') as rf: return rf.read() else: return PresenceDetection._get_raw_data_via_ssh(location) else: return PresenceDetection._get_raw_data_via_ssh(location) return None def _update_house(self) -> None: data = self._get_raw_data(Location.HOUSE) if data: self._parse_raw_macs_file(data, Location.HOUSE) else: msg = "Can't see the house right now; presence detection impared." warnings.warn(msg) logger.warning(msg, stacklevel=2) self.dark_locations.add(Location.HOUSE) def _update_cabin(self) -> None: data = self._get_raw_data(Location.CABIN) if data: self._parse_raw_macs_file(data, Location.CABIN) else: msg = "Can't see the cabin right now; presence detection impared." warnings.warn(msg) logger.warning(msg, stacklevel=2) self.dark_locations.add(Location.CABIN) def _parse_raw_macs_file(self, raw: str, location: Location) -> None: """Internal method that parses the contents of the MACs file.""" lines = raw.split("\n") # CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990 cabin_count = 0 for line in lines: line = line.strip() if 'using fake authentication data for X11' in line: continue if len(line) == 0: continue logger.debug('%s> %s', location, line) if "cabin_" in line: continue if location == Location.CABIN: logger.debug('Cabin count: %d', cabin_count) cabin_count += 1 try: (mac, _, ip_name, _, ts) = line.split(",") # type: ignore except Exception as e: logger.exception(e) logger.error('SKIPPED BAD LINE> %s', line) continue mac = mac.strip() (self.location_ts_by_mac[location])[mac] = datetime.datetime.fromtimestamp( int(ts.strip()) ) ip_name = ip_name.strip() match = re.match(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name) if match is not None: name = match.group(2) self.names_by_mac[mac] = name if cabin_count > 0: logger.debug('Weird MAC at the cabin') self.weird_mac_at_cabin = True def is_anyone_in_location_now(self, location: Location) -> bool: """Determine if anyone is in a given location based on the presence of MAC files seen recently on the network. Args: location: the location in question Returns: True if someone is detected or False otherwise. """ self.maybe_update() if location in self.dark_locations: raise Exception(f"Can't see {location} right now; answer undefined.") for person in Person: if person is not None: loc = self.where_is_person_now(person) if location == loc: return True if location == location.CABIN and self.weird_mac_at_cabin: return True return False def where_is_person_now(self, name: Person) -> Location: """Given a person, see if we can determine their location based on network MAC addresses. Args: name: The person we're looking for. Returns: The Location where we think they are (including UNKNOWN). """ self.maybe_update() if len(self.dark_locations) > 0: msg = f"Can't see {self.dark_locations} right now; answer confidence impacted" logger.warning(msg) warnings.warn(msg, stacklevel=2) logger.debug('Looking for %s...', name) if name is Person.UNKNOWN: if self.weird_mac_at_cabin: return Location.CABIN else: return Location.UNKNOWN import dict_utils votes: Dict[Location, int] = {} tiebreaks: Dict[Location, datetime.datetime] = {} credit = 10000 location = None for mac in self.devices_by_person[name]: if mac not in self.names_by_mac: continue mac_name = self.names_by_mac[mac] logger.debug('Looking for %s... check for mac %s (%s)', name, mac, mac_name) for location in self.location_ts_by_mac: if mac in self.location_ts_by_mac[location]: ts = (self.location_ts_by_mac[location])[mac] logger.debug('Seen %s (%s) at %s since %s', mac, mac_name, location, ts) tiebreaks[location] = ts ( most_recent_location, _, ) = dict_utils.item_with_max_value(tiebreaks) bonus = credit v = votes.get(most_recent_location, 0) votes[most_recent_location] = v + bonus logger.debug('%s: %s gets %d votes.', name, most_recent_location, bonus) credit = int(credit * 0.2) # Note: list most important devices first if credit <= 0: credit = 1 if len(votes) > 0: (location, value) = dict_utils.item_with_max_value(votes) if value > 2001: assert location return location return Location.UNKNOWN @bootstrap.initialize def main() -> None: p = PresenceDetection() for person in Person: print(f'{person} => {p.where_is_person_now(person)}') print() for location in Location: print(f'{location} => {p.is_anyone_in_location_now(location)}') if __name__ == '__main__': main()