#!/usr/bin/env python3 import datetime from collections import defaultdict import enum import logging import re import sys from typing import Dict, List import argparse_utils import bootstrap import config import dict_utils import exec_utils logger = logging.getLogger(__name__) cfg = config.add_commandline_args( f"Presence Detection ({__file__})", "Args related to detection of human beings in locations.", ) cfg.add_argument( "--presence_macs_file", type=argparse_utils.valid_filename, default = "/home/scott/cron/persisted_mac_addresses.txt", metavar="FILENAME", help="The location of persisted_mac_addresses.txt to use." ) class Person(enum.Enum): UNKNOWN = 0 SCOTT = 1 LYNN = 2 ALEX = 3 AARON_AND_DANA = 4 AARON = 4 DANA = 4 @enum.unique class Location(enum.Enum): UNKNOWN = 0 HOUSE = 1 CABIN = 2 class PresenceDetection(object): def __init__(self) -> None: # Note: list most important devices first. self.devices_by_person: Dict[Person, List[str]] = { Person.SCOTT: [ "3C:28:6D:10:6D:41", "D4:61:2E:88:18:09", "6C:40:08:AE:DC:2E", "14:7D:DA:6A:20:D7", ], Person.LYNN: [ "08:CC:27:63:26:14", "B8:31:B5:9A:4F:19", ], Person.ALEX: [ "0C:CB:85:0C:8B:AE", "D0:C6:37:E3:36:9A", ], Person.AARON_AND_DANA: [ "98:B6:E9:E5:5A:7C", "D6:2F:37:CA:B2:9B", "6C:E8:5C:ED:17:26", "90:E1:7B:13:7C:E5", "6E:DC:7C:75:02:1B", "B2:16:1A:93:7D:50", "18:65:90:DA:3A:35", "22:28:C8:7D:3C:85", "B2:95:23:69:91:F8", "96:69:2C:88:7A:C3", ], } self.location_ts_by_mac: Dict[ Location, Dict[str, datetime.datetime] ] = defaultdict(dict) self.names_by_mac: Dict[str, str] = {} persisted_macs = config.config['presence_macs_file'] self.read_persisted_macs_file(persisted_macs, Location.HOUSE) raw = exec_utils.cmd( "ssh scott@meerkat.cabin 'cat /home/scott/cron/persisted_mac_addresses.txt'" ) self.parse_raw_macs_file(raw, Location.CABIN) # os.remove(filename) def read_persisted_macs_file( self, filename: str, location: Location ) -> None: if location is Location.UNKNOWN: return with open(filename, "r") as rf: lines = rf.read() self.parse_raw_macs_file(lines, location) def parse_raw_macs_file(self, raw: str, location: Location) -> None: lines = raw.split("\n") # CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990 for line in lines: line = line.strip() if len(line) == 0: continue logger.debug(f'{location}> {line}') try: (mac, count, ip_name, mfg, ts) = line.split(",") except Exception as e: logger.error(f'SKIPPED BAD LINE> {line}') logger.exception(e) continue mac = mac.strip() (self.location_ts_by_mac[location])[ mac ] = datetime.datetime.fromtimestamp(int(ts.strip())) ip_name = ip_name.strip() match = re.match(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name) if match is not None: name = match.group(2) self.names_by_mac[mac] = name def is_anyone_in_location_now(self, location: Location) -> bool: for person in Person: if person is not None: loc = self.where_is_person_now(person) if location == loc: return True return False def where_is_person_now(self, name: Person) -> Location: if name is Person.UNKNOWN: return Location.UNKNOWN votes: Dict[Location, int] = {} tiebreaks: Dict[Location, datetime.datetime] = {} credit = 10000 for mac in self.devices_by_person[name]: if mac not in self.names_by_mac: continue for location in self.location_ts_by_mac: if mac in self.location_ts_by_mac[location]: ts = (self.location_ts_by_mac[location])[mac] tiebreaks[location] = ts location = dict_utils.key_with_min_value(tiebreaks) v = votes.get(location, 0) votes[location] = v + credit credit = int( credit * 0.667 ) # Note: list most important devices first if credit == 0: credit = 1 if len(votes) > 0: item = dict_utils.item_with_max_value(votes) return item[0] return Location.UNKNOWN @bootstrap.initialize def main() -> None: config.parse() p = PresenceDetection() for loc in Location: print(f'{loc}: {p.is_anyone_in_location_now(loc)}') for u in Person: print(f'{u}: {p.where_is_person_now(u)}') sys.exit(0) if __name__ == '__main__': main()