#!/usr/bin/env python3

"""Guess which location each known person is in from persisted MAC sightings.

Each site keeps a text file of MAC addresses seen on its network.  This
module reads the local copy directly, fetches the other site's copy over
ssh, and votes on each person's location based on which of their devices
were seen where.
"""

import datetime
from collections import defaultdict
import logging
import re
from typing import Dict, List, Optional, Set
import warnings

# Note: this module is fairly early loaded.  Be aware of dependencies.
import argparse_utils
import bootstrap
import config
from type.locations import Location
from type.people import Person
import site_config

logger = logging.getLogger(__name__)

# Path of the persisted MAC file; used both as the flag default and as the
# fallback when the flag is missing from the parsed config.
_DEFAULT_MACS_FILE = "/home/scott/cron/persisted_mac_addresses.txt"

# Matches the "ip (hostname)" field, e.g. "10.0.0.22 (side_deck_high_home)";
# group 2 is the hostname.
_IP_AND_NAME_RE = re.compile(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)")

cfg = config.add_commandline_args(
    f"Presence Detection ({__file__})",
    "Args related to detection of human beings in locations.",
)
cfg.add_argument(
    "--presence_macs_file",
    type=argparse_utils.valid_filename,
    default=_DEFAULT_MACS_FILE,
    metavar="FILENAME",
    help="The location of persisted_mac_addresses.txt to use.",
)
cfg.add_argument(
    '--presence_tolerable_staleness_seconds',
    type=argparse_utils.valid_duration,
    default=datetime.timedelta(seconds=60 * 5),
    metavar='DURATION',
    help='Max acceptable age of location data before auto-refreshing',
)


