Fix state determination in tplink kasa lights.
[python_utils.git] / smart_home / lights.py
index 07521398307a9234eaac8d2e977b30def714a40d..64f2105ffe8a4de0864e95a14d2b05703010a2bb 100644 (file)
@@ -2,7 +2,7 @@
 
 """Utilities for dealing with the smart lights."""
 
-from abc import ABC, abstractmethod
+from abc import abstractmethod
 import datetime
 import json
 import logging
@@ -10,11 +10,14 @@ import os
 import re
 import subprocess
 import sys
-from typing import Any, Dict, List, Optional, Set
+from typing import Any, Dict, List, Optional, Tuple
 
+from overrides import overrides
 import tinytuya as tt
 
+import ansi
 import argparse_utils
+import arper
 import config
 import logging_utils
 import smart_home.device as dev
@@ -23,11 +26,11 @@ from decorator_utils import timeout, memoized
 
 logger = logging.getLogger(__name__)
 
-parser = config.add_commandline_args(
+args = config.add_commandline_args(
     f"Smart Lights ({__file__})",
     "Args related to smart lights.",
 )
-parser.add_argument(
+args.add_argument(
     '--smart_lights_tplink_location',
     default='/home/scott/bin/tplink.py',
     metavar='FILENAME',
@@ -43,14 +46,16 @@ def tplink_light_command(command: str) -> bool:
     result = os.system(command)
     signal = result & 0xFF
     if signal != 0:
-        logger.warning(f'{command} died with signal {signal}')
-        logging_utils.hlog("%s died with signal %d" % (command, signal))
+        msg = f'{command} died with signal {signal}'
+        logger.warning(msg)
+        logging_utils.hlog(msg)
         return False
     else:
         exit_value = result >> 8
         if exit_value != 0:
-            logger.warning(f'{command} failed, exited {exit_value}')
-            logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
+            msg = f'{command} failed, exited {exit_value}'
+            logger.warning(msg)
+            logging_utils.hlog(msg)
             return False
     logger.debug(f'{command} succeeded.')
     return True
@@ -60,6 +65,24 @@ class BaseLight(dev.Device):
     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
         super().__init__(name.strip(), mac.strip(), keywords)
 
+    @staticmethod
+    def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]:
+        m = re.match(
+            'r#?([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])',
+            color
+        )
+        if m is not None and len(m.group) == 3:
+            red = int(m.group(0), 16)
+            green = int(m.group(1), 16)
+            blue = int(m.group(2), 16)
+            return (red, green, blue)
+        color = color.lower()
+        return ansi.COLOR_NAMES_TO_RGB.get(color, None)
+
+    @abstractmethod
+    def status(self) -> str:
+        pass
+
     @abstractmethod
     def turn_on(self) -> bool:
         pass
@@ -101,25 +124,36 @@ class GoogleLight(BaseLight):
     def parse_google_response(response: GoogleResponse) -> bool:
         return response.success
 
+    @overrides
     def turn_on(self) -> bool:
         return GoogleLight.parse_google_response(
             ask_google(f"turn {self.goog_name()} on")
         )
 
+    @overrides
     def turn_off(self) -> bool:
         return GoogleLight.parse_google_response(
             ask_google(f"turn {self.goog_name()} off")
         )
 
+    @overrides
+    def status(self) -> str:
+        if self.is_on():
+            return 'ON'
+        return 'off'
+
+    @overrides
     def is_on(self) -> bool:
         r = ask_google(f"is {self.goog_name()} on?")
         if not r.success:
             return False
         return 'is on' in r.audio_transcription
 
+    @overrides
     def is_off(self) -> bool:
         return not self.is_on()
 
+    @overrides
     def get_dimmer_level(self) -> Optional[int]:
         if not self.has_keyword("dimmer"):
             return False
@@ -136,6 +170,7 @@ class GoogleLight(BaseLight):
             return 0
         return None
 
+    @overrides
     def set_dimmer_level(self, level: int) -> bool:
         if not self.has_keyword("dimmer"):
             return False
@@ -149,6 +184,7 @@ class GoogleLight(BaseLight):
             return True
         return False
 
+    @overrides
     def make_color(self, color: str) -> bool:
         return GoogleLight.parse_google_response(
             ask_google(f"make {self.goog_name()} {color}")
@@ -174,50 +210,64 @@ class TuyaLight(BaseLight):
     }
 
     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
-        from subprocess import Popen, PIPE
         super().__init__(name, mac, keywords)
         mac = mac.upper()
         if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
             raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
         self.devid = TuyaLight.ids_by_mac[mac]
         self.key = TuyaLight.keys_by_mac[mac]
-        try:
-            pid = Popen(['maclookup', mac], stdout=PIPE)
-            ip = pid.communicate()[0]
-            ip = ip[:-1]
-        except Exception:
-            ip = '0.0.0.0'
+        self.arper = arper.Arper()
+        ip = self.get_ip()
         self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)
 
+    def get_status(self) -> Dict[str, Any]:
+        return self.bulb.status()
+
+    @overrides
+    def status(self) -> str:
+        ret = ''
+        for k, v in self.bulb.status().items():
+            ret += f'{k} = {v}\n'
+        return ret
+
+    @overrides
     def turn_on(self) -> bool:
         self.bulb.turn_on()
         return True
 
+    @overrides
     def turn_off(self) -> bool:
         self.bulb.turn_off()
         return True
 
-    def get_status(self) -> Dict[str, Any]:
-        return self.bulb.status()
-
+    @overrides
     def is_on(self) -> bool:
         s = self.get_status()
         return s['dps']['1']
 
+    @overrides
     def is_off(self) -> bool:
         return not self.is_on()
 
+    @overrides
     def get_dimmer_level(self) -> Optional[int]:
         s = self.get_status()
         return s['dps']['3']
 
+    @overrides
     def set_dimmer_level(self, level: int) -> bool:
+        logger.debug(f'Setting brightness to {level}')
         self.bulb.set_brightness(level)
         return True
 
+    @overrides
     def make_color(self, color: str) -> bool:
-        self.bulb.set_colour(255,0,0)
-        return True
+        rgb = BaseLight.parse_color_string(color)
+        logger.debug(f'Light color: {color} -> {rgb}')
+        if rgb is not None:
+            self.bulb.set_colour(rgb[0], rgb[1], rgb[2])
+            return True
+        return False
 
 
 class TPLinkLight(BaseLight):
@@ -260,18 +310,26 @@ class TPLinkLight(BaseLight):
         logger.debug(f'About to execute {cmd}')
         return tplink_light_command(cmd)
 
+    @overrides
     def turn_on(self, child: str = None) -> bool:
         return self.command("on", child)
 
+    @overrides
     def turn_off(self, child: str = None) -> bool:
         return self.command("off", child)
 
+    @overrides
     def is_on(self) -> bool:
-        return self.get_on_duration_seconds() > 0
+        self.info = self.get_info()
+        if self.info is None:
+            raise Exception('Unable to get info?')
+        return self.info.get("relay_state", 0) == 1
 
+    @overrides
     def is_off(self) -> bool:
         return not self.is_on()
 
+    @overrides
     def make_color(self, color: str) -> bool:
         raise NotImplementedError
 
@@ -281,10 +339,12 @@ class TPLinkLight(BaseLight):
     def get_info(self) -> Optional[Dict]:
         cmd = self.get_cmdline() + "-c info"
         out = subprocess.getoutput(cmd)
+        logger.debug(f'RAW OUT> {out}')
         out = re.sub("Sent:.*\n", "", out)
         out = re.sub("Received: *", "", out)
         try:
             self.info = json.loads(out)["system"]["get_sysinfo"]
+            logger.debug(json.dumps(self.info, indent=4, sort_keys=True))
             self.info_ts = datetime.datetime.now()
             return self.info
         except Exception as e:
@@ -294,6 +354,13 @@ class TPLinkLight(BaseLight):
             self.info_ts = None
             return None
 
+    @overrides
+    def status(self) -> str:
+        ret = ''
+        for k, v in self.get_info().items():
+            ret += f'{k} = {v}\n'
+        return ret
+
     def get_on_duration_seconds(self, child: str = None) -> int:
         self.info = self.get_info()
         if child is None:
@@ -308,13 +375,7 @@ class TPLinkLight(BaseLight):
                     return int(chi.get("on_time", "0"))
         return 0
 
-    def get_on_limit_seconds(self) -> Optional[int]:
-        for kw in self.kws:
-            m = re.search(r"timeout:(\d+)", kw)
-            if m is not None:
-                return int(m.group(1)) * 60
-        return None
-
+    @overrides
     def get_dimmer_level(self) -> Optional[int]:
         if not self.has_keyword("dimmer"):
             return False
@@ -323,6 +384,7 @@ class TPLinkLight(BaseLight):
             return None
         return int(self.info.get("brightness", "0"))
 
+    @overrides
     def set_dimmer_level(self, level: int) -> bool:
         if not self.has_keyword("dimmer"):
             return False