Since this thing is on the innerwebs I suppose it should have a
[python_utils.git] / smart_home / lights.py
index 6ca6e71db2329a65402116a5cf769048023521f6..ee23fb08f4e60ef4664338cde7e53bdefd340544 100644 (file)
@@ -1,80 +1,65 @@
 #!/usr/bin/env python3
 
+# © Copyright 2021-2022, Scott Gasch
+
 """Utilities for dealing with the smart lights."""
 
-from abc import ABC, abstractmethod
 import datetime
-import json
 import logging
-import os
 import re
-import subprocess
-import sys
-from typing import Any, Dict, List, Optional, Set
+from abc import abstractmethod
+from typing import Any, Dict, List, Optional, Tuple
 
 import tinytuya as tt
+from overrides import overrides
 
+import ansi
 import argparse_utils
+import arper
 import config
-import logging_utils
-import logical_search
-from google_assistant import ask_google, GoogleResponse
-from decorator_utils import timeout, memoized
+import smart_home.device as dev
+import smart_home.tplink_utils as tplink
+from decorator_utils import memoized
+from google_assistant import GoogleResponse, ask_google
 
 logger = logging.getLogger(__name__)
 
-parser = config.add_commandline_args(
-    f"Light Utils ({__file__})",
-    "Args related to light utilities."
+args = config.add_commandline_args(
+    f"Smart Lights ({__file__})",
+    "Args related to smart lights.",
 )
-parser.add_argument(
-    '--light_utils_tplink_location',
+args.add_argument(
+    '--smart_lights_tplink_location',
     default='/home/scott/bin/tplink.py',
     metavar='FILENAME',
     help='The location of the tplink.py helper',
     type=argparse_utils.valid_filename,
 )
-parser.add_argument(
-    '--light_utils_network_mac_addresses_location',
-    default='/home/scott/bin/network_mac_addresses.txt',
-    metavar='FILENAME',
-    help='The location of network_mac_addresses.txt',
-    type=argparse_utils.valid_filename,
-)
-
 
-@timeout(
-    5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
-)
-def tplink_light_command(command: str) -> bool:
-    result = os.system(command)
-    signal = result & 0xFF
-    if signal != 0:
-        logger.warning(f'{command} died with signal {signal}')
-        logging_utils.hlog("%s died with signal %d" % (command, signal))
-        return False
-    else:
-        exit_value = result >> 8
-        if exit_value != 0:
-            logger.warning(f'{command} failed, exited {exit_value}')
-            logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
-            return False
-    logger.debug(f'{command} succeeded.')
-    return True
 
+class BaseLight(dev.Device):
+    """A base class representing a smart light."""
 
-class Light(ABC):
     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
-        self.name = name.strip()
-        self.mac = mac.strip()
-        self.keywords = keywords.strip()
-        self.kws = keywords.split()
+        super().__init__(name.strip(), mac.strip(), keywords)
 
-    def get_name(self) -> str:
-        return self.name
+    @staticmethod
+    def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]:
+        m = re.match(
+            'r#?([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])',
+            color,
+        )
+        if m is not None and len(m.groups()) == 3:
+            red = int(m.group(0), 16)
+            green = int(m.group(1), 16)
+            blue = int(m.group(2), 16)
+            return (red, green, blue)
+        color = color.lower()
+        return ansi.COLOR_NAMES_TO_RGB.get(color, None)
 
-    def get_mac(self) -> str:
-        return self.mac
+    @abstractmethod
+    def status(self) -> str:
+        pass
 
     @abstractmethod
     def turn_on(self) -> bool:
@@ -104,19 +89,9 @@ class Light(ABC):
     def make_color(self, color: str) -> bool:
         pass
 
-    def get_keywords(self) -> List[str]:
-        return self.kws
 
-    def has_keyword(self, keyword: str) -> bool:
-        for kw in self.kws:
-            if kw == keyword:
-                return True
-        return False
-
-
-class GoogleLight(Light):
-    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
-        super().__init__(name, mac, keywords)
+class GoogleLight(BaseLight):
+    """A smart light controlled by talking to Google."""
 
     def goog_name(self) -> str:
         name = self.get_name()
@@ -126,25 +101,34 @@ class GoogleLight(Light):
     def parse_google_response(response: GoogleResponse) -> bool:
         return response.success
 
+    @overrides
     def turn_on(self) -> bool:
-        return GoogleLight.parse_google_response(
-            ask_google(f"turn {self.goog_name()} on")
-        )
+        return GoogleLight.parse_google_response(ask_google(f"turn {self.goog_name()} on"))
 
+    @overrides
     def turn_off(self) -> bool:
-        return GoogleLight.parse_google_response(
-            ask_google(f"turn {self.goog_name()} off")
-        )
+        return GoogleLight.parse_google_response(ask_google(f"turn {self.goog_name()} off"))
+
+    @overrides
+    def status(self) -> str:
+        if self.is_on():
+            return 'ON'
+        return 'off'
 
+    @overrides
     def is_on(self) -> bool:
         r = ask_google(f"is {self.goog_name()} on?")
         if not r.success:
             return False
-        return 'is on' in r.audio_transcription
+        if r.audio_transcription is not None:
+            return 'is on' in r.audio_transcription
+        raise Exception("Can't reach Google?!")
 
+    @overrides
     def is_off(self) -> bool:
         return not self.is_on()
 
+    @overrides
     def get_dimmer_level(self) -> Optional[int]:
         if not self.has_keyword("dimmer"):
             return False
@@ -154,13 +138,15 @@ class GoogleLight(Light):
 
         # the bookcase one is set to 40% bright
         txt = r.audio_transcription
-        m = re.search(r"(\d+)% bright", txt)
-        if m is not None:
-            return int(m.group(1))
-        if "is off" in txt:
-            return 0
+        if txt is not None:
+            m = re.search(r"(\d+)% bright", txt)
+            if m is not None:
+                return int(m.group(1))
+            if "is off" in txt:
+                return 0
         return None
 
+    @overrides
     def set_dimmer_level(self, level: int) -> bool:
         if not self.has_keyword("dimmer"):
             return False
@@ -174,13 +160,14 @@ class GoogleLight(Light):
             return True
         return False
 
+    @overrides
     def make_color(self, color: str) -> bool:
-        return GoogleLight.parse_google_response(
-            ask_google(f"make {self.goog_name()} {color}")
-        )
+        return GoogleLight.parse_google_response(ask_google(f"make {self.goog_name()} {color}"))
+
 
+class TuyaLight(BaseLight):
+    """A Tuya smart light."""
 
-class TuyaLight(Light):
     ids_by_mac = {
         '68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
         '68:C6:3A:DE:27:1A': '8844664268c63ade271a',
@@ -199,63 +186,80 @@ class TuyaLight(Light):
     }
 
     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
-        from subprocess import Popen, PIPE
         super().__init__(name, mac, keywords)
         mac = mac.upper()
         if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
             raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
         self.devid = TuyaLight.ids_by_mac[mac]
         self.key = TuyaLight.keys_by_mac[mac]
-        try:
-            pid = Popen(['maclookup', mac], stdout=PIPE)
-            ip = pid.communicate()[0]
-            ip = ip[:-1]
-        except Exception:
-            ip = '0.0.0.0'
+        self.arper = arper.Arper()
+        ip = self.get_ip()
         self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)
 
