Ugh, a bunch of things. @overrides. --lmodule. Chromecasts. etc...
[python_utils.git] / smart_home / registry.py
diff --git a/smart_home/registry.py b/smart_home/registry.py
new file mode 100644 (file)
index 0000000..2d23981
--- /dev/null
@@ -0,0 +1,200 @@
+#!/usr/bin/env python3
+
+import logging
+import re
+from typing import List, Optional, Set
+
+import argparse_utils
+import config
+import file_utils
+import logical_search
+import smart_home.device as device
+import smart_home.cameras as cameras
+import smart_home.chromecasts as chromecasts
+import smart_home.lights as lights
+import smart_home.outlets as outlets
+
+args = config.add_commandline_args(
+    f"Smart Home Registry ({__file__})",
+    "Args related to the smart home configuration registry."
+)
+args.add_argument(
+    '--smart_home_registry_file_location',
+    default='/home/scott/bin/network_mac_addresses.txt',
+    metavar='FILENAME',
+    help='The location of network_mac_addresses.txt',
+    type=argparse_utils.valid_filename,
+)
+
+
+logger = logging.getLogger(__file__)
+
+
+class SmartHomeRegistry(object):
+    def __init__(
+            self,
+            registry_file: Optional[str] = None,
+            filters: List[str] = ['smart'],
+    ) -> None:
+        self._macs_by_name = {}
+        self._keywords_by_name = {}
+        self._keywords_by_mac = {}
+        self._names_by_mac = {}
+        self._corpus = logical_search.Corpus()
+
+        # Read the disk config file...
+        if registry_file is None:
+            registry_file = config.config[
+                'smart_home_registry_file_location'
+            ]
+        assert file_utils.does_file_exist(registry_file)
+        logger.debug(f'Reading {registry_file}')
+        with open(registry_file, "r") as f:
+            contents = f.readlines()
+
+        # Parse the contents...
+        for line in contents:
+            line = line.rstrip("\n")
+            line = re.sub(r"#.*$", r"", line)
+            line = line.strip()
+            if line == "":
+                continue
+            logger.debug(f'SH-CONFIG> {line}')
+            (mac, name, keywords) = line.split(",")
+            mac = mac.strip()
+            name = name.strip()
+            keywords = keywords.strip()
+
+            skip = False
+            if filters is not None:
+                for f in filters:
+                    if f not in keywords:
+                        logger.debug(f'Skipping this entry b/c of filter {f}')
+                        skip = True
+                        break
+            if not skip:
+                self._macs_by_name[name] = mac
+                self._keywords_by_name[name] = keywords
+                self._keywords_by_mac[mac] = keywords
+                self._names_by_mac[mac] = name
+                self.index_device(name, keywords, mac)
+
+    def index_device(self, name: str, keywords: str, mac: str) -> None:
+        properties = [("name", name)]
+        tags = set()
+        for kw in keywords.split():
+            if ":" in kw:
+                key, value = kw.split(":")
+                properties.append((key, value))
+            else:
+                tags.add(kw)
+        device = logical_search.Document(
+            docid=mac,
+            tags=tags,
+            properties=properties,
+            reference=None,
+        )
+        logger.debug(f'Indexing document {device}')
+        self._corpus.add_doc(device)
+
+    def __repr__(self) -> str:
+        s = "Known devices:\n"
+        for name, keywords in self._keywords_by_name.items():
+            mac = self._macs_by_name[name]
+            s += f"  {name} ({mac}) => {keywords}\n"
+        return s
+
+    def get_keywords_by_name(self, name: str) -> Optional[device.Device]:
+        return self._keywords_by_name.get(name, None)
+
+    def get_macs_by_name(self, name: str) -> Set[str]:
+        retval = set()
+        for (mac, lname) in self._names_by_mac.items():
+            if name in lname:
+                retval.add(mac)
+        return retval
+
+    def get_macs_by_keyword(self, keyword: str) -> Set[str]:
+        retval = set()
+        for (mac, keywords) in self._keywords_by_mac.items():
+            if keyword in keywords:
+                retval.add(mac)
+        return retval
+
+    def get_device_by_name(self, name: str) -> Optional[device.Device]:
+        if name in self._macs_by_name:
+            return self.get_device_by_mac(self._macs_by_name[name])
+        return None
+
+    def get_all_devices(self) -> List[device.Device]:
+        retval = []
+        for (mac, kws) in self._keywords_by_mac.items():
+            if mac is not None:
+                device = self.get_device_by_mac(mac)
+                if device is not None:
+                    retval.append(device)
+        return retval
+
+    def get_device_by_mac(self, mac: str) -> Optional[device.Device]:
+        if mac in self._keywords_by_mac:
+            name = self._names_by_mac[mac]
+            kws = self._keywords_by_mac[mac]
+            logger.debug(f'Found {name} -> {mac} ({kws})')
+            try:
+                if 'light' in kws.lower():
+                    if 'tplink' in kws.lower():
+                        logger.debug('    ...a TPLinkLight')
+                        return lights.TPLinkLight(name, mac, kws)
+                    elif 'tuya' in kws.lower():
+                        logger.debug('    ...a TuyaLight')
+                        return lights.TuyaLight(name, mac, kws)
+                    elif 'goog' in kws.lower():
+                        logger.debug('    ...a GoogleLight')
+                        return lights.GoogleLight(name, mac, kws)
+                    else:
+                        raise Exception(f'Unknown light device: {name}, {mac}, {kws}')
+                elif 'outlet' in kws.lower():
+                    if 'tplink' in kws.lower():
+                        if 'children' in kws.lower():
+                            logger.debug('    ...a TPLinkOutletWithChildren')
+                            return outlets.TPLinkOutletWithChildren(name, mac, kws)
+                        else:
+                            logger.debug('    ...a TPLinkOutlet')
+                            return outlets.TPLinkOutlet(name, mac, kws)
+                    elif 'goog' in kws.lower():
+                        logger.debug('    ...a GoogleOutlet')
+                        return outlets.GoogleOutlet(name, mac, kws)
+                    else:
+                        raise Exception(f'Unknown outlet device: {name}, {mac}, {kws}')
+                elif 'camera' in kws.lower():
+                    logger.debug('    ...a BaseCamera')
+                    return cameras.BaseCamera(name, mac, kws)
+                elif 'ccast' in kws.lower():
+                    logger.debug('    ...a Chromecast')
+                    return chromecasts.BaseChromecast(name, mac, kws)
+                else:
+                    logger.debug('    ...an unknown device (should this be here?)')
+                    return device.Device(name, mac, kws)
+            except Exception as e:
+                logger.warning(
+                    f'Got exception {e} while trying to communicate with device {name}/{mac}.'
+                )
+                return device.Device(name, mac, kws)
+        logger.warning(f'{mac} is not a known smart home device, returning None')
+        return None
+
+    def query(self, query: str) -> List[device.Device]:
+        """Evaluates a lighting query expression formed of keywords to search
+        for, logical operators (and, or, not), and parenthesis.
+        Returns a list of matching lights.
+        """
+        retval = []
+        logger.debug(f'Executing query {query}')
+        results = self._corpus.query(query)
+        if results is not None:
+            for mac in results:
+                if mac is not None:
+                    device = self.get_device_by_mac(mac)
+                    if device is not None:
+                        retval.append(device)
+        return retval