Since this thing is on the innerwebs I suppose it should have a
[python_utils.git] / smart_home / lights.py
index 44b3634cd5fb9f5c84620762701f120db8bd3977..ee23fb08f4e60ef4664338cde7e53bdefd340544 100644 (file)
@@ -1,29 +1,26 @@
 #!/usr/bin/env python3
 
+# © Copyright 2021-2022, Scott Gasch
+
 """Utilities for dealing with the smart lights."""
 
-from abc import abstractmethod
 import datetime
-import json
 import logging
-import os
 import re
-import subprocess
-import sys
+from abc import abstractmethod
 from typing import Any, Dict, List, Optional, Tuple
-import warnings
 
-from overrides import overrides
 import tinytuya as tt
+from overrides import overrides
 
 import ansi
 import argparse_utils
 import arper
 import config
-import logging_utils
 import smart_home.device as dev
-from google_assistant import ask_google, GoogleResponse
-from decorator_utils import timeout, memoized
+import smart_home.tplink_utils as tplink
+from decorator_utils import memoized
+from google_assistant import GoogleResponse, ask_google
 
 logger = logging.getLogger(__name__)
 
@@ -40,31 +37,9 @@ args.add_argument(
 )
 
 
-@timeout(
-    5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
-)
-def tplink_light_command(command: str) -> bool:
-    result = os.system(command)
-    signal = result & 0xFF
-    if signal != 0:
-        msg = f'{command} died with signal {signal}'
-        logger.warning(msg)
-        warnings.warn(msg)
-        logging_utils.hlog(msg)
-        return False
-    else:
-        exit_value = result >> 8
-        if exit_value != 0:
-            msg = f'{command} failed, exited {exit_value}'
-            logger.warning(msg)
-            warnings.warn(msg)
-            logging_utils.hlog(msg)
-            return False
-    logger.debug(f'{command} succeeded.')
-    return True
-
-
 class BaseLight(dev.Device):
+    """A base class representing a smart light."""
+
     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
         super().__init__(name.strip(), mac.strip(), keywords)
 
@@ -72,9 +47,9 @@ class BaseLight(dev.Device):
     def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]:
         m = re.match(
             'r#?([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])',
-            color
+            color,
         )
-        if m is not None and len(m.group) == 3:
+        if m is not None and len(m.groups()) == 3:
             red = int(m.group(0), 16)
             green = int(m.group(1), 16)
             blue = int(m.group(2), 16)
@@ -116,8 +91,7 @@ class BaseLight(dev.Device):
 
 
 class GoogleLight(BaseLight):
-    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
-        super().__init__(name, mac, keywords)
+    """A smart light controlled by talking to Google."""
 
     def goog_name(self) -> str:
         name = self.get_name()
@@ -129,15 +103,11 @@ class GoogleLight(BaseLight):
 
     @overrides
     def turn_on(self) -> bool:
-        return GoogleLight.parse_google_response(
-            ask_google(f"turn {self.goog_name()} on")
-        )
+        return GoogleLight.parse_google_response(ask_google(f"turn {self.goog_name()} on"))
 
     @overrides
     def turn_off(self) -> bool:
-        return GoogleLight.parse_google_response(
-            ask_google(f"turn {self.goog_name()} off")
-        )
+        return GoogleLight.parse_google_response(ask_google(f"turn {self.goog_name()} off"))
 
     @overrides
     def status(self) -> str:
@@ -150,7 +120,9 @@ class GoogleLight(BaseLight):
         r = ask_google(f"is {self.goog_name()} on?")
         if not r.success:
             return False
-        return 'is on' in r.audio_transcription
+        if r.audio_transcription is not None:
+            return 'is on' in r.audio_transcription
+        raise Exception("Can't reach Google?!")
 
     @overrides
     def is_off(self) -> bool:
@@ -166,11 +138,12 @@ class GoogleLight(BaseLight):
 
         # the bookcase one is set to 40% bright
         txt = r.audio_transcription
-        m = re.search(r"(\d+)% bright", txt)
-        if m is not None:
-            return int(m.group(1))
-        if "is off" in txt:
-            return 0
+        if txt is not None:
+            m = re.search(r"(\d+)% bright", txt)
+            if m is not None:
+                return int(m.group(1))
+            if "is off" in txt:
+                return 0
         return None
 
     @overrides
@@ -189,12 +162,12 @@ class GoogleLight(BaseLight):
 
     @overrides
     def make_color(self, color: str) -> bool:
-        return GoogleLight.parse_google_response(
-            ask_google(f"make {self.goog_name()} {color}")
-        )
+        return GoogleLight.parse_google_response(ask_google(f"make {self.goog_name()} {color}"))
 
 
 class TuyaLight(BaseLight):