+    def get_status(self) -> Dict[str, Any]:
+        return self.bulb.status()
+
+    @overrides
+    def status(self) -> str:
+        ret = ''
+        for k, v in self.bulb.status().items():
+            ret += f'{k} = {v}\n'
+        return ret
+
+    @overrides
     def turn_on(self) -> bool:
         self.bulb.turn_on()
         return True
 
+    @overrides
     def turn_off(self) -> bool:
         self.bulb.turn_off()
         return True
 
-    def get_status(self) -> Dict[str, Any]:
-        return self.bulb.status()
-
+    @overrides
     def is_on(self) -> bool:
         s = self.get_status()
         return s['dps']['1']
 
+    @overrides
     def is_off(self) -> bool:
         return not self.is_on()
 
+    @overrides
     def get_dimmer_level(self) -> Optional[int]:
         s = self.get_status()
         return s['dps']['3']
 
+    @overrides
     def set_dimmer_level(self, level: int) -> bool:
+        logger.debug('Setting brightness to %d', level)
         self.bulb.set_brightness(level)
         return True
 
+    @overrides
     def make_color(self, color: str) -> bool:
-        self.bulb.set_colour(255,0,0)
-        return True
+        rgb = BaseLight.parse_color_string(color)
+        logger.debug('Light color: %s -> %s', color, rgb)
+        if rgb is not None:
+            self.bulb.set_colour(rgb[0], rgb[1], rgb[2])
+            return True
+        return False
+
 
+class TPLinkLight(BaseLight):
+    """A TPLink smart light."""
 
-class TPLinkLight(Light):
     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
         super().__init__(name, mac, keywords)
         self.children: List[str] = []
         self.info: Optional[Dict] = None
         self.info_ts: Optional[datetime.datetime] = None
