+++ /dev/null
-#!/usr/bin/env python3
-
-import datetime
-from collections import defaultdict
-import logging
-import re
-from typing import Dict, List, Set
-
-# Note: this module is fairly early loaded. Be aware of dependencies.
-import argparse_utils
-import bootstrap
-import config
-from locations import Location
-from people import Person
-import site_config
-
-
-logger = logging.getLogger(__name__)
-
-cfg = config.add_commandline_args(
- f"Presence Detection ({__file__})",
- "Args related to detection of human beings in locations.",
-)
-cfg.add_argument(
- "--presence_macs_file",
- type=argparse_utils.valid_filename,
- default = "/home/scott/cron/persisted_mac_addresses.txt",
- metavar="FILENAME",
- help="The location of persisted_mac_addresses.txt to use."
-)
-cfg.add_argument(
- '--presence_tolerable_staleness_seconds',
- type=argparse_utils.valid_duration,
- default=datetime.timedelta(seconds=60 * 5),
- metavar='DURATION',
- help='Max acceptable age of location data before auto-refreshing'
-)
-
-
-class PresenceDetection(object):
- def __init__(self) -> None:
- # Note: list most important devices first.
- self.devices_by_person: Dict[Person, List[str]] = {
- Person.SCOTT: [
- "3C:28:6D:10:6D:41", # pixel3
- "6C:40:08:AE:DC:2E", # laptop
- ],
- Person.LYNN: [
- "08:CC:27:63:26:14", # motog7
- "B8:31:B5:9A:4F:19", # laptop
- ],
- Person.ALEX: [
- "0C:CB:85:0C:8B:AE", # phone
- "D0:C6:37:E3:36:9A", # laptop
- ],
- Person.AARON_AND_DANA: [
- "98:B6:E9:E5:5A:7C",
- "D6:2F:37:CA:B2:9B",
- "6C:E8:5C:ED:17:26",
- "90:E1:7B:13:7C:E5",
- "6E:DC:7C:75:02:1B",
- "B2:16:1A:93:7D:50",
- "18:65:90:DA:3A:35",
- "22:28:C8:7D:3C:85",
- "B2:95:23:69:91:F8",
- "96:69:2C:88:7A:C3",
- ],
- }
- self.run_location = site_config.get_location()
- logger.debug(f"run_location is {self.run_location}")
- self.weird_mac_at_cabin = False
- self.location_ts_by_mac: Dict[
- Location, Dict[str, datetime.datetime]
- ] = defaultdict(dict)
- self.names_by_mac: Dict[str, str] = {}
- self.dark_locations: Set[Location] = set()
- self.last_update = None
-
- def maybe_update(self) -> None:
- if self.last_update is None:
- self.update()
- else:
- now = datetime.datetime.now()
- delta = now - self.last_update
- if delta.total_seconds() > config.config['presence_tolerable_staleness_seconds'].total_seconds():
- logger.debug(
- f"It's been {delta.total_seconds()}s since last update; refreshing now."
- )
- self.update()
-
- def update(self) -> None:
- self.dark_locations = set()
- if self.run_location is Location.HOUSE:
- self.update_from_house()
- elif self.run_location is Location.CABIN:
- self.update_from_cabin()
- else:
- raise Exception("Where the hell is this running?!")
- self.last_update = datetime.datetime.now()
-
- def update_from_house(self) -> None:
- from exec_utils import cmd_with_timeout
- try:
- persisted_macs = config.config['presence_macs_file']
- except KeyError:
- persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
- self.read_persisted_macs_file(persisted_macs, Location.HOUSE)
- try:
- raw = cmd_with_timeout(
- timeout_seconds=10.0,
- )
- self.parse_raw_macs_file(raw, Location.CABIN)
- except Exception as e:
- logger.exception(e)
- logger.warning("Can't see the cabin right now; presence detection impared.")
- self.dark_locations.add(Location.CABIN)
-
- def update_from_cabin(self) -> None:
- from exec_utils import cmd_with_timeout
- try:
- persisted_macs = config.config['presence_macs_file']
- except KeyError:
- persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
- self.read_persisted_macs_file(persisted_macs, Location.CABIN)
- try:
- raw = cmd_with_timeout(
- timeout_seconds=10.0,
- )
- self.parse_raw_macs_file(raw, Location.HOUSE)
- except Exception as e:
- logger.exception(e)
- logger.warning("Can't see the house right now; presence detection impared.")
- self.dark_locations.add(Location.HOUSE)
-
- def read_persisted_macs_file(
- self, filename: str, location: Location
- ) -> None:
- if location is Location.UNKNOWN:
- return
- with open(filename, "r") as rf:
- lines = rf.read()
- self.parse_raw_macs_file(lines, location)
-
- def parse_raw_macs_file(self, raw: str, location: Location) -> None:
- lines = raw.split("\n")
-
- # CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990
- cabin_count = 0
- for line in lines:
- line = line.strip()
- if len(line) == 0:
- continue
- logger.debug(f'{location}> {line}')
- if "cabin_" in line:
- continue
- if location == Location.CABIN:
- logger.debug('Cabin count: {cabin_count}')
- cabin_count += 1
- try:
- (mac, count, ip_name, mfg, ts) = line.split(",")
- except Exception as e:
- logger.error(f'SKIPPED BAD LINE> {line}')
- logger.exception(e)
- continue
- mac = mac.strip()
- (self.location_ts_by_mac[location])[
- mac
- ] = datetime.datetime.fromtimestamp(int(ts.strip()))
- ip_name = ip_name.strip()
- match = re.match(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name)
- if match is not None:
- name = match.group(2)
- self.names_by_mac[mac] = name
- if cabin_count > 0:
- logger.debug('Weird MAC at the cabin')
- self.weird_mac_at_cabin = True
-
- def is_anyone_in_location_now(self, location: Location) -> bool:
- self.maybe_update()
- if location in self.dark_locations:
- raise Exception("Can't see {location} right now; answer undefined.")
- for person in Person:
- if person is not None:
- loc = self.where_is_person_now(person)
- if location == loc:
- return True
- if location == location.CABIN and self.weird_mac_at_cabin:
- return True
- return False
-
- def where_is_person_now(self, name: Person) -> Location:
- self.maybe_update()
- if len(self.dark_locations) > 0:
- logger.warning(
- f"Can't see {self.dark_locations} right now; answer confidence impacted"
- )
- logger.debug(f'Looking for {name}...')
-
- if name is Person.UNKNOWN:
- if self.weird_mac_at_cabin:
- return Location.CABIN
- else:
- return Location.UNKNOWN
-
- import dict_utils
- votes: Dict[Location, int] = {}
- tiebreaks: Dict[Location, datetime.datetime] = {}
- credit = 10000
- for mac in self.devices_by_person[name]:
- if mac not in self.names_by_mac:
- continue
- mac_name = self.names_by_mac[mac]
- logger.debug(f'Looking for {name}... check for mac {mac} ({mac_name})')
- for location in self.location_ts_by_mac:
- if mac in self.location_ts_by_mac[location]:
- ts = (self.location_ts_by_mac[location])[mac]
- logger.debug(f'Seen {mac} ({mac_name}) at {location} since {ts}')
- tiebreaks[location] = ts
-
- (most_recent_location, first_seen_ts) = dict_utils.item_with_max_value(tiebreaks)
- bonus = credit
- v = votes.get(most_recent_location, 0)
- votes[most_recent_location] = v + bonus
- logger.debug(f'{name}: {location} gets {bonus} votes.')
- credit = int(
- credit * 0.2
- ) # Note: list most important devices first
- if credit <= 0:
- credit = 1
- if len(votes) > 0:
- (location, value) = dict_utils.item_with_max_value(votes)
- if value > 2001:
- return location
- return Location.UNKNOWN
-
-
-def main() -> None:
- p = PresenceDetection()
- for person in Person:
- print(f'{person} => {p.where_is_person_now(person)}')
- print()
- for location in Location:
- print(f'{location} => {p.is_anyone_in_location_now(location)}')
-
-
-if __name__ == '__main__':
- main()