Easier and more self documenting patterns for loading/saving Persistent
[python_utils.git] / base_presence.py
index 405b743f90e33a85394528403f06bce71d153cc8..c4d61da3ca4687be7f0baa945cd684de0483552a 100755 (executable)
@@ -1,20 +1,28 @@
 #!/usr/bin/env python3
 
+# © Copyright 2021-2022, Scott Gasch
+
+"""This is a module dealing with trying to guess a person's location
+based on the location of certain devices (e.g. phones, laptops)
+belonging to that person.  It works with networks I run that log
+the MAC addresses of active devices.
+
+"""
+
 import datetime
-from collections import defaultdict
 import logging
 import re
-from typing import Dict, List, Set
 import warnings
+from collections import defaultdict
+from typing import Dict, List, Optional, Set
 
 # Note: this module is fairly early loaded.  Be aware of dependencies.
 import argparse_utils
 import bootstrap
 import config
+import site_config
 from type.locations import Location
 from type.people import Person
-import site_config
-
 
 logger = logging.getLogger(__name__)
 
@@ -24,35 +32,43 @@ cfg = config.add_commandline_args(
 )
 cfg.add_argument(
     "--presence_macs_file",
-    type=argparse_utils.valid_filename,
-    default = "/home/scott/cron/persisted_mac_addresses.txt",
+    type=str,
+    default="/home/scott/cron/persisted_mac_addresses.txt",
     metavar="FILENAME",
-    help="The location of persisted_mac_addresses.txt to use."
+    help="The location of persisted_mac_addresses.txt to use.",
 )
 cfg.add_argument(
     '--presence_tolerable_staleness_seconds',
     type=argparse_utils.valid_duration,
     default=datetime.timedelta(seconds=60 * 5),
     metavar='DURATION',
-    help='Max acceptable age of location data before auto-refreshing'
+    help='Max acceptable age of location data before auto-refreshing',
 )
 
 
 class PresenceDetection(object):
+    """This class tries to guess a person's location based on the
+    location of certain devices (e.g. phones, laptops) belonging to
+    that person.  It works with networks I run that log the MAC
+    addresses of active devices.
+    """
+
     def __init__(self) -> None:
+        """Initialize the per-person device lists and empty presence state."""
+
         # Note: list most important devices first.
         self.devices_by_person: Dict[Person, List[str]] = {
             Person.SCOTT: [
-                "DC:E5:5B:0F:03:3D", # pixel6
-                "6C:40:08:AE:DC:2E", # laptop
+                "DC:E5:5B:0F:03:3D",  # pixel6
+                "6C:40:08:AE:DC:2E",  # laptop
             ],
             Person.LYNN: [
-                "08:CC:27:63:26:14", # motog7
-                "B8:31:B5:9A:4F:19", # laptop
+                "08:CC:27:63:26:14",  # motog7
+                "B8:31:B5:9A:4F:19",  # laptop
             ],
             Person.ALEX: [
-                "0C:CB:85:0C:8B:AE", # phone
-                "D0:C6:37:E3:36:9A", # laptop
+                "0C:CB:85:0C:8B:AE",  # phone
+                "D0:C6:37:E3:36:9A",  # laptop
             ],
             Person.AARON_AND_DANA: [
                 "98:B6:E9:E5:5A:7C",
@@ -68,111 +84,121 @@ class PresenceDetection(object):
             ],
         }
         self.run_location = site_config.get_location()
-        logger.debug(f"run_location is {self.run_location}")
+        logger.debug("base_presence run_location is %s", self.run_location)
         self.weird_mac_at_cabin = False
-        self.location_ts_by_mac: Dict[
-            Location, Dict[str, datetime.datetime]
-        ] = defaultdict(dict)
+        self.location_ts_by_mac: Dict[Location, Dict[str, datetime.datetime]] = defaultdict(dict)
         self.names_by_mac: Dict[str, str] = {}
         self.dark_locations: Set[Location] = set()
-        self.last_update = None
+        self.last_update: Optional[datetime.datetime] = None
 
     def maybe_update(self) -> None:
+        """Determine if our state is stale and needs to be updated and do
+        it, if so.
+        """
+
         if self.last_update is None:
             self.update()
         else:
             now = datetime.datetime.now()
             delta = now - self.last_update