-        if "children" in self.keywords:
-            self.info = self.get_info()
-            if self.info is not None:
-                for child in self.info["children"]:
-                    self.children.append(child["id"])
+        if self.keywords is not None:
+            if "children" in self.keywords:
+                self.info = self.get_info()
+                if self.info is not None:
+                    for child in self.info["children"]:
+                        self.children.append(child["id"])
 
     @memoized
     def get_tplink_name(self) -> Optional[str]:
@@ -266,7 +270,7 @@ class TPLinkLight(Light):
 
     def get_cmdline(self, child: str = None) -> str:
         cmd = (
-            f"{config.config['light_utils_tplink_location']} -m {self.mac} "
+            f"{config.config['smart_lights_tplink_location']} -m {self.mac} "
             f"--no_logging_console "
         )
         if child is not None:
@@ -276,48 +280,61 @@ class TPLinkLight(Light):
     def get_children(self) -> List[str]:
         return self.children
 
-    def command(
-        self, cmd: str, child: str = None, extra_args: str = None
-    ) -> bool:
+    def command(self, cmd: str, child: str = None, extra_args: str = None) -> bool:
         cmd = self.get_cmdline(child) + f"-c {cmd}"
         if extra_args is not None:
             cmd += f" {extra_args}"
-        logger.debug(f'About to execute {cmd}')
-        return tplink_light_command(cmd)
+        logger.debug('About to execute: %s', cmd)
+        return tplink.tplink_command_wrapper(cmd)
 
-    def turn_on(self, child: str = None) -> bool:
+    @overrides
+    def turn_on(self) -> bool:
+        return self.command("on", None)
+
+    @overrides
+    def turn_off(self) -> bool:
+        return self.command("off", None)
+
+    def turn_on_child(self, child: str = None) -> bool:
         return self.command("on", child)
 
-    def turn_off(self, child: str = None) -> bool:
+    def turn_off_child(self, child: str = None) -> bool:
         return self.command("off", child)
 
+    @overrides
     def is_on(self) -> bool:
-        return self.get_on_duration_seconds() > 0
+        self.info = self.get_info()
+        if self.info is None:
+            raise Exception('Unable to get info?')
+        return self.info.get("relay_state", 0) == 1
 
+    @overrides
     def is_off(self) -> bool:
         return not self.is_on()
 
+    @overrides
     def make_color(self, color: str) -> bool:
         raise NotImplementedError
 
-    @timeout(
-        10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
-    )
     def get_info(self) -> Optional[Dict]:
-        cmd = self.get_cmdline() + "-c info"
-        out = subprocess.getoutput(cmd)
-        out = re.sub("Sent:.*\n", "", out)
-        out = re.sub("Received: *", "", out)
-        try:
-            self.info = json.loads(out)["system"]["get_sysinfo"]
-            self.info_ts = datetime.datetime.now()
+        ip = self.get_ip()
+        if ip is not None:
+            self.info = tplink.tplink_get_info(ip)
+            if self.info is not None:
+                self.info_ts = datetime.datetime.now()
+            else:
+                self.info_ts = None
             return self.info
-        except Exception as e:
-            logger.exception(e)
-            print(out, file=sys.stderr)
-            self.info = None
-            self.info_ts = None
-            return None
+        return None
+
+    @overrides
+    def status(self) -> str:
+        ret = ''
+        info = self.get_info()
+        if info is not None:
+            for k, v in info:
+                ret += f'{k} = {v}\n'
+        return ret
 
     def get_on_duration_seconds(self, child: str = None) -> int:
         self.info = self.get_info()
@@ -333,13 +350,7 @@ class TPLinkLight(Light):
                     return int(chi.get("on_time", "0"))
         return 0
 
-    def get_on_limit_seconds(self) -> Optional[int]:
-        for kw in self.kws:
-            m = re.search(r"timeout:(\d+)", kw)
-            if m is not None:
-                return int(m.group(1)) * 60
-        return None
-
+    @overrides
     def get_dimmer_level(self) -> Optional[int]:
         if not self.has_keyword("dimmer"):
             return False
@@ -348,235 +359,71 @@ class TPLinkLight(Light):
             return None
         return int(self.info.get("brightness", "0"))
 
