#!/usr/bin/env python3

# © Copyright 2021-2022, Scott Gasch

"""A searchable registry of known smart home devices and a factory for
constructing our wrappers around them."""

import logging
import re
from typing import Dict, List, Iterable, Optional, Sequence, Set, Tuple

import argparse_utils
import config
import file_utils
import logical_search
from smart_home import cameras, chromecasts, device, lights, outlets

args = config.add_commandline_args(
    f"Smart Home Registry ({__file__})",
    "Args related to the smart home configuration registry.",
)
args.add_argument(
    '--smart_home_registry_file_location',
    default='/home/scott/bin/network_mac_addresses.txt',
    metavar='FILENAME',
    help='The location of network_mac_addresses.txt',
    type=argparse_utils.valid_filename,
)

logger = logging.getLogger(__name__)

# Matches a trailing "# comment" on a registry line; compiled once because it
# is applied to every line of the registry file.
_COMMENT_RE = re.compile(r"#.*$")


class SmartHomeRegistry:
    """A searchable registry of known smart home devices and a factory for
    constructing our wrappers around them."""

    def __init__(
        self,
        registry_file: Optional[str] = None,
        filters: Optional[Sequence[str]] = ('smart',),
    ) -> None:
        """Load the registry file and index every entry that passes the filters.

        Each non-empty, non-comment line of the file must have exactly three
        comma-separated fields: ``mac, name, keywords``.  Lines that do not
        are logged as malformed and skipped.

        Args:
            registry_file: path of the registry file.  When None, the value
                of the ``smart_home_registry_file_location`` config entry is
                used.
            filters: strings that must ALL occur (as substrings) in an
                entry's keywords field for the entry to be kept.  Pass None
                to keep every entry.

        Raises:
            AssertionError: if the registry file does not exist (when
                assertions are enabled; otherwise ``open`` raises).
        """
        self._macs_by_name: Dict[str, str] = {}
        self._keywords_by_name: Dict[str, str] = {}
        self._keywords_by_mac: Dict[str, str] = {}
        self._names_by_mac: Dict[str, str] = {}
        self._corpus: logical_search.Corpus = logical_search.Corpus()

        # Read the disk config file...
        if registry_file is None:
            registry_file = config.config['smart_home_registry_file_location']
        assert file_utils.does_file_exist(registry_file)
        logger.debug('Reading %s', registry_file)
        with open(registry_file, "r") as rf:
            contents = rf.readlines()

        # Parse the contents...
        for raw_line in contents:
            line = _COMMENT_RE.sub("", raw_line.rstrip("\n")).strip()
            if line == "":
                continue
            logger.debug('SH-CONFIG> %s', line)
            try:
                (mac, name, keywords) = line.split(",")
            except ValueError:
                logger.warning('SH-CONFIG> "%s" is malformed?! Skipping it.', line)
                continue
            mac = mac.strip()
            name = name.strip()
            keywords = keywords.strip()

            # An entry is kept only if every filter occurs in its keywords;
            # report the first filter that is missing, if any.
            if filters is not None:
                missing = next((f for f in filters if f not in keywords), None)
                if missing is not None:
                    logger.debug('Skipping this entry b/c of filter: %s', missing)
                    continue

            self._macs_by_name[name] = mac
            self._keywords_by_name[name] = keywords
            self._keywords_by_mac[mac] = keywords
            self._names_by_mac[mac] = name
            self._index_device(name, keywords, mac)

    def _index_device(self, name: str, keywords: str, mac: str) -> None:
        """Add one device to the search corpus, keyed by its MAC address.

        Whitespace-separated keywords containing a colon become
        ``(key, value)`` properties; all other keywords become tags.  The
        device name is always indexed as the ``name`` property.
        """
        properties: List[Tuple[str, str]] = [("name", name)]
        tags: Set[str] = set()
        for kw in keywords.split():
            if ":" in kw:
                # Split on the FIRST colon only so that values which contain
                # colons themselves (e.g. "ip:fe80::1") do not raise
                # ValueError and abort loading of the whole registry.
                key, _, value = kw.partition(":")
                properties.append((key, value))
            else:
                tags.add(kw)
        dev = logical_search.Document(
            docid=mac,
            tags=tags,
            properties=properties,
            reference=None,
        )
        logger.debug('Indexing document: %s', dev)
        self._corpus.add_doc(dev)

    def __repr__(self) -> str:
        """Return a multi-line listing of every known device."""
        entries = (
            f" {name} ({self._macs_by_name[name]}) => {keywords}\n"
            for name, keywords in self._keywords_by_name.items()
        )
        return "Known devices:\n" + "".join(entries)

    def get_keywords_by_name(self, name: str) -> Optional[str]:
        """Given the name of a device, get its keywords.

        >>> reg = SmartHomeRegistry('/home/scott/bin/network_mac_addresses.txt')
        >>> reg.get_keywords_by_name('near_kitchen_lamp')
        'wifi smart light goog meross test'

        >>> reg.get_keywords_by_name('unknown') is None
        True
        """
        return self._keywords_by_name.get(name, None)

    def get_macs_by_name(self, name: str) -> Set[str]:
        """Given the name of a device, get its MAC address(es)

        Matching is by substring: every device whose name contains `name`
        is included.

        >>> reg = SmartHomeRegistry('/home/scott/bin/network_mac_addresses.txt')
        >>> reg.get_macs_by_name('near_kitchen_lamp')
        {'34:29:8F:12:34:8E'}

        >>> reg.get_macs_by_name('unknown')
        set()
        """
        return {mac for mac, lname in self._names_by_mac.items() if name in lname}

    def get_macs_by_keyword(self, keyword: str) -> Set[str]:
        """Given a keyword, return the set of MAC address(es) that have
        that keyword.

        Matching is by substring against the device's whole keywords string.

        >>> reg = SmartHomeRegistry('/home/scott/bin/network_mac_addresses.txt')
        >>> r = reg.get_macs_by_keyword('test')
        >>> e = set(['34:29:8F:12:26:74' , '34:29:8F:12:34:8E'])
        >>> r == e
        True

        >>> reg.get_macs_by_keyword('unknown')
        set()
        """
        return {
            mac
            for mac, keywords in self._keywords_by_mac.items()
            if keyword in keywords
        }

    def get_device_by_name(self, name: str) -> Optional[device.Device]:
        """Given a name, return its Device object (None if the name is unknown)."""
        mac = self._macs_by_name.get(name)
        if mac is None:
            return None
        return self.get_device_by_mac(mac)

    def _devices_for_macs(self, macs: Iterable[Optional[str]]) -> List[device.Device]:
        """Map MAC addresses to Device objects, dropping None MACs and MACs
        for which no device could be produced."""
        retval: List[device.Device] = []
        for mac in macs:
            if mac is None:
                continue
            dev = self.get_device_by_mac(mac)
            if dev is not None:
                retval.append(dev)
        return retval

    def get_all_devices(self) -> List[device.Device]:
        """Return a list of all known devices."""
        return self._devices_for_macs(self._keywords_by_mac)

    @staticmethod
    def _construct_device(name: str, mac: str, kws: str) -> device.Device:
        """Pick and construct the wrapper class indicated by the keywords.

        The keywords are matched case-insensitively and by substring.

        Raises:
            Exception: if the keywords say "light" or "outlet" but name no
                brand we know how to drive.
        """
        lowered = kws.lower()  # computed once; every test below is case-insensitive
        if 'light' in lowered:
            if 'tplink' in lowered:
                logger.debug(' ...a TPLinkLight')
                return lights.TPLinkLight(name, mac, kws)
            if 'tuya' in lowered:
                logger.debug(' ...a TuyaLight')
                return lights.TuyaLight(name, mac, kws)
            if 'goog' in lowered:
                logger.debug(' ...a GoogleLight')
                return lights.GoogleLight(name, mac, kws)
            raise Exception(f'Unknown light device: {name}, {mac}, {kws}')
        if 'outlet' in lowered:
            if 'tplink' in lowered:
                if 'children' in lowered:
                    logger.debug(' ...a TPLinkOutletWithChildren')
                    return outlets.TPLinkOutletWithChildren(name, mac, kws)
                logger.debug(' ...a TPLinkOutlet')
                return outlets.TPLinkOutlet(name, mac, kws)
            if 'meross' in lowered:
                logger.debug(' ...a MerossOutlet')
                return outlets.MerossOutlet(name, mac, kws)
            if 'goog' in lowered:
                logger.debug(' ...a GoogleOutlet')
                return outlets.GoogleOutlet(name, mac, kws)
            raise Exception(f'Unknown outlet device: {name}, {mac}, {kws}')
        if 'camera' in lowered:
            logger.debug(' ...a BaseCamera')
            return cameras.BaseCamera(name, mac, kws)
        if 'ccast' in lowered:
            logger.debug(' ...a Chromecast')
            return chromecasts.BaseChromecast(name, mac, kws)
        logger.debug(' ...an unknown device (should this be here?)')
        return device.Device(name, mac, kws)

    def get_device_by_mac(self, mac: str) -> Optional[device.Device]:
        """Given a MAC address, return its Device object.

        Returns None (after logging a warning) when the MAC is not in the
        registry.  If the specific wrapper cannot be chosen or constructed,
        the error is logged and a generic Device is returned instead.
        """
        if mac not in self._keywords_by_mac:
            logger.warning('%s is not a known smart home device, returning None', mac)
            return None

        name = self._names_by_mac[mac]
        kws = self._keywords_by_mac[mac]
        logger.debug('Found %s -> %s (%s)', name, mac, kws)
        try:
            return self._construct_device(name, mac, kws)
        except Exception as e:
            # Deliberate best-effort: any failure while picking or building
            # the specific wrapper falls back to a generic Device.
            logger.exception(e)
            logger.debug(
                'Device %s at %s with %s confused me; returning a generic Device',
                name,
                mac,
                kws,
            )
            return device.Device(name, mac, kws)

    def query(self, query: str) -> List[device.Device]:
        """Evaluates a device query expression formed of keywords to search
        for, logical operators (and, or, not), and parenthesis.
        Returns a list of matching devices.
        """
        logger.debug('Executing query: %s', query)
        results = self._corpus.query(query)
        if results is None:
            return []
        return self._devices_for_macs(results)


if __name__ == '__main__':
    import doctest

    doctest.testmod()