A bunch of changes...
[python_utils.git] / base_presence.py
diff --git a/base_presence.py b/base_presence.py
new file mode 100755 (executable)
index 0000000..cd62bb7
--- /dev/null
@@ -0,0 +1,250 @@
+#!/usr/bin/env python3
+
+import datetime
+from collections import defaultdict
+import logging
+import re
+from typing import Dict, List, Set
+
+# Note: this module is fairly early loaded.  Be aware of dependencies.
+import argparse_utils
+import bootstrap
+import config
+from type.locations import Location
+from type.people import Person
+import site_config
+
+
+logger = logging.getLogger(__name__)
+
+cfg = config.add_commandline_args(
+    f"Presence Detection ({__file__})",
+    "Args related to detection of human beings in locations.",
+)
+cfg.add_argument(
+    "--presence_macs_file",
+    type=argparse_utils.valid_filename,
+    default = "/home/scott/cron/persisted_mac_addresses.txt",
+    metavar="FILENAME",
+    help="The location of persisted_mac_addresses.txt to use."
+)
+cfg.add_argument(
+    '--presence_tolerable_staleness_seconds',
+    type=argparse_utils.valid_duration,
+    default=datetime.timedelta(seconds=60 * 5),
+    metavar='DURATION',
+    help='Max acceptable age of location data before auto-refreshing'
+)
+
+
+class PresenceDetection(object):
+    def __init__(self) -> None:
+        # Note: list most important devices first.
+        self.devices_by_person: Dict[Person, List[str]] = {
+            Person.SCOTT: [
+                "3C:28:6D:10:6D:41", # pixel3
+                "6C:40:08:AE:DC:2E", # laptop
+            ],
+            Person.LYNN: [
+                "08:CC:27:63:26:14", # motog7
+                "B8:31:B5:9A:4F:19", # laptop
+            ],
+            Person.ALEX: [
+                "0C:CB:85:0C:8B:AE", # phone
+                "D0:C6:37:E3:36:9A", # laptop
+            ],
+            Person.AARON_AND_DANA: [
+                "98:B6:E9:E5:5A:7C",
+                "D6:2F:37:CA:B2:9B",
+                "6C:E8:5C:ED:17:26",
+                "90:E1:7B:13:7C:E5",
+                "6E:DC:7C:75:02:1B",
+                "B2:16:1A:93:7D:50",
+                "18:65:90:DA:3A:35",
+                "22:28:C8:7D:3C:85",
+                "B2:95:23:69:91:F8",
+                "96:69:2C:88:7A:C3",
+            ],
+        }
+        self.run_location = site_config.get_location()
+        logger.debug(f"run_location is {self.run_location}")
+        self.weird_mac_at_cabin = False
+        self.location_ts_by_mac: Dict[
+            Location, Dict[str, datetime.datetime]
+        ] = defaultdict(dict)
+        self.names_by_mac: Dict[str, str] = {}
+        self.dark_locations: Set[Location] = set()
+        self.last_update = None
+
+    def maybe_update(self) -> None:
+        if self.last_update is None:
+            self.update()
+        else:
+            now = datetime.datetime.now()
+            delta = now - self.last_update
+            if delta.total_seconds() > config.config['presence_tolerable_staleness_seconds'].total_seconds():
+                logger.debug(
+                    f"It's been {delta.total_seconds()}s since last update; refreshing now."
+                )
+                self.update()
+
+    def update(self) -> None:
+        self.dark_locations = set()
+        if self.run_location is Location.HOUSE:
+            self.update_from_house()
+        elif self.run_location is Location.CABIN:
+            self.update_from_cabin()
+        else:
+            raise Exception("Where the hell is this running?!")
+        self.last_update = datetime.datetime.now()
+
+    def update_from_house(self) -> None:
+        from exec_utils import cmd
+        try:
+            persisted_macs = config.config['presence_macs_file']
+        except KeyError:
+            persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
+        self.read_persisted_macs_file(persisted_macs, Location.HOUSE)
+        try:
+            raw = cmd(
+                "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'",
+                timeout_seconds=10.0,
+            )
+            self.parse_raw_macs_file(raw, Location.CABIN)
+        except Exception as e:
+            logger.exception(e)
+            logger.warning("Can't see the cabin right now; presence detection impared.")
+            self.dark_locations.add(Location.CABIN)
+
+    def update_from_cabin(self) -> None:
+        from exec_utils import cmd
+        try:
+            persisted_macs = config.config['presence_macs_file']
+        except KeyError:
+            persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
+        self.read_persisted_macs_file(persisted_macs, Location.CABIN)
+        try:
+            raw = cmd(
+                "ssh [email protected] 'cat /home/scott/cron/persisted_mac_addresses.txt'",
+                timeout_seconds=10.0,
+            )
+            self.parse_raw_macs_file(raw, Location.HOUSE)
+        except Exception as e:
+            logger.exception(e)
+            logger.warning("Can't see the house right now; presence detection impared.")
+            self.dark_locations.add(Location.HOUSE)
+
+    def read_persisted_macs_file(
+        self, filename: str, location: Location
+    ) -> None:
+        if location is Location.UNKNOWN:
+            return
+        with open(filename, "r") as rf:
+            lines = rf.read()
+        self.parse_raw_macs_file(lines, location)
+
+    def parse_raw_macs_file(self, raw: str, location: Location) -> None:
+        lines = raw.split("\n")
+
+        # CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990
+        cabin_count = 0
+        for line in lines:
+            line = line.strip()
+            if len(line) == 0:
+                continue
+            logger.debug(f'{location}> {line}')
+            if "cabin_" in line:
+                continue
+            if location == Location.CABIN:
+                logger.debug('Cabin count: {cabin_count}')
+                cabin_count += 1
+            try:
+                (mac, count, ip_name, mfg, ts) = line.split(",")
+            except Exception as e:
+                logger.error(f'SKIPPED BAD LINE> {line}')
+                logger.exception(e)
+                continue
+            mac = mac.strip()
+            (self.location_ts_by_mac[location])[
+                mac
+            ] = datetime.datetime.fromtimestamp(int(ts.strip()))
+            ip_name = ip_name.strip()
+            match = re.match(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name)
+            if match is not None:
+                name = match.group(2)
+                self.names_by_mac[mac] = name
+        if cabin_count > 0:
+            logger.debug('Weird MAC at the cabin')
+            self.weird_mac_at_cabin = True
+
+    def is_anyone_in_location_now(self, location: Location) -> bool:
+        self.maybe_update()
+        if location in self.dark_locations:
+            raise Exception(f"Can't see {location} right now; answer undefined.")
+        for person in Person:
+            if person is not None:
+                loc = self.where_is_person_now(person)
+                if location == loc:
+                    return True
+        if location == location.CABIN and self.weird_mac_at_cabin:
+            return True
+        return False
+
+    def where_is_person_now(self, name: Person) -> Location:
+        self.maybe_update()
+        if len(self.dark_locations) > 0:
+            logger.warning(
+                f"Can't see {self.dark_locations} right now; answer confidence impacted"
+            )
+        logger.debug(f'Looking for {name}...')
+
+        if name is Person.UNKNOWN:
+            if self.weird_mac_at_cabin:
+                return Location.CABIN
+            else:
+                return Location.UNKNOWN
+
+        import dict_utils
+        votes: Dict[Location, int] = {}
+        tiebreaks: Dict[Location, datetime.datetime] = {}
+        credit = 10000
+        for mac in self.devices_by_person[name]:
+            if mac not in self.names_by_mac:
+                continue
+            mac_name = self.names_by_mac[mac]
+            logger.debug(f'Looking for {name}... check for mac {mac} ({mac_name})')
+            for location in self.location_ts_by_mac:
+                if mac in self.location_ts_by_mac[location]:
+                    ts = (self.location_ts_by_mac[location])[mac]
+                    logger.debug(f'Seen {mac} ({mac_name}) at {location} since {ts}')
+                    tiebreaks[location] = ts
+
+            (most_recent_location, first_seen_ts) = dict_utils.item_with_max_value(tiebreaks)
+            bonus = credit
+            v = votes.get(most_recent_location, 0)
+            votes[most_recent_location] = v + bonus
+            logger.debug(f'{name}: {location} gets {bonus} votes.')
+            credit = int(
+                credit * 0.2
+            )  # Note: list most important devices first
+            if credit <= 0:
+                credit = 1
+        if len(votes) > 0:
+            (location, value) = dict_utils.item_with_max_value(votes)
+            if value > 2001:
+                return location
+        return Location.UNKNOWN
+
+
+def main() -> None:
+    p = PresenceDetection()
+    for person in Person:
+        print(f'{person} => {p.where_is_person_now(person)}')
+    print()
+    for location in Location:
+        print(f'{location} => {p.is_anyone_in_location_now(location)}')
+
+
+if __name__ == '__main__':
+    main()