+    """A Tuya smart light."""
+
     ids_by_mac = {
         '68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
         '68:C6:3A:DE:27:1A': '8844664268c63ade271a',
@@ -259,14 +232,14 @@ class TuyaLight(BaseLight):
 
     @overrides
     def set_dimmer_level(self, level: int) -> bool:
-        logger.debug(f'Setting brightness to {level}')
+        logger.debug('Setting brightness to %d', level)
         self.bulb.set_brightness(level)
         return True
 
     @overrides
     def make_color(self, color: str) -> bool:
         rgb = BaseLight.parse_color_string(color)
-        logger.debug(f'Light color: {color} -> {rgb}')
+        logger.debug('Light color: %s -> %s', color, rgb)
         if rgb is not None:
             self.bulb.set_colour(rgb[0], rgb[1], rgb[2])
             return True
@@ -274,16 +247,19 @@ class TuyaLight(BaseLight):
 
 
 class TPLinkLight(BaseLight):
+    """A TPLink smart light."""
+
     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
         super().__init__(name, mac, keywords)
         self.children: List[str] = []
         self.info: Optional[Dict] = None
         self.info_ts: Optional[datetime.datetime] = None
-        if "children" in self.keywords:
-            self.info = self.get_info()
-            if self.info is not None:
-                for child in self.info["children"]:
-                    self.children.append(child["id"])
+        if self.keywords is not None:
+            if "children" in self.keywords:
+                self.info = self.get_info()
+                if self.info is not None:
+                    for child in self.info["children"]:
+                        self.children.append(child["id"])
 
     @memoized
     def get_tplink_name(self) -> Optional[str]:
@@ -304,21 +280,25 @@ class TPLinkLight(BaseLight):
     def get_children(self) -> List[str]:
         return self.children
 
-    def command(
-        self, cmd: str, child: str = None, extra_args: str = None
-    ) -> bool:
+    def command(self, cmd: str, child: str = None, extra_args: str = None) -> bool:
         cmd = self.get_cmdline(child) + f"-c {cmd}"
         if extra_args is not None:
             cmd += f" {extra_args}"
-        logger.debug(f'About to execute {cmd}')
-        return tplink_light_command(cmd)
+        logger.debug('About to execute: %s', cmd)
+        return tplink.tplink_command_wrapper(cmd)
 
     @overrides
-    def turn_on(self, child: str = None) -> bool:
-        return self.command("on", child)
+    def turn_on(self) -> bool:
+        return self.command("on", None)
 
     @overrides
-    def turn_off(self, child: str = None) -> bool:
+    def turn_off(self) -> bool:
+        return self.command("off", None)
+
+    def turn_on_child(self, child: str = None) -> bool:
+        return self.command("on", child)
+
+    def turn_off_child(self, child: str = None) -> bool:
         return self.command("off", child)
 
     @overrides
@@ -326,7 +306,7 @@ class TPLinkLight(BaseLight):
         self.info = self.get_info()
         if self.info is None:
             raise Exception('Unable to get info?')
-        return self.info.get("relay_state", "0") == "1"
+        return self.info.get("relay_state", 0) == 1
 
     @overrides
     def is_off(self) -> bool:
@@ -336,31 +316,24 @@ class TPLinkLight(BaseLight):
     def make_color(self, color: str) -> bool:
         raise NotImplementedError
 
-    @timeout(
-        10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
-    )
     def get_info(self) -> Optional[Dict]:
-        cmd = self.get_cmdline() + "-c info"
-        out = subprocess.getoutput(cmd)
-        logger.debug(f'RAW OUT> {out}')
-        out = re.sub("Sent:.*\n", "", out)
-        out = re.sub("Received: *", "", out)
-        try:
-            self.info = json.loads(out)["system"]["get_sysinfo"]
-            self.info_ts = datetime.datetime.now()
+        ip = self.get_ip()
+        if ip is not None:
+            self.info = tplink.tplink_get_info(ip)
+            if self.info is not None:
+                self.info_ts = datetime.datetime.now()
+            else:
+                self.info_ts = None
             return self.info
-        except Exception as e:
-            logger.exception(e)
-            print(out, file=sys.stderr)
-            self.info = None
-            self.info_ts = None
-            return None
+        return None
 
     @overrides
     def status(self) -> str:
         ret = ''
-        for k, v in self.get_info().items():
-            ret += f'{k} = {v}\n'
+        info = self.get_info()
+        if info is not None:
+            for k, v in info:
+                ret += f'{k} = {v}\n'
         return ret
 
     def get_on_duration_seconds(self, child: str = None) -> int:
@@ -392,9 +365,9 @@ class TPLinkLight(BaseLight):
             return False
         cmd = (
             self.get_cmdline()
-            + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
+            + '-j \'{"smartlife.iot.dimmer":{"set_brightness":{"brightness":%d}}}\'' % level
         )
-        return tplink_light_command(cmd)
+        return tplink.tplink_command_wrapper(cmd)
 
 
 # class GoogleLightGroup(GoogleLight):