-            if delta.total_seconds() > config.config['presence_tolerable_staleness_seconds'].total_seconds():
+            if (
+                delta.total_seconds()
+                > config.config['presence_tolerable_staleness_seconds'].total_seconds()
+            ):
                 logger.debug(
-                    f"It's been {delta.total_seconds()}s since last update; refreshing now."
+                    "It's been %ss since last update; refreshing now.", delta.total_seconds()
                 )
                 self.update()
 
     def update(self) -> None:
+        """Unconditionally update our state."""
+
         self.dark_locations = set()
-        if self.run_location is Location.HOUSE:
-            self.update_from_house()
-        elif self.run_location is Location.CABIN:
-            self.update_from_cabin()
-        else:
-            raise Exception("Where the hell is this running?!")
+        self._update_house()
+        self._update_cabin()
         self.last_update = datetime.datetime.now()
 
-    def update_from_house(self) -> None:
+    @staticmethod
+    def _get_raw_data_via_ssh(location: Location) -> Optional[str]:
         from exec_utils import cmd
+
+        canonical = {
+            Location.HOUSE: '[email protected]',
+            Location.CABIN: '[email protected]',
+        }
         try:
-            persisted_macs = config.config['presence_macs_file']
-        except KeyError:
-            persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
-        self.read_persisted_macs_file(persisted_macs, Location.HOUSE)
-        try:
-            raw = cmd(
-                "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'",
-                timeout_seconds=10.0,
+            return cmd(
+                f"ssh {canonical[location]} 'cat /home/scott/cron/persisted_mac_addresses.txt'",
+                timeout_seconds=30.0,
             )
-            self.parse_raw_macs_file(raw, Location.CABIN)
-        except Exception as e:
-            logger.exception(e)
-            msg = "Can't see the cabin right now; presence detection impared."
-            warnings.warn(msg)
-            logger.warning(msg, stacklevel=2)
-            self.dark_locations.add(Location.CABIN)
+        except Exception:
+            return None
 
-    def update_from_cabin(self) -> None:
-        from exec_utils import cmd
-        try:
-            persisted_macs = config.config['presence_macs_file']
-        except KeyError:
-            persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
-        self.read_persisted_macs_file(persisted_macs, Location.CABIN)
-        try:
-            raw = cmd(
-                "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'",
-                timeout_seconds=10.0,
+    def _get_raw_data(self, location: Location) -> Optional[str]:
+        from os.path import exists
+
+        if self.run_location == location:
+            persisted_macs = config.config.get(
+                'presence_macs_file', '/home/scott/cron/persisted_mac_addresses.txt'
             )
-            self.parse_raw_macs_file(raw, Location.HOUSE)
-        except Exception as e:
-            logger.exception(e)
+            if exists(persisted_macs):
+                with open(persisted_macs, 'r') as rf:
+                    return rf.read()
+            else:
+                return PresenceDetection._get_raw_data_via_ssh(location)
+        else:
+            return PresenceDetection._get_raw_data_via_ssh(location)
+        return None
+
+    def _update_house(self) -> None:
+        data = self._get_raw_data(Location.HOUSE)
+        if data:
+            self._parse_raw_macs_file(data, Location.HOUSE)
+        else:
             msg = "Can't see the house right now; presence detection impared."
-            logger.warning(msg)
-            warnings.warn(msg, stacklevel=2)
+            warnings.warn(msg)
+            logger.warning(msg, stacklevel=2)
             self.dark_locations.add(Location.HOUSE)
 
-    def read_persisted_macs_file(
-        self, filename: str, location: Location
-    ) -> None:
-        if location is Location.UNKNOWN:
-            return
-        with open(filename, "r") as rf:
-            lines = rf.read()
-        self.parse_raw_macs_file(lines, location)
+    def _update_cabin(self) -> None:
+        data = self._get_raw_data(Location.CABIN)
+        if data:
+            self._parse_raw_macs_file(data, Location.CABIN)
+        else:
+            msg = "Can't see the cabin right now; presence detection impared."
+            warnings.warn(msg)
+            logger.warning(msg, stacklevel=2)
+            self.dark_locations.add(Location.CABIN)
+
+    def _parse_raw_macs_file(self, raw: str, location: Location) -> None:
+        """Internal method that parses the contents of the MACs file."""
 
-    def parse_raw_macs_file(self, raw: str, location: Location) -> None:
         lines = raw.split("\n")
 
         # CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990
         cabin_count = 0
         for line in lines:
             line = line.strip()
+            if 'using fake authentication data for X11' in line:
+                continue
             if len(line) == 0:
                 continue
-            logger.debug(f'{location}> {line}')
+            logger.debug('%s> %s', location, line)
             if "cabin_" in line:
                 continue
             if location == Location.CABIN:
-                logger.debug('Cabin count: {cabin_count}')
+                logger.debug('Cabin count: %d', cabin_count)
                 cabin_count += 1
             try:
-                (mac, count, ip_name, mfg, ts) = line.split(",")
+                (mac, _, ip_name, _, ts) = line.split(",")  # type: ignore
             except Exception as e:
-                logger.error(f'SKIPPED BAD LINE> {line}')
                 logger.exception(e)
+                logger.error('SKIPPED BAD LINE> %s', line)
                 continue
             mac = mac.strip()
-            (self.location_ts_by_mac[location])[
-                mac
-            ] = datetime.datetime.fromtimestamp(int(ts.strip()))
+            (self.location_ts_by_mac[location])[mac] = datetime.datetime.fromtimestamp(
+                int(ts.strip())
+            )
             ip_name = ip_name.strip()
             match = re.match(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name)
             if match is not None:
@@ -183,6 +209,16 @@ class PresenceDetection(object):
             self.weird_mac_at_cabin = True
 
     def is_anyone_in_location_now(self, location: Location) -> bool:
+        """Determine if anyone is in a given location based on the presence of
+        MAC addresses seen recently on the network.
+
+        Args:
+            location: the location in question
+
+        Returns:
+            True if someone is detected or False otherwise.
+        """
+
         self.maybe_update()
         if location in self.dark_locations:
             raise Exception(f"Can't see {location} right now; answer undefined.")
@@ -196,12 +232,22 @@ class PresenceDetection(object):
         return False
 
     def where_is_person_now(self, name: Person) -> Location:
+        """Given a person, see if we can determine their location based on
+        network MAC addresses.
+
+        Args:
+            name: The person we're looking for.
+
+        Returns:
+            The Location where we think they are (including UNKNOWN).
+        """
+
         self.maybe_update()
         if len(self.dark_locations) > 0:
             msg = f"Can't see {self.dark_locations} right now; answer confidence impacted"
             logger.warning(msg)
             warnings.warn(msg, stacklevel=2)
-        logger.debug(f'Looking for {name}...')
+        logger.debug('Looking for %s...', name)
 
         if name is Person.UNKNOWN:
             if self.weird_mac_at_cabin:
@@ -210,33 +256,37 @@ class PresenceDetection(object):
                 return Location.UNKNOWN
 
         import dict_utils
+
         votes: Dict[Location, int] = {}
         tiebreaks: Dict[Location, datetime.datetime] = {}
         credit = 10000
+        location = None
         for mac in self.devices_by_person[name]:
             if mac not in self.names_by_mac:
                 continue
             mac_name = self.names_by_mac[mac]
-            logger.debug(f'Looking for {name}... check for mac {mac} ({mac_name})')
+            logger.debug('Looking for %s... check for mac %s (%s)', name, mac, mac_name)
             for location in self.location_ts_by_mac:
                 if mac in self.location_ts_by_mac[location]:
                     ts = (self.location_ts_by_mac[location])[mac]
-                    logger.debug(f'Seen {mac} ({mac_name}) at {location} since {ts}')
+                    logger.debug('Seen %s (%s) at %s since %s', mac, mac_name, location, ts)
                     tiebreaks[location] = ts
 
-            (most_recent_location, first_seen_ts) = dict_utils.item_with_max_value(tiebreaks)
+            (
+                most_recent_location,
+                _,
+            ) = dict_utils.item_with_max_value(tiebreaks)
             bonus = credit
             v = votes.get(most_recent_location, 0)
             votes[most_recent_location] = v + bonus
-            logger.debug(f'{name}: {location} gets {bonus} votes.')
-            credit = int(
-                credit * 0.2
-            )  # Note: list most important devices first
+            logger.debug('%s: %s gets %d votes.', name, most_recent_location, bonus)
+            credit = int(credit * 0.2)  # Note: list most important devices first
             if credit <= 0:
                 credit = 1
         if len(votes) > 0:
             (location, value) = dict_utils.item_with_max_value(votes)
             if value > 2001:
+                assert location
                 return location
         return Location.UNKNOWN
 
@@ -247,8 +297,8 @@ def main() -> None:
     for person in Person:
         print(f'{person} => {p.where_is_person_now(person)}')
     print()
-#    for location in Location:
-#        print(f'{location} => {p.is_anyone_in_location_now(location)}')
+    for location in Location:
+        print(f'{location} => {p.is_anyone_in_location_now(location)}')
 
 
 if __name__ == '__main__':