--- /dev/null
+#!/usr/bin/env python3
+
+import logging
+import re
+from typing import List, Optional, Set
+
+import argparse_utils
+import config
+import file_utils
+import logical_search
+import smart_home.device as device
+import smart_home.cameras as cameras
+import smart_home.chromecasts as chromecasts
+import smart_home.lights as lights
+import smart_home.outlets as outlets
+
+args = config.add_commandline_args(
+ f"Smart Home Registry ({__file__})",
+ "Args related to the smart home configuration registry."
+)
+args.add_argument(
+ '--smart_home_registry_file_location',
+ default='/home/scott/bin/network_mac_addresses.txt',
+ metavar='FILENAME',
+ help='The location of network_mac_addresses.txt',
+ type=argparse_utils.valid_filename,
+)
+
+
+logger = logging.getLogger(__file__)
+
+
+class SmartHomeRegistry(object):
+ def __init__(
+ self,
+ registry_file: Optional[str] = None,
+ filters: List[str] = ['smart'],
+ ) -> None:
+ self._macs_by_name = {}
+ self._keywords_by_name = {}
+ self._keywords_by_mac = {}
+ self._names_by_mac = {}
+ self._corpus = logical_search.Corpus()
+
+ # Read the disk config file...
+ if registry_file is None:
+ registry_file = config.config[
+ 'smart_home_registry_file_location'
+ ]
+ assert file_utils.does_file_exist(registry_file)
+ logger.debug(f'Reading {registry_file}')
+ with open(registry_file, "r") as f:
+ contents = f.readlines()
+
+ # Parse the contents...
+ for line in contents:
+ line = line.rstrip("\n")
+ line = re.sub(r"#.*$", r"", line)
+ line = line.strip()
+ if line == "":
+ continue
+ logger.debug(f'SH-CONFIG> {line}')
+ (mac, name, keywords) = line.split(",")
+ mac = mac.strip()
+ name = name.strip()
+ keywords = keywords.strip()
+
+ skip = False
+ if filters is not None:
+ for f in filters:
+ if f not in keywords:
+ logger.debug(f'Skipping this entry b/c of filter {f}')
+ skip = True
+ break
+ if not skip:
+ self._macs_by_name[name] = mac
+ self._keywords_by_name[name] = keywords
+ self._keywords_by_mac[mac] = keywords
+ self._names_by_mac[mac] = name
+ self.index_device(name, keywords, mac)
+
+ def index_device(self, name: str, keywords: str, mac: str) -> None:
+ properties = [("name", name)]
+ tags = set()
+ for kw in keywords.split():
+ if ":" in kw:
+ key, value = kw.split(":")
+ properties.append((key, value))
+ else:
+ tags.add(kw)
+ device = logical_search.Document(
+ docid=mac,
+ tags=tags,
+ properties=properties,
+ reference=None,
+ )
+ logger.debug(f'Indexing document {device}')
+ self._corpus.add_doc(device)
+
+ def __repr__(self) -> str:
+ s = "Known devices:\n"
+ for name, keywords in self._keywords_by_name.items():
+ mac = self._macs_by_name[name]
+ s += f" {name} ({mac}) => {keywords}\n"
+ return s
+
+ def get_keywords_by_name(self, name: str) -> Optional[device.Device]:
+ return self._keywords_by_name.get(name, None)
+
+ def get_macs_by_name(self, name: str) -> Set[str]:
+ retval = set()
+ for (mac, lname) in self._names_by_mac.items():
+ if name in lname:
+ retval.add(mac)
+ return retval
+
+ def get_macs_by_keyword(self, keyword: str) -> Set[str]:
+ retval = set()
+ for (mac, keywords) in self._keywords_by_mac.items():
+ if keyword in keywords:
+ retval.add(mac)
+ return retval
+
+ def get_device_by_name(self, name: str) -> Optional[device.Device]:
+ if name in self._macs_by_name:
+ return self.get_device_by_mac(self._macs_by_name[name])
+ return None
+
+ def get_all_devices(self) -> List[device.Device]:
+ retval = []
+ for (mac, kws) in self._keywords_by_mac.items():
+ if mac is not None:
+ device = self.get_device_by_mac(mac)
+ if device is not None:
+ retval.append(device)
+ return retval
+
+ def get_device_by_mac(self, mac: str) -> Optional[device.Device]:
+ if mac in self._keywords_by_mac:
+ name = self._names_by_mac[mac]
+ kws = self._keywords_by_mac[mac]
+ logger.debug(f'Found {name} -> {mac} ({kws})')
+ try:
+ if 'light' in kws.lower():
+ if 'tplink' in kws.lower():
+ logger.debug(' ...a TPLinkLight')
+ return lights.TPLinkLight(name, mac, kws)
+ elif 'tuya' in kws.lower():
+ logger.debug(' ...a TuyaLight')
+ return lights.TuyaLight(name, mac, kws)
+ elif 'goog' in kws.lower():
+ logger.debug(' ...a GoogleLight')
+ return lights.GoogleLight(name, mac, kws)
+ else:
+ raise Exception(f'Unknown light device: {name}, {mac}, {kws}')
+ elif 'outlet' in kws.lower():
+ if 'tplink' in kws.lower():
+ if 'children' in kws.lower():
+ logger.debug(' ...a TPLinkOutletWithChildren')
+ return outlets.TPLinkOutletWithChildren(name, mac, kws)
+ else:
+ logger.debug(' ...a TPLinkOutlet')
+ return outlets.TPLinkOutlet(name, mac, kws)
+ elif 'goog' in kws.lower():
+ logger.debug(' ...a GoogleOutlet')
+ return outlets.GoogleOutlet(name, mac, kws)
+ else:
+ raise Exception(f'Unknown outlet device: {name}, {mac}, {kws}')
+ elif 'camera' in kws.lower():
+ logger.debug(' ...a BaseCamera')
+ return cameras.BaseCamera(name, mac, kws)
+ elif 'ccast' in kws.lower():
+ logger.debug(' ...a Chromecast')
+ return chromecasts.BaseChromecast(name, mac, kws)
+ else:
+ logger.debug(' ...an unknown device (should this be here?)')
+ return device.Device(name, mac, kws)
+ except Exception as e:
+ logger.warning(
+ f'Got exception {e} while trying to communicate with device {name}/{mac}.'
+ )
+ return device.Device(name, mac, kws)
+ logger.warning(f'{mac} is not a known smart home device, returning None')
+ return None
+
+ def query(self, query: str) -> List[device.Device]:
+ """Evaluates a lighting query expression formed of keywords to search
+ for, logical operators (and, or, not), and parenthesis.
+ Returns a list of matching lights.
+ """
+ retval = []
+ logger.debug(f'Executing query {query}')
+ results = self._corpus.query(query)
+ if results is not None:
+ for mac in results:
+ if mac is not None:
+ device = self.get_device_by_mac(mac)
+ if device is not None:
+ retval.append(device)
+ return retval