Make presence detection work from cabin or house and deal with a
[python_utils.git] / presence.py
old mode 100644 (file)
new mode 100755 (executable)
index b6e9fc3..477b14f
@@ -2,15 +2,18 @@
 
 import datetime
 from collections import defaultdict
-import enum
 import logging
 import re
-from typing import Dict, List
+from typing import Dict, List, Set
 
+# Note: this module is fairly early loaded.  Be aware of dependencies.
 import argparse_utils
+import bootstrap
 import config
-import dict_utils
-import exec_utils
+from locations import Location
+from people import Person
+import site_config
+
 
 logger = logging.getLogger(__name__)
 
@@ -27,40 +30,21 @@ cfg.add_argument(
 )
 
 
-class Person(enum.Enum):
-    UNKNOWN = 0
-    SCOTT = 1
-    LYNN = 2
-    ALEX = 3
-    AARON_AND_DANA = 4
-    AARON = 4
-    DANA = 4
-
-
-class Location(enum.Enum):
-    UNKNOWN = 0
-    HOUSE = 1
-    CABIN = 2
-
-
 class PresenceDetection(object):
     def __init__(self) -> None:
         # Note: list most important devices first.
         self.devices_by_person: Dict[Person, List[str]] = {
             Person.SCOTT: [
-                "3C:28:6D:10:6D:41",
-                "D4:61:2E:88:18:09",
-                "6C:40:08:AE:DC:2E",
-                "14:7D:DA:6A:20:D7",
+                "3C:28:6D:10:6D:41", # pixel3
+                "6C:40:08:AE:DC:2E", # laptop
             ],
             Person.LYNN: [
-                "08:CC:27:63:26:14",
-                "B8:31:B5:9A:4F:19",
+                "08:CC:27:63:26:14", # motog7
+                "B8:31:B5:9A:4F:19", # laptop
             ],
             Person.ALEX: [
-                "0C:CB:85:0C:8B:AE",
-                "D0:C6:37:E3:36:9A",
+                "0C:CB:85:0C:8B:AE", # phone
+                "D0:C6:37:E3:36:9A", # laptop
             ],
             Person.AARON_AND_DANA: [
                 "98:B6:E9:E5:5A:7C",
@@ -75,20 +59,60 @@ class PresenceDetection(object):
                 "96:69:2C:88:7A:C3",
             ],
         }
+        self.run_location = site_config.get_location()
+        logger.debug(f"run_location is {self.run_location}")
         self.weird_mac_at_cabin = False
         self.location_ts_by_mac: Dict[
             Location, Dict[str, datetime.datetime]
         ] = defaultdict(dict)
         self.names_by_mac: Dict[str, str] = {}
+        self.dark_locations: Set[Location] = set()
         self.update()
 
     def update(self) -> None:
-        persisted_macs = config.config['presence_macs_file']
+        self.dark_locations = set()
+        if self.run_location is Location.HOUSE:
+            self.update_from_house()
+        elif self.run_location is Location.CABIN:
+            self.update_from_cabin()
+        else:
+            raise Exception("Where the hell is this running?!")
+
+    def update_from_house(self) -> None:
+        from exec_utils import cmd_with_timeout
+        try:
+            persisted_macs = config.config['presence_macs_file']
+        except KeyError:
+            persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
         self.read_persisted_macs_file(persisted_macs, Location.HOUSE)
-        raw = exec_utils.cmd(
-            "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'"
-        )
-        self.parse_raw_macs_file(raw, Location.CABIN)
+        try:
+            raw = cmd_with_timeout(
+                "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'",
+                timeout_seconds=10.0,
+            )
+            self.parse_raw_macs_file(raw, Location.CABIN)
+        except Exception as e:
+            logger.exception(e)
+            logger.warning("Can't see the cabin right now; presence detection impared.")
+            self.dark_locations.add(Location.CABIN)
+
+    def update_from_cabin(self) -> None:
+        from exec_utils import cmd_with_timeout
+        try:
+            persisted_macs = config.config['presence_macs_file']
+        except KeyError:
+            persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
+        self.read_persisted_macs_file(persisted_macs, Location.CABIN)
+        try:
+            raw = cmd_with_timeout(
+                "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'",
+                timeout_seconds=10.0,
+            )
+            self.parse_raw_macs_file(raw, Location.HOUSE)
+        except Exception as e:
+            logger.exception(e)
+            logger.warning(f"Can't see the house right now; presence detection impared.")
+            self.dark_locations.add(Location.HOUSE)
 
     def read_persisted_macs_file(
         self, filename: str, location: Location
@@ -112,6 +136,7 @@ class PresenceDetection(object):
             if "cabin_" in line:
                 continue
             if location == Location.CABIN:
+                logger.debug('Cabin count: {cabin_count}')
                 cabin_count += 1
             try:
                 (mac, count, ip_name, mfg, ts) = line.split(",")
@@ -129,9 +154,12 @@ class PresenceDetection(object):
                 name = match.group(2)
                 self.names_by_mac[mac] = name
         if cabin_count > 0:
+            logger.debug('Weird MAC at the cabin')
             self.weird_mac_at_cabin = True
 
     def is_anyone_in_location_now(self, location: Location) -> bool:
+        if location in self.dark_locations:
+            raise Exception("Can't see {location} right now; answer undefined.")
         for person in Person:
             if person is not None:
                 loc = self.where_is_person_now(person)
@@ -142,6 +170,13 @@ class PresenceDetection(object):
         return False
 
     def where_is_person_now(self, name: Person) -> Location:
+        import dict_utils
+        if len(self.dark_locations) > 0:
+            logger.warning(
+                f"Can't see {self.dark_locations} right now; answer confidence impacted"
+            )
+        logger.debug(f'Looking for {name}...')
+
         if name is Person.UNKNOWN:
             if self.weird_mac_at_cabin:
                 return Location.CABIN
@@ -153,19 +188,40 @@ class PresenceDetection(object):
         for mac in self.devices_by_person[name]:
             if mac not in self.names_by_mac:
                 continue
+            mac_name = self.names_by_mac[mac]
+            logger.debug(f'Looking for {name}... check for mac {mac} ({mac_name})')
             for location in self.location_ts_by_mac:
                 if mac in self.location_ts_by_mac[location]:
                     ts = (self.location_ts_by_mac[location])[mac]
+                    logger.debug(f'Seen {mac} ({mac_name}) at {location} since {ts}')
                     tiebreaks[location] = ts
-            location = dict_utils.key_with_min_value(tiebreaks)
-            v = votes.get(location, 0)
-            votes[location] = v + credit
+
+            (most_recent_location, first_seen_ts) = dict_utils.item_with_max_value(tiebreaks)
+            bonus = credit
+            v = votes.get(most_recent_location, 0)
+            votes[most_recent_location] = v + bonus
+            logger.debug(f'{name}: {location} gets {bonus} votes.')
             credit = int(
-                credit * 0.667
+                credit * 0.2
             )  # Note: list most important devices first
             if credit <= 0:
                 credit = 1
         if len(votes) > 0:
-            item = dict_utils.item_with_max_value(votes)
-            return item[0]
+            (location, value) = dict_utils.item_with_max_value(votes)
+            if value > 2001:
+                return location
         return Location.UNKNOWN
+
+
+def main() -> None:
+    p = PresenceDetection()
+    for person in Person:
+        print(f'{person} => {p.where_is_person_now(person)}')
+    print()
+    for location in Location:
+        print(f'{location} => {p.is_anyone_in_location_now(location)}')
+
+
+if __name__ == '__main__':
+    main()