#!/usr/bin/env python3
+# © Copyright 2021-2022, Scott Gasch
+
+"""This is a module dealing with trying to guess a person's location
+based on the location of certain devices (e.g. phones, laptops)
+belonging to that person. It works with networks I run that log
+device MAC addresses active.
+
+"""
+
import datetime
-from collections import defaultdict
import logging
import re
-from typing import Dict, List, Set
import warnings
+from collections import defaultdict
+from typing import Dict, List, Optional, Set
# Note: this module is fairly early loaded. Be aware of dependencies.
import argparse_utils
import bootstrap
import config
+import site_config
from type.locations import Location
from type.people import Person
-import site_config
-
logger = logging.getLogger(__name__)
)
cfg.add_argument(
"--presence_macs_file",
- type=argparse_utils.valid_filename,
- default = "/home/scott/cron/persisted_mac_addresses.txt",
+ type=str,
+ default="/home/scott/cron/persisted_mac_addresses.txt",
metavar="FILENAME",
- help="The location of persisted_mac_addresses.txt to use."
+ help="The location of persisted_mac_addresses.txt to use.",
)
cfg.add_argument(
'--presence_tolerable_staleness_seconds',
type=argparse_utils.valid_duration,
default=datetime.timedelta(seconds=60 * 5),
metavar='DURATION',
- help='Max acceptable age of location data before auto-refreshing'
+ help='Max acceptable age of location data before auto-refreshing',
)
class PresenceDetection(object):
+ """This is a module dealing with trying to guess a person's location
+ based on the location of certain devices (e.g. phones, laptops)
+ belonging to that person. It works with networks I run that log
+ device MAC addresses active.
+ """
+
def __init__(self) -> None:
+ """C'tor"""
+
# Note: list most important devices first.
self.devices_by_person: Dict[Person, List[str]] = {
Person.SCOTT: [
- "DC:E5:5B:0F:03:3D", # pixel6
- "6C:40:08:AE:DC:2E", # laptop
+ "DC:E5:5B:0F:03:3D", # pixel6
+ "6C:40:08:AE:DC:2E", # laptop
],
Person.LYNN: [
- "08:CC:27:63:26:14", # motog7
- "B8:31:B5:9A:4F:19", # laptop
+ "08:CC:27:63:26:14", # motog7
+ "B8:31:B5:9A:4F:19", # laptop
],
Person.ALEX: [
- "0C:CB:85:0C:8B:AE", # phone
- "D0:C6:37:E3:36:9A", # laptop
+ "0C:CB:85:0C:8B:AE", # phone
+ "D0:C6:37:E3:36:9A", # laptop
],
Person.AARON_AND_DANA: [
"98:B6:E9:E5:5A:7C",
],
}
self.run_location = site_config.get_location()
- logger.debug(f"run_location is {self.run_location}")
+ logger.debug("base_presence run_location is %s", self.run_location)
self.weird_mac_at_cabin = False
- self.location_ts_by_mac: Dict[
- Location, Dict[str, datetime.datetime]
- ] = defaultdict(dict)
+ self.location_ts_by_mac: Dict[Location, Dict[str, datetime.datetime]] = defaultdict(dict)
self.names_by_mac: Dict[str, str] = {}
self.dark_locations: Set[Location] = set()
- self.last_update = None
+ self.last_update: Optional[datetime.datetime] = None
def maybe_update(self) -> None:
+ """Determine if our state is stale and needs to be updated and do
+ it, if so.
+ """
+
if self.last_update is None:
self.update()
else:
now = datetime.datetime.now()
delta = now - self.last_update
- if delta.total_seconds() > config.config['presence_tolerable_staleness_seconds'].total_seconds():
+ if (
+ delta.total_seconds()
+ > config.config['presence_tolerable_staleness_seconds'].total_seconds()
+ ):
logger.debug(
- f"It's been {delta.total_seconds()}s since last update; refreshing now."
+ "It's been %ss since last update; refreshing now.", delta.total_seconds()
)
self.update()
def update(self) -> None:
+ """Unconditionally update our state."""
+
self.dark_locations = set()
- if self.run_location is Location.HOUSE:
- self.update_from_house()
- elif self.run_location is Location.CABIN:
- self.update_from_cabin()
- else:
- raise Exception("Where the hell is this running?!")
+ self._update_house()
+ self._update_cabin()
self.last_update = datetime.datetime.now()
- def update_from_house(self) -> None:
+ @staticmethod
+ def _get_raw_data_via_ssh(location: Location) -> Optional[str]:
from exec_utils import cmd
+
+ canonical = {
+ }
try:
- persisted_macs = config.config['presence_macs_file']
- except KeyError:
- persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
- self.read_persisted_macs_file(persisted_macs, Location.HOUSE)
- try:
- raw = cmd(
- timeout_seconds=10.0,
+ return cmd(
+ f"ssh {canonical[location]} 'cat /home/scott/cron/persisted_mac_addresses.txt'",
+ timeout_seconds=30.0,
)
- self.parse_raw_macs_file(raw, Location.CABIN)
- except Exception as e:
- logger.exception(e)
- msg = "Can't see the cabin right now; presence detection impared."
- logger.warning(msg)
- warnings.warn(msg)
- self.dark_locations.add(Location.CABIN)
+ except Exception:
+ return None
- def update_from_cabin(self) -> None:
- from exec_utils import cmd
- try:
- persisted_macs = config.config['presence_macs_file']
- except KeyError:
- persisted_macs = '/home/scott/cron/persisted_mac_addresses.txt'
- self.read_persisted_macs_file(persisted_macs, Location.CABIN)
- try:
- raw = cmd(
- timeout_seconds=10.0,
+ def _get_raw_data(self, location: Location) -> Optional[str]:
+ from os.path import exists
+
+ if self.run_location == location:
+ persisted_macs = config.config.get(
+ 'presence_macs_file', '/home/scott/cron/persisted_mac_addresses.txt'
)
- self.parse_raw_macs_file(raw, Location.HOUSE)
- except Exception as e:
- logger.exception(e)
+ if exists(persisted_macs):
+ with open(persisted_macs, 'r') as rf:
+ return rf.read()
+ else:
+ return PresenceDetection._get_raw_data_via_ssh(location)
+ else:
+ return PresenceDetection._get_raw_data_via_ssh(location)
+ return None
+
+ def _update_house(self) -> None:
+ data = self._get_raw_data(Location.HOUSE)
+ if data:
+ self._parse_raw_macs_file(data, Location.HOUSE)
+ else:
msg = "Can't see the house right now; presence detection impared."
- logger.warning(msg)
warnings.warn(msg)
+ logger.warning(msg, stacklevel=2)
self.dark_locations.add(Location.HOUSE)
- def read_persisted_macs_file(
- self, filename: str, location: Location
- ) -> None:
- if location is Location.UNKNOWN:
- return
- with open(filename, "r") as rf:
- lines = rf.read()
- self.parse_raw_macs_file(lines, location)
+ def _update_cabin(self) -> None:
+ data = self._get_raw_data(Location.CABIN)
+ if data:
+ self._parse_raw_macs_file(data, Location.CABIN)
+ else:
+ msg = "Can't see the cabin right now; presence detection impared."
+ warnings.warn(msg)
+ logger.warning(msg, stacklevel=2)
+ self.dark_locations.add(Location.CABIN)
+
+ def _parse_raw_macs_file(self, raw: str, location: Location) -> None:
+ """Internal method that parses the contents of the MACs file."""
- def parse_raw_macs_file(self, raw: str, location: Location) -> None:
lines = raw.split("\n")
# CC:F4:11:D7:FA:EE, 2240, 10.0.0.22 (side_deck_high_home), Google, 1611681990
cabin_count = 0
for line in lines:
line = line.strip()
+ if 'using fake authentication data for X11' in line:
+ continue
if len(line) == 0:
continue
- logger.debug(f'{location}> {line}')
+ logger.debug('%s> %s', location, line)
if "cabin_" in line:
continue
if location == Location.CABIN:
- logger.debug('Cabin count: {cabin_count}')
+ logger.debug('Cabin count: %d', cabin_count)
cabin_count += 1
try:
- (mac, count, ip_name, mfg, ts) = line.split(",")
+ (mac, _, ip_name, _, ts) = line.split(",") # type: ignore
except Exception as e:
- logger.error(f'SKIPPED BAD LINE> {line}')
logger.exception(e)
+ logger.error('SKIPPED BAD LINE> %s', line)
continue
mac = mac.strip()
- (self.location_ts_by_mac[location])[
- mac
- ] = datetime.datetime.fromtimestamp(int(ts.strip()))
+ (self.location_ts_by_mac[location])[mac] = datetime.datetime.fromtimestamp(
+ int(ts.strip())
+ )
ip_name = ip_name.strip()
match = re.match(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name)
if match is not None:
self.weird_mac_at_cabin = True
def is_anyone_in_location_now(self, location: Location) -> bool:
+ """Determine if anyone is in a given location based on the presence of
+ MAC files seen recently on the network.
+
+ Args:
+ location: the location in question
+
+ Returns:
+ True if someone is detected or False otherwise.
+ """
+
self.maybe_update()
if location in self.dark_locations:
raise Exception(f"Can't see {location} right now; answer undefined.")
return False
def where_is_person_now(self, name: Person) -> Location:
+ """Given a person, see if we can determine their location based on
+ network MAC addresses.
+
+ Args:
+ name: The person we're looking for.
+
+ Returns:
+ The Location where we think they are (including UNKNOWN).
+ """
+
self.maybe_update()
if len(self.dark_locations) > 0:
msg = f"Can't see {self.dark_locations} right now; answer confidence impacted"
logger.warning(msg)
- warnings.warn(msg)
- logger.debug(f'Looking for {name}...')
+ warnings.warn(msg, stacklevel=2)
+ logger.debug('Looking for %s...', name)
if name is Person.UNKNOWN:
if self.weird_mac_at_cabin:
return Location.UNKNOWN
import dict_utils
+
votes: Dict[Location, int] = {}
tiebreaks: Dict[Location, datetime.datetime] = {}
credit = 10000
+ location = None
for mac in self.devices_by_person[name]:
if mac not in self.names_by_mac:
continue
mac_name = self.names_by_mac[mac]
- logger.debug(f'Looking for {name}... check for mac {mac} ({mac_name})')
+ logger.debug('Looking for %s... check for mac %s (%s)', name, mac, mac_name)
for location in self.location_ts_by_mac:
if mac in self.location_ts_by_mac[location]:
ts = (self.location_ts_by_mac[location])[mac]
- logger.debug(f'Seen {mac} ({mac_name}) at {location} since {ts}')
+ logger.debug('Seen %s (%s) at %s since %s', mac, mac_name, location, ts)
tiebreaks[location] = ts
- (most_recent_location, first_seen_ts) = dict_utils.item_with_max_value(tiebreaks)
+ (
+ most_recent_location,
+ _,
+ ) = dict_utils.item_with_max_value(tiebreaks)
bonus = credit
v = votes.get(most_recent_location, 0)
votes[most_recent_location] = v + bonus
- logger.debug(f'{name}: {location} gets {bonus} votes.')
- credit = int(
- credit * 0.2
- ) # Note: list most important devices first
+ logger.debug('%s: %s gets %d votes.', name, most_recent_location, bonus)
+ credit = int(credit * 0.2) # Note: list most important devices first
if credit <= 0:
credit = 1
if len(votes) > 0:
(location, value) = dict_utils.item_with_max_value(votes)
if value > 2001:
+ assert location
return location
return Location.UNKNOWN
for person in Person:
print(f'{person} => {p.where_is_person_now(person)}')
print()
-# for location in Location:
-# print(f'{location} => {p.is_anyone_in_location_now(location)}')
+ for location in Location:
+ print(f'{location} => {p.is_anyone_in_location_now(location)}')
if __name__ == '__main__':