X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=base_presence.py;h=c4d61da3ca4687be7f0baa945cd684de0483552a;hb=8f8cb3032e0593e8e95ba1b54ca425573540ad7b;hp=94c5e2f0b2ecbcf278df0c7ed09bf357286242f3;hpb=bb3ebebb1a2458fa52b042b35d8e5fbad408ff80;p=python_utils.git diff --git a/base_presence.py b/base_presence.py index 94c5e2f..c4d61da 100755 --- a/base_presence.py +++ b/base_presence.py @@ -1,20 +1,28 @@ #!/usr/bin/env python3 +# © Copyright 2021-2022, Scott Gasch + +"""This is a module dealing with trying to guess a person's location +based on the location of certain devices (e.g. phones, laptops) +belonging to that person. It works with networks I run that log +device MAC addresses active. + +""" + import datetime -from collections import defaultdict import logging import re -from typing import Dict, List, Set import warnings +from collections import defaultdict +from typing import Dict, List, Optional, Set # Note: this module is fairly early loaded. Be aware of dependencies. import argparse_utils import bootstrap import config +import site_config from type.locations import Location from type.people import Person -import site_config - logger = logging.getLogger(__name__) @@ -24,35 +32,43 @@ cfg = config.add_commandline_args( ) cfg.add_argument( "--presence_macs_file", - type=argparse_utils.valid_filename, - default = "/home/scott/cron/persisted_mac_addresses.txt", + type=str, + default="/home/scott/cron/persisted_mac_addresses.txt", metavar="FILENAME", - help="The location of persisted_mac_addresses.txt to use." + help="The location of persisted_mac_addresses.txt to use.", ) cfg.add_argument( '--presence_tolerable_staleness_seconds', type=argparse_utils.valid_duration, default=datetime.timedelta(seconds=60 * 5), metavar='DURATION', - help='Max acceptable age of location data before auto-refreshing' + help='Max acceptable age of location data before auto-refreshing', ) class PresenceDetection(object): + """This is a module dealing with trying to guess a person's location + based on the location of certain devices (e.g. phones, laptops) + belonging to that person. It works with networks I run that log + device MAC addresses active. + """ + def __init__(self) -> None: + """C'tor""" + # Note: list most important devices first. self.devices_by_person: Dict[Person, List[str]] = { Person.SCOTT: [ - "DC:E5:5B:0F:03:3D", # pixel6 - "6C:40:08:AE:DC:2E", # laptop + "DC:E5:5B:0F:03:3D", # pixel6 + "6C:40:08:AE:DC:2E", # laptop ], Person.LYNN: [ - "08:CC:27:63:26:14", # motog7 - "B8:31:B5:9A:4F:19", # laptop + "08:CC:27:63:26:14", # motog7 + "B8:31:B5:9A:4F:19", # laptop ], Person.ALEX: [ - "0C:CB:85:0C:8B:AE", # phone - "D0:C6:37:E3:36:9A", # laptop + "0C:CB:85:0C:8B:AE", # phone + "D0:C6:37:E3:36:9A", # laptop ], Person.AARON_AND_DANA: [ "98:B6:E9:E5:5A:7C", @@ -68,111 +84,121 @@ class PresenceDetection(object): ], } self.run_location = site_config.get_location() - logger.debug(f"run_location is {self.run_location}") + logger.debug("base_presence run_location is %s", self.run_location) self.weird_mac_at_cabin = False - self.location_ts_by_mac: Dict[ - Location, Dict[str, datetime.datetime] - ] = defaultdict(dict) + self.location_ts_by_mac: Dict[Location, Dict[str, datetime.datetime]] = defaultdict(dict) self.names_by_mac: Dict[str, str] = {} self.dark_locations: Set[Location] = set() - self.last_update = None + self.last_update: Optional[datetime.datetime] = None def maybe_update(self) -> None: + """Determine if our state is stale and needs to be updated and do + it, if so. + """ + if self.last_update is None: self.update() else: now = datetime.datetime.now() delta = now - self.last_update - if delta.total_seconds() > config.config['presence_tolerable_staleness_seconds'].total_seconds(): + if ( + delta.total_seconds() + > config.config['presence_tolerable_staleness_seconds'].total_seconds() + ): logger.debug( - f"It's been {delta.total_seconds()}s since last update; refreshing now." + "It's been %ss since last update; refreshing now.", delta.total_seconds() ) self.update() def update(self) -> None: + """Unconditionally update our state.""" + self.dark_locations = set() - if self.run_location is Location.HOUSE: - self.update_from_house() - elif self.run_location is Location.CABIN: - self.update_from_cabin() - else: - raise Exception("Where the hell is this running?!") + self._update_house() + self._update_cabin() self.last_update = datetime.datetime.now() - def update_from_house(self) -> None: + @staticmethod + def _get_raw_data_via_ssh(location: Location) -> Optional[str]: from exec_utils import cmd + + canonical = { + Location.HOUSE: 'scott@wannabe.house', + Location.CABIN: 'scott@meerkat.cabin', + } try: - persisted_macs = config.config['presence_macs_file'] - except KeyError: - persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt' - self.read_persisted_macs_file(persisted_macs, Location.HOUSE) - try: - raw = cmd( - "ssh scott@meerkat.cabin 'cat /home/scott/cron/persisted_mac_addresses.txt'", - timeout_seconds=10.0, + return cmd( + f"ssh {canonical[location]} 'cat /home/scott/cron/persisted_mac_addresses.txt'", + timeout_seconds=30.0, ) - self.parse_raw_macs_file(raw, Location.CABIN) - except Exception as e: - logger.exception(e) - msg = "Can't see the cabin right now; presence detection impared." - warnings.warn(msg) - logger.warning(msg, stacklevel=2) - self.dark_locations.add(Location.CABIN) + except Exception: + return None - def update_from_cabin(self) -> None: - from exec_utils import cmd - try: - persisted_macs = config.config['presence_macs_file'] - except KeyError: - persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt' - self.read_persisted_macs_file(persisted_macs, Location.CABIN) - try: - raw = cmd( - "ssh scott@wennabe.house 'cat /home/scott/cron/persisted_mac_addresses.txt'", - timeout_seconds=10.0, + def _get_raw_data(self, location: Location) -> Optional[str]: + from os.path import exists + + if self.run_location == location: + persisted_macs = config.config.get( + 'presence_macs_file', '/home/scott/cron/persisted_mac_addresses.txt' ) - self.parse_raw_macs_file(raw, Location.HOUSE) - except Exception as e: - logger.exception(e) + if exists(persisted_macs): + with open(persisted_macs, 'r') as rf: + return rf.read() + else: + return PresenceDetection._get_raw_data_via_ssh(location) + else: + return PresenceDetection._get_raw_data_via_ssh(location) + return None + + def _update_house(self) -> None: + data = self._get_raw_data(Location.HOUSE) + if data: + self._parse_raw_macs_file(data, Location.HOUSE) + else: msg = "Can't see the house right now; presence detection impared." - logger.warning(msg) - warnings.warn(msg, stacklevel=2) + warnings.warn(msg) + logger.warning(msg, stacklevel=2) self.dark_locations.add(Location.HOUSE) - def read_persisted_macs_file( - self, filename: str, location: Location - ) -> None: - if location is Location.UNKNOWN: - return - with open(filename, "r") as rf: - lines = rf.read() - self.parse_raw_macs_file(lines, location) + def _update_cabin(self) -> None: + data = self._get_raw_data(Location.CABIN) + if data: + self._parse_raw_macs_file(data, Location.CABIN) + else: + msg = "Can't see the cabin right now; presence detection impared." + warnings.warn(msg) + logger.warning(msg, stacklevel=2) + self.dark_locations.add(Location.CABIN) + + def _parse_raw_macs_file(self, raw: str, location: Location) -> None: + """Internal method that parses the contents of the MACs file.""" - def parse_raw_macs_file(self, raw: str, location: Location) -> None: lines = raw.split("\n") # CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990 cabin_count = 0 for line in lines: line = line.strip() + if 'using fake authentication data for X11' in line: + continue if len(line) == 0: continue - logger.debug(f'{location}> {line}') + logger.debug('%s> %s', location, line) if "cabin_" in line: continue if location == Location.CABIN: - logger.debug(f'Cabin count: {cabin_count}') + logger.debug('Cabin count: %d', cabin_count) cabin_count += 1 try: - (mac, count, ip_name, mfg, ts) = line.split(",") + (mac, _, ip_name, _, ts) = line.split(",") # type: ignore except Exception as e: - logger.error(f'SKIPPED BAD LINE> {line}') logger.exception(e) + logger.error('SKIPPED BAD LINE> %s', line) continue mac = mac.strip() - (self.location_ts_by_mac[location])[ - mac - ] = datetime.datetime.fromtimestamp(int(ts.strip())) + (self.location_ts_by_mac[location])[mac] = datetime.datetime.fromtimestamp( + int(ts.strip()) + ) ip_name = ip_name.strip() match = re.match(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name) if match is not None: @@ -183,6 +209,16 @@ class PresenceDetection(object): self.weird_mac_at_cabin = True def is_anyone_in_location_now(self, location: Location) -> bool: + """Determine if anyone is in a given location based on the presence of + MAC files seen recently on the network. + + Args: + location: the location in question + + Returns: + True if someone is detected or False otherwise. + """ + self.maybe_update() if location in self.dark_locations: raise Exception(f"Can't see {location} right now; answer undefined.") @@ -196,12 +232,22 @@ class PresenceDetection(object): return False def where_is_person_now(self, name: Person) -> Location: + """Given a person, see if we can determine their location based on + network MAC addresses. + + Args: + name: The person we're looking for. + + Returns: + The Location where we think they are (including UNKNOWN). + """ + self.maybe_update() if len(self.dark_locations) > 0: msg = f"Can't see {self.dark_locations} right now; answer confidence impacted" logger.warning(msg) warnings.warn(msg, stacklevel=2) - logger.debug(f'Looking for {name}...') + logger.debug('Looking for %s...', name) if name is Person.UNKNOWN: if self.weird_mac_at_cabin: @@ -210,33 +256,37 @@ class PresenceDetection(object): return Location.UNKNOWN import dict_utils + votes: Dict[Location, int] = {} tiebreaks: Dict[Location, datetime.datetime] = {} credit = 10000 + location = None for mac in self.devices_by_person[name]: if mac not in self.names_by_mac: continue mac_name = self.names_by_mac[mac] - logger.debug(f'Looking for {name}... check for mac {mac} ({mac_name})') + logger.debug('Looking for %s... check for mac %s (%s)', name, mac, mac_name) for location in self.location_ts_by_mac: if mac in self.location_ts_by_mac[location]: ts = (self.location_ts_by_mac[location])[mac] - logger.debug(f'Seen {mac} ({mac_name}) at {location} since {ts}') + logger.debug('Seen %s (%s) at %s since %s', mac, mac_name, location, ts) tiebreaks[location] = ts - (most_recent_location, first_seen_ts) = dict_utils.item_with_max_value(tiebreaks) + ( + most_recent_location, + _, + ) = dict_utils.item_with_max_value(tiebreaks) bonus = credit v = votes.get(most_recent_location, 0) votes[most_recent_location] = v + bonus - logger.debug(f'{name}: {location} gets {bonus} votes.') - credit = int( - credit * 0.2 - ) # Note: list most important devices first + logger.debug('%s: %s gets %d votes.', name, most_recent_location, bonus) + credit = int(credit * 0.2) # Note: list most important devices first if credit <= 0: credit = 1 if len(votes) > 0: (location, value) = dict_utils.item_with_max_value(votes) if value > 2001: + assert location return location return Location.UNKNOWN @@ -247,8 +297,8 @@ def main() -> None: for person in Person: print(f'{person} => {p.where_is_person_now(person)}') print() -# for location in Location: -# print(f'{location} => {p.is_anyone_in_location_now(location)}') + for location in Location: + print(f'{location} => {p.is_anyone_in_location_now(location)}') if __name__ == '__main__':