class PresenceDetection:
    """Tracks where known devices were last seen and infers where people are.

    Data is refreshed lazily: every query calls maybe_update(), which reloads
    the MAC files when the cached data is older than
    --presence_tolerable_staleness_seconds.
    """

    def __init__(self) -> None:
        # Note: list most important devices first.  where_is_person_now()
        # gives earlier entries far more voting weight than later ones.
        self.devices_by_person: Dict[Person, List[str]] = {
            Person.SCOTT: [
                "DC:E5:5B:0F:03:3D",  # pixel6
                "6C:40:08:AE:DC:2E",  # laptop
            ],
            Person.LYNN: [
                "08:CC:27:63:26:14",  # motog7
                "B8:31:B5:9A:4F:19",  # laptop
            ],
            Person.ALEX: [
                "0C:CB:85:0C:8B:AE",  # phone
                "D0:C6:37:E3:36:9A",  # laptop
            ],
            Person.AARON_AND_DANA: [
                "98:B6:E9:E5:5A:7C",
                "D6:2F:37:CA:B2:9B",
                "6C:E8:5C:ED:17:26",
                "90:E1:7B:13:7C:E5",
                "6E:DC:7C:75:02:1B",
                "B2:16:1A:93:7D:50",
                "18:65:90:DA:3A:35",
                "22:28:C8:7D:3C:85",
                "B2:95:23:69:91:F8",
                "96:69:2C:88:7A:C3",
            ],
        }
        self.run_location = site_config.get_location()
        logger.debug(f"run_location is {self.run_location}")
        # True when the most recent cabin data listed at least one device
        # whose line does not contain "cabin_".
        self.weird_mac_at_cabin = False
        # location -> (mac -> timestamp parsed from that location's file)
        self.location_ts_by_mac: Dict[
            Location, Dict[str, datetime.datetime]
        ] = defaultdict(dict)
        # mac -> hostname taken from the "ip (hostname)" field
        self.names_by_mac: Dict[str, str] = {}
        # Locations whose data could not be fetched on the last update.
        self.dark_locations: Set[Location] = set()
        self.last_update: Optional[datetime.datetime] = None

    def maybe_update(self) -> None:
        """Refresh the data if it was never loaded or has gone stale."""
        if self.last_update is None:
            self.update()
            return

        delta = datetime.datetime.now() - self.last_update
        max_age = config.config['presence_tolerable_staleness_seconds']
        if delta.total_seconds() > max_age.total_seconds():
            logger.debug(
                f"It's been {delta.total_seconds()}s since last update; refreshing now."
            )
            self.update()

    def update(self) -> None:
        """Unconditionally reload MAC data for both locations.

        Raises:
            Exception: if this process is running at neither the house nor
                the cabin.
        """
        self.dark_locations = set()
        if self.run_location is Location.HOUSE:
            self.update_from_house()
        elif self.run_location is Location.CABIN:
            self.update_from_cabin()
        else:
            raise Exception("Where the hell is this running?!")
        self.last_update = datetime.datetime.now()

    def update_from_house(self) -> None:
        """Load house data from the local file and cabin data over ssh."""
        self._update_from(
            local=Location.HOUSE,
            remote=Location.CABIN,
            remote_host="scott@meerkat.cabin",
            remote_label="cabin",
        )

    def update_from_cabin(self) -> None:
        """Load cabin data from the local file and house data over ssh."""
        self._update_from(
            local=Location.CABIN,
            remote=Location.HOUSE,
            remote_host="scott@wennabe.house",
            remote_label="house",
        )

    def _update_from(
        self,
        local: Location,
        remote: Location,
        remote_host: str,
        remote_label: str,
    ) -> None:
        """Read the local MAC file, then best-effort fetch the remote one.

        A failure to fetch or parse the remote file is logged, surfaced as a
        warning, and recorded in dark_locations rather than raised.
        """
        # Imported here because this module is loaded early; see the note by
        # the top-of-file imports.
        from exec_utils import cmd

        try:
            persisted_macs = config.config['presence_macs_file']
        except KeyError:
            persisted_macs = _DEFAULT_MACS_FILE
        self.read_persisted_macs_file(persisted_macs, local)

        try:
            raw = cmd(
                f"ssh {remote_host} 'cat /home/scott/cron/persisted_mac_addresses.txt'",
                timeout_seconds=10.0,
            )
            self.parse_raw_macs_file(raw, remote)
        except Exception as e:
            logger.exception(e)
            msg = f"Can't see the {remote_label} right now; presence detection impared."
            logger.warning(msg)
            # stacklevel=3 skips this helper and update_from_*() so the
            # warning is attributed to whoever asked for the update.
            warnings.warn(msg, stacklevel=3)
            self.dark_locations.add(remote)

    def read_persisted_macs_file(self, filename: str, location: Location) -> None:
        """Parse the MAC file at filename as data for location.

        Does nothing when location is Location.UNKNOWN.
        """
        if location is Location.UNKNOWN:
            return
        with open(filename, "r") as rf:
            contents = rf.read()
        self.parse_raw_macs_file(contents, location)

    def parse_raw_macs_file(self, raw: str, location: Location) -> None:
        """Record the sightings listed in raw as belonging to location.

        Each non-empty line is expected to look like:

            CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990

        i.e. mac, count, "ip (hostname)", manufacturer, epoch timestamp.
        Lines containing "cabin_" are ignored.  Lines that do not parse are
        logged and skipped.
        """
        cabin_count = 0
        for line in raw.split("\n"):
            line = line.strip()
            if not line:
                continue
            logger.debug(f'{location}> {line}')
            if "cabin_" in line:
                continue

            # Parse the whole record, timestamp included, before touching
            # any state so a bad field cannot abort the refresh.
            try:
                (mac, _count, ip_name, _mfg, ts) = line.split(",")
                seen_ts = datetime.datetime.fromtimestamp(int(ts.strip()))
            except (ValueError, OverflowError, OSError) as e:
                logger.error(f'SKIPPED BAD LINE> {line}')
                logger.exception(e)
                continue

            # Only well-formed lines count as a device at the cabin.
            if location == Location.CABIN:
                logger.debug(f'Cabin count: {cabin_count}')
                cabin_count += 1

            mac = mac.strip()
            self.location_ts_by_mac[location][mac] = seen_ts
            match = _IP_AND_NAME_RE.match(ip_name.strip())
            if match is not None:
                self.names_by_mac[mac] = match.group(2)

        # Recompute the flag from this batch of cabin data so it does not
        # stay True forever after a single sighting.
        if location == Location.CABIN:
            self.weird_mac_at_cabin = cabin_count > 0
            if self.weird_mac_at_cabin:
                logger.debug('Weird MAC at the cabin')

    def is_anyone_in_location_now(self, location: Location) -> bool:
        """Return True if any known person is judged to be at location.

        For the cabin, an unrecognized device there also counts as someone.

        Raises:
            Exception: if location's data could not be fetched.
        """
        self.maybe_update()
        if location in self.dark_locations:
            raise Exception(f"Can't see {location} right now; answer undefined.")
        for person in Person:
            if self.where_is_person_now(person) == location:
                return True
        return location == Location.CABIN and self.weird_mac_at_cabin

    def where_is_person_now(self, name: Person) -> Location:
        """Return the location name's devices vote for, or Location.UNKNOWN.

        Person.UNKNOWN maps to the cabin when an unrecognized device was seen
        there.  Warns, but still answers, when some location is dark.
        """
        self.maybe_update()
        if self.dark_locations:
            msg = (
                f"Can't see {self.dark_locations} right now; answer confidence impacted"
            )
            logger.warning(msg)
            warnings.warn(msg, stacklevel=2)
        logger.debug(f'Looking for {name}...')

        if name is Person.UNKNOWN:
            if self.weird_mac_at_cabin:
                return Location.CABIN
            return Location.UNKNOWN

        # Imported here because this module is loaded early; see the note by
        # the top-of-file imports.
        import dict_utils

        votes: Dict[Location, int] = {}
        # NOTE(review): tiebreaks is shared across all of this person's
        # devices, so an earlier device's timestamp can decide a later
        # device's vote.  Kept as-is; confirm this is intended.
        tiebreaks: Dict[Location, datetime.datetime] = {}
        credit = 10000
        # .get() so a Person with no registered devices yields UNKNOWN
        # instead of a KeyError.
        for mac in self.devices_by_person.get(name, []):
            if mac not in self.names_by_mac:
                continue
            mac_name = self.names_by_mac[mac]
            logger.debug(f'Looking for {name}... check for mac {mac} ({mac_name})')
            for location, ts_by_mac in self.location_ts_by_mac.items():
                if mac in ts_by_mac:
                    ts = ts_by_mac[mac]
                    logger.debug(f'Seen {mac} ({mac_name}) at {location} since {ts}')
                    tiebreaks[location] = ts
            if not tiebreaks:
                continue

            (most_recent_location, _) = dict_utils.item_with_max_value(tiebreaks)
            bonus = credit
            votes[most_recent_location] = votes.get(most_recent_location, 0) + bonus
            logger.debug(f'{name}: {most_recent_location} gets {bonus} votes.')
            # Note: list most important devices first; each later device is
            # worth a fifth of the previous one, but never less than 1.
            credit = int(credit * 0.2)
            if credit <= 0:
                credit = 1

        if votes:
            (location, value) = dict_utils.item_with_max_value(votes)
            # The second device alone contributes exactly 2000 votes, which
            # is deliberately not enough to clear this threshold.
            if value > 2001:
                return location
        return Location.UNKNOWN


@bootstrap.initialize
def main() -> None:
    """Print the inferred location of every Person."""
    p = PresenceDetection()
    for person in Person:
        print(f'{person} => {p.where_is_person_now(person)}')
    print()


#    for location in Location:
#        print(f'{location} => {p.is_anyone_in_location_now(location)}')


if __name__ == '__main__':
    main()