#!/usr/bin/env python3

"""A registry of smart home devices loaded from a text configuration file.

Each meaningful line of the registry file has exactly three comma-separated
fields: ``mac, name, keywords``.  Anything after a ``#`` is a comment.  The
keywords field is a whitespace-separated list of tokens; a token of the form
``key:value`` is indexed as a property and any other token as a tag.
"""

import logging
import re
from typing import Dict, List, Optional, Sequence, Set

import argparse_utils
import config
import file_utils
import logical_search
import smart_home.cameras as cameras
import smart_home.chromecasts as chromecasts
import smart_home.device as device
import smart_home.lights as lights
import smart_home.outlets as outlets

args = config.add_commandline_args(
    f"Smart Home Registry ({__file__})",
    "Args related to the smart home configuration registry.",
)
args.add_argument(
    '--smart_home_registry_file_location',
    default='/home/scott/bin/network_mac_addresses.txt',
    metavar='FILENAME',
    help='The location of network_mac_addresses.txt',
    type=argparse_utils.valid_filename,
)

logger = logging.getLogger(__file__)


class SmartHomeRegistry(object):
    """Maps device names / MAC addresses / keywords to Device objects.

    The registry is populated once, at construction time, from the registry
    file.  Entries are kept in four parallel dicts (name<->mac and
    keywords-by-name / keywords-by-mac) and are also indexed in a
    logical_search.Corpus so that they can be found with query().
    """

    def __init__(
        self,
        registry_file: Optional[str] = None,
        filters: Optional[Sequence[str]] = ('smart',),
    ) -> None:
        """Read and index the registry file.

        Args:
            registry_file: path of the file to read.  When None, the value of
                the --smart_home_registry_file_location config entry is used.
            filters: substrings that must ALL occur in an entry's keywords
                field for the entry to be kept.  None (or an empty sequence)
                keeps every entry.
        """
        self._macs_by_name: Dict[str, str] = {}
        self._keywords_by_name: Dict[str, str] = {}
        self._keywords_by_mac: Dict[str, str] = {}
        self._names_by_mac: Dict[str, str] = {}
        self._corpus = logical_search.Corpus()

        # Read the disk config file...
        if registry_file is None:
            registry_file = config.config['smart_home_registry_file_location']
        assert file_utils.does_file_exist(registry_file), (
            f'{registry_file} does not exist'
        )
        logger.debug(f'Reading {registry_file}')
        with open(registry_file, "r") as rf:
            contents = rf.readlines()

        # Parse the contents...
        for line in contents:
            line = line.rstrip("\n")
            line = re.sub(r"#.*$", r"", line)  # drop trailing comments
            line = line.strip()
            if line == "":
                continue
            logger.debug(f'SH-CONFIG> {line}')
            try:
                (mac, name, keywords) = line.split(",")
            except ValueError:
                # Not exactly three comma-separated fields.
                msg = f'SH-CONFIG> "{line}" is malformed?!  Skipping it.'
                logger.warning(msg)
                continue
            mac = mac.strip()
            name = name.strip()
            keywords = keywords.strip()

            # Every filter must appear (as a substring) in the keywords;
            # the else clause only runs when no filter rejected the entry.
            for f in filters or ():
                if f not in keywords:
                    logger.debug(f'Skipping this entry b/c of filter {f}')
                    break
            else:
                self._macs_by_name[name] = mac
                self._keywords_by_name[name] = keywords
                self._keywords_by_mac[mac] = keywords
                self._names_by_mac[mac] = name
                self.index_device(name, keywords, mac)

    def index_device(self, name: str, keywords: str, mac: str) -> None:
        """Add one device to the search corpus, keyed by its MAC address.

        The name is indexed as the property "name".  Each whitespace-separated
        keyword containing a colon is indexed as a (key, value) property,
        split at the FIRST colon so that values may themselves contain
        colons; every other keyword is indexed as a tag.
        """
        properties = [("name", name)]
        tags = set()
        for kw in keywords.split():
            if ":" in kw:
                key, value = kw.split(":", 1)
                properties.append((key, value))
            else:
                tags.add(kw)
        doc = logical_search.Document(
            docid=mac,
            tags=tags,
            properties=properties,
            reference=None,
        )
        logger.debug(f'Indexing document {doc}')
        self._corpus.add_doc(doc)

    def __repr__(self) -> str:
        """Return a multi-line listing of every known device."""
        lines = ["Known devices:\n"]
        for name, keywords in self._keywords_by_name.items():
            mac = self._macs_by_name[name]
            lines.append(f" {name} ({mac}) => {keywords}\n")
        return "".join(lines)

    def get_keywords_by_name(self, name: str) -> Optional[str]:
        """Return the raw keywords string for an exact device name, or None."""
        return self._keywords_by_name.get(name, None)

    def get_macs_by_name(self, name: str) -> Set[str]:
        """Return the MACs of all devices whose name contains name."""
        return {
            mac for (mac, lname) in self._names_by_mac.items() if name in lname
        }

    def get_macs_by_keyword(self, keyword: str) -> Set[str]:
        """Return the MACs of all devices whose keywords contain keyword.

        This is a substring test against the whole keywords string.
        """
        return {
            mac
            for (mac, keywords) in self._keywords_by_mac.items()
            if keyword in keywords
        }

    def get_device_by_name(self, name: str) -> Optional[device.Device]:
        """Return the Device with exactly this name, or None if unknown."""
        if name in self._macs_by_name:
            return self.get_device_by_mac(self._macs_by_name[name])
        return None

    def get_all_devices(self) -> List[device.Device]:
        """Return a Device object for every entry in the registry."""
        retval = []
        for mac in self._keywords_by_mac:
            if mac is not None:
                dev = self.get_device_by_mac(mac)
                if dev is not None:
                    retval.append(dev)
        return retval

    def get_device_by_mac(self, mac: str) -> Optional[device.Device]:
        """Build the Device object for the given MAC address.

        The concrete class is picked by looking for substrings in the
        lowercased keywords (e.g. 'light' + 'tplink').  If nothing matches,
        or if anything raises while choosing / constructing the specific
        class, the exception is logged and a generic device.Device is
        returned instead.

        Returns:
            The device, or None (with a warning logged) if mac is not in
            the registry.
        """
        if mac not in self._keywords_by_mac:
            msg = f'{mac} is not a known smart home device, returning None'
            logger.warning(msg)
            return None

        name = self._names_by_mac[mac]
        kws = self._keywords_by_mac[mac]
        kws_lower = kws.lower()
        logger.debug(f'Found {name} -> {mac} ({kws})')
        try:
            if 'light' in kws_lower:
                if 'tplink' in kws_lower:
                    logger.debug(' ...a TPLinkLight')
                    return lights.TPLinkLight(name, mac, kws)
                elif 'tuya' in kws_lower:
                    logger.debug(' ...a TuyaLight')
                    return lights.TuyaLight(name, mac, kws)
                elif 'goog' in kws_lower:
                    logger.debug(' ...a GoogleLight')
                    return lights.GoogleLight(name, mac, kws)
                else:
                    # Handled by the except below: logged, then generic Device.
                    raise Exception(f'Unknown light device: {name}, {mac}, {kws}')
            elif 'outlet' in kws_lower:
                if 'tplink' in kws_lower:
                    if 'children' in kws_lower:
                        logger.debug(' ...a TPLinkOutletWithChildren')
                        return outlets.TPLinkOutletWithChildren(name, mac, kws)
                    else:
                        logger.debug(' ...a TPLinkOutlet')
                        return outlets.TPLinkOutlet(name, mac, kws)
                elif 'meross' in kws_lower:
                    logger.debug(' ...a MerossOutlet')
                    return outlets.MerossOutlet(name, mac, kws)
                elif 'goog' in kws_lower:
                    logger.debug(' ...a GoogleOutlet')
                    return outlets.GoogleOutlet(name, mac, kws)
                else:
                    # Handled by the except below: logged, then generic Device.
                    raise Exception(f'Unknown outlet device: {name}, {mac}, {kws}')
            elif 'camera' in kws_lower:
                logger.debug(' ...a BaseCamera')
                return cameras.BaseCamera(name, mac, kws)
            elif 'ccast' in kws_lower:
                logger.debug(' ...a Chromecast')
                return chromecasts.BaseChromecast(name, mac, kws)
            else:
                logger.debug(' ...an unknown device (should this be here?)')
                return device.Device(name, mac, kws)
        except Exception as e:
            logger.exception(e)
            logger.debug(
                f'Device {name} at {mac} with {kws} confused me, returning a generic Device'
            )
            return device.Device(name, mac, kws)

    def query(self, query: str) -> List[device.Device]:
        """Evaluates a device query expression formed of keywords to search
        for, logical operators (and, or, not), and parenthesis.  Returns a
        list of matching devices (possibly empty).
        """
        retval = []
        logger.debug(f'Executing query {query}')
        results = self._corpus.query(query)
        if results is not None:
            for mac in results:
                if mac is not None:
                    dev = self.get_device_by_mac(mac)
                    if dev is not None:
                        retval.append(dev)
        return retval