+    @overrides
     def set_dimmer_level(self, level: int) -> bool:
         if not self.has_keyword("dimmer"):
             return False
         cmd = (
             self.get_cmdline()
-            + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
-        )
-        return tplink_light_command(cmd)
-
-
-class GoogleLightGroup(GoogleLight):
-    def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
-        if len(members) < 1:
-            raise Exception("There must be at least one light in the group.")
-        self.members = members
-        mac = GoogleLightGroup.make_up_mac(members)
-        super().__init__(name, mac, keywords)
-
-    @staticmethod
-    def make_up_mac(members: List[GoogleLight]):
-        mac = members[0].get_mac()
-        b = mac.split(':')
-        b[5] = int(b[5], 16) + 1
-        if b[5] > 255:
-            b[5] = 0
-        b[5] = str(b[5])
-        return ":".join(b)
-
-    def is_on(self) -> bool:
-        r = ask_google(f"are {self.goog_name()} on?")
-        if not r.success:
-            return False
-        return 'is on' in r.audio_transcription
-
-    def get_dimmer_level(self) -> Optional[int]:
-        if not self.has_keyword("dimmer"):
-            return False
-        r = ask_google(f'how bright are {self.goog_name()}?')
-        if not r.success:
-            return None
-
-        # four lights are set to 100% brightness
-        txt = r.audio_transcription
-        m = re.search(r"(\d+)% bright", txt)
-        if m is not None:
-            return int(m.group(1))
-        if "is off" in txt:
-            return 0
-        return None
-
-    def set_dimmer_level(self, level: int) -> bool:
-        if not self.has_keyword("dimmer"):
-            return False
-        if 0 <= level <= 100:
-            was_on = self.is_on()
-            r = ask_google(f"set {self.goog_name()} to {level} percent")
-            if not r.success:
-                return False
-            if not was_on:
-                self.turn_off()
-            return True
-        return False
-
-    def make_color(self, color: str) -> bool:
-        return GoogleLight.parse_google_response(
-            ask_google(f"make {self.goog_name()} {color}")
-        )
-
-
-class LightingConfig(object):
-    """Representation of the smart light device config."""
-
-    def __init__(
-            self,
-            config_file: str = None,
-    ) -> None:
-        if config_file is None:
-            config_file = config.config[
-                'light_utils_network_mac_addresses_location'
-            ]
-        self.macs_by_name = {}
-        self._keywords_by_name = {}
-        self.keywords_by_mac = {}
-        self.names_by_mac = {}
-        self.corpus = logical_search.Corpus()
-        with open(config_file, "r") as f:
-            contents = f.readlines()
-
-        diningroom_lights = []
-        bookcase_lights = []
-        for line in contents:
-            line = line.rstrip("\n")
-            line = re.sub(r"#.*$", r"", line)
-            line = line.strip()
-            if line == "":
-                continue
-            (mac, name, keywords) = line.split(",")
-            mac = mac.strip()
-            name = name.strip()
-            keywords = keywords.strip()
-            if "perm" not in keywords:
-                continue
-            self.macs_by_name[name] = mac
-            self._keywords_by_name[name] = keywords
-            self.keywords_by_mac[mac] = keywords
-            self.names_by_mac[mac] = name
-
-#            if "bookcase_light_" in name:
-#                bookcase_lights.append(mac)
-#            elif "diningroom_light_" in name:
-#                diningroom_lights.append(mac)
-#            else:
-            self.index_light(name, keywords, mac)
-
-        # name = 'bookcase_lights'
-        # group = []
-        # keywords = 'perm wifi light smart goog dimmer'
-        # for b in bookcase_lights:
-        #     group.append(self.get_light_by_mac(b))
-        # self.bookcase_group = GoogleLightGroup(
-        #     name,
-        #     group,
-        #     keywords,
-        # )
-        # mac = self.bookcase_group.get_mac()
-        # self.macs_by_name[name] = mac
-        # self._keywords_by_name[name] = keywords
-        # self.keywords_by_mac[mac] = keywords
-        # self.names_by_mac[mac] = name
-        # self.index_light(name, keywords, mac)
-
-        # name = 'dining_room_lights'
-        # group = []
-        # for b in diningroom_lights:
-        #     group.append(self.get_light_by_mac(b))
-        # self.diningroom_group = GoogleLightGroup(
-        #     name,
-        #     group,
-        #     keywords,
-        # )
-        # mac = self.diningroom_group.get_mac()
-        # self.macs_by_name[name] = mac
-        # self._keywords_by_name[name] = keywords
-        # self.keywords_by_mac[mac] = keywords
-        # self.names_by_mac[mac] = name
-        # self.index_light(name, keywords, mac)
-
-    def index_light(self, name: str, keywords: str, mac: str) -> None:
-        properties = [("name", name)]
-        tags = set()
-        for kw in keywords.split():
-            if ":" in kw:
-                key, value = kw.split(":")
-                properties.append((key, value))
-            else:
-                tags.add(kw)
-        light = logical_search.Document(
-            docid=mac,
-            tags=tags,
-            properties=properties,
-            reference=None,
+            + '-j \'{"smartlife.iot.dimmer":{"set_brightness":{"brightness":%d}}}\'' % level
         )
-        self.corpus.add_doc(light)
-
-    def __repr__(self) -> str:
-        s = "Known devices:\n"
-        for name, keywords in self._keywords_by_name.items():
-            mac = self.macs_by_name[name]
-            s += f"  {name} ({mac}) => {keywords}\n"
-        return s
-
-    def get_keywords_by_name(self, name: str) -> Optional[str]:
-        return self._keywords_by_name.get(name, None)
-
-    def get_macs_by_name(self, name: str) -> Set[str]:
-        retval = set()
-        for (mac, lname) in self.names_by_mac.items():
-            if name in lname:
-                retval.add(mac)
-        return retval
-
-    def get_macs_by_keyword(self, keyword: str) -> Set[str]:
-        retval = set()
-        for (mac, keywords) in self.keywords_by_mac.items():
-            if keyword in keywords:
-                retval.add(mac)
-        return retval
-
-    def get_light_by_name(self, name: str) -> Optional[Light]:
-        if name in self.macs_by_name:
-            return self.get_light_by_mac(self.macs_by_name[name])
-        return None
-
-    def get_all_lights(self) -> List[Light]:
-        retval = []
-        for (mac, kws) in self.keywords_by_mac.items():
-            if mac is not None:
-                light = self.get_light_by_mac(mac)
-                if light is not None:
-                    retval.append(light)
-        return retval
-
-    def get_light_by_mac(self, mac: str) -> Optional[Light]:
-        if mac in self.keywords_by_mac:
-            name = self.names_by_mac[mac]
-            kws = self.keywords_by_mac[mac]
-            if name == 'bookcase_lights':
-                return self.bookcase_group
-            elif name == 'dining_room_lights':
-                return self.diningroom_group
-            elif 'tplink' in kws.lower():
-                return TPLinkLight(name, mac, kws)
-            elif 'tuya' in kws.lower():
-                return TuyaLight(name, mac, kws)
-            else:
-                return GoogleLight(name, mac, kws)
-        return None
-
-    def query(self, query: str) -> List[Light]:
-        """Evaluates a lighting query expression formed of keywords to search
-        for, logical operators (and, or, not), and parenthesis.
-        Returns a list of matching lights.
-        """
-        retval = []
-        results = self.corpus.query(query)
-        if results is not None:
-            for mac in results:
-                if mac is not None:
-                    light = self.get_light_by_mac(mac)
-                    if light is not None:
-                        retval.append(light)
-        return retval
+        return tplink.tplink_command_wrapper(cmd)
+
+
+# class GoogleLightGroup(GoogleLight):
+#     def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
+#         if len(members) < 1:
+#             raise Exception("There must be at least one light in the group.")
+#         self.members = members
+#         mac = GoogleLightGroup.make_up_mac(members)
+#         super().__init__(name, mac, keywords)
+
+#     @staticmethod
+#     def make_up_mac(members: List[GoogleLight]):
+#         mac = members[0].get_mac()
+#         b = mac.split(':')
+#         b[5] = int(b[5], 16) + 1
+#         if b[5] > 255:
+#             b[5] = 0
+#         b[5] = str(b[5])
+#         return ":".join(b)
+
+#     def is_on(self) -> bool:
+#         r = ask_google(f"are {self.goog_name()} on?")
+#         if not r.success:
+#             return False
+#         return 'is on' in r.audio_transcription
+
+#     def get_dimmer_level(self) -> Optional[int]:
+#         if not self.has_keyword("dimmer"):
+#             return False
+#         r = ask_google(f'how bright are {self.goog_name()}?')
+#         if not r.success:
+#             return None
+
+#         # four lights are set to 100% brightness
+#         txt = r.audio_transcription
+#         m = re.search(r"(\d+)% bright", txt)
+#         if m is not None:
+#             return int(m.group(1))
+#         if "is off" in txt:
+#             return 0
+#         return None
+
+#     def set_dimmer_level(self, level: int) -> bool:
+#         if not self.has_keyword("dimmer"):
+#             return False
+#         if 0 <= level <= 100:
+#             was_on = self.is_on()
+#             r = ask_google(f"set {self.goog_name()} to {level} percent")
+#             if not r.success:
+#                 return False
+#             if not was_on:
+#                 self.turn_off()
+#             return True
+#         return False
+
+#     def make_color(self, color: str) -> bool:
+#         return GoogleLight.parse_google_response(
+#             ask_google(f"make {self.goog_name()} {color}")
+#         )