Move cache location. Also, add doctests for exec_utils.
[python_utils.git] / light_utils.py
index f63ba0b9b75193d2a076ed5fcfa9c015b65dc621..6ca6e71db2329a65402116a5cf769048023521f6 100644 (file)
@@ -10,13 +10,15 @@ import os
 import re
 import subprocess
 import sys
-from typing import Dict, List, Optional, Set
+from typing import Any, Dict, List, Optional, Set
+
+import tinytuya as tt
 
 import argparse_utils
 import config
-import logical_search
 import logging_utils
-import google_assistant as goog
+import logical_search
+from google_assistant import ask_google, GoogleResponse
 from decorator_utils import timeout, memoized
 
 logger = logging.getLogger(__name__)
@@ -48,13 +50,16 @@ def tplink_light_command(command: str) -> bool:
     result = os.system(command)
     signal = result & 0xFF
     if signal != 0:
+        logger.warning(f'{command} died with signal {signal}')
         logging_utils.hlog("%s died with signal %d" % (command, signal))
         return False
     else:
         exit_value = result >> 8
         if exit_value != 0:
+            logger.warning(f'{command} failed, exited {exit_value}')
             logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
             return False
+    logger.debug(f'{command} succeeded.')
     return True
 
 
@@ -118,21 +123,21 @@ class GoogleLight(Light):
         return name.replace("_", " ")
 
     @staticmethod
-    def parse_google_response(response: goog.GoogleResponse) -> bool:
+    def parse_google_response(response: GoogleResponse) -> bool:
         return response.success
 
     def turn_on(self) -> bool:
         return GoogleLight.parse_google_response(
-            goog.ask_google(f"turn {self.goog_name()} on")
+            ask_google(f"turn {self.goog_name()} on")
         )
 
     def turn_off(self) -> bool:
         return GoogleLight.parse_google_response(
-            goog.ask_google(f"turn {self.goog_name()} off")
+            ask_google(f"turn {self.goog_name()} off")
         )
 
     def is_on(self) -> bool:
-        r = goog.ask_google(f"is {self.goog_name()} on?")
+        r = ask_google(f"is {self.goog_name()} on?")
         if not r.success:
             return False
         return 'is on' in r.audio_transcription
@@ -143,7 +148,7 @@ class GoogleLight(Light):
     def get_dimmer_level(self) -> Optional[int]:
         if not self.has_keyword("dimmer"):
             return False
-        r = goog.ask_google(f'how bright is {self.goog_name()}?')
+        r = ask_google(f'how bright is {self.goog_name()}?')
         if not r.success:
             return None
 
@@ -161,7 +166,7 @@ class GoogleLight(Light):
             return False
         if 0 <= level <= 100:
             was_on = self.is_on()
-            r = goog.ask_google(f"set {self.goog_name()} to {level} percent")
+            r = ask_google(f"set {self.goog_name()} to {level} percent")
             if not r.success:
                 return False
             if not was_on:
@@ -171,10 +176,75 @@ class GoogleLight(Light):
 
     def make_color(self, color: str) -> bool:
         return GoogleLight.parse_google_response(
-            goog.ask_google(f"make {self.goog_name()} {color}")
+            ask_google(f"make {self.goog_name()} {color}")
         )
 
 
+class TuyaLight(Light):
+    ids_by_mac = {
+        '68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
+        '68:C6:3A:DE:27:1A': '8844664268c63ade271a',
+        '68:C6:3A:DE:1D:95': '8844664268c63ade1d95',
+        '68:C6:3A:DE:19:B3': '8844664268c63ade19b3',
+        '80:7D:3A:77:3B:F5': '07445340807d3a773bf5',
+        '80:7D:3A:58:37:02': '07445340807d3a583702',
+    }
+    keys_by_mac = {
+        '68:C6:3A:DE:1A:94': '237f19b1b3d49c36',
+        '68:C6:3A:DE:27:1A': '237f19b1b3d49c36',
+        '68:C6:3A:DE:1D:95': '04b90fc5cd7625d8',
+        '68:C6:3A:DE:19:B3': '2d601f2892f1aefd',
+        '80:7D:3A:77:3B:F5': '27ab921fe4633519',
+        '80:7D:3A:58:37:02': '8559b5416bfa0c05',
+    }
+
+    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
+        from subprocess import Popen, PIPE
+        super().__init__(name, mac, keywords)
+        mac = mac.upper()
+        if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
+            raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
+        self.devid = TuyaLight.ids_by_mac[mac]
+        self.key = TuyaLight.keys_by_mac[mac]
+        try:
+            pid = Popen(['maclookup', mac], stdout=PIPE)
+            ip = pid.communicate()[0]
+            ip = ip[:-1]
+        except Exception:
+            ip = '0.0.0.0'
+        self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)
+
+    def turn_on(self) -> bool:
+        self.bulb.turn_on()
+        return True
+
+    def turn_off(self) -> bool:
+        self.bulb.turn_off()
+        return True
+
+    def get_status(self) -> Dict[str, Any]:
+        return self.bulb.status()
+
+    def is_on(self) -> bool:
+        s = self.get_status()
+        return s['dps']['1']
+
+    def is_off(self) -> bool:
+        return not self.is_on()
+
+    def get_dimmer_level(self) -> Optional[int]:
+        s = self.get_status()
+        return s['dps']['3']
+
+    def set_dimmer_level(self, level: int) -> bool:
+        self.bulb.set_brightness(level)
+        return True
+
+    def make_color(self, color: str) -> bool:
+        self.bulb.set_colour(255,0,0)
+        return True
+
+
 class TPLinkLight(Light):
     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
         super().__init__(name, mac, keywords)
@@ -212,6 +282,7 @@ class TPLinkLight(Light):
         cmd = self.get_cmdline(child) + f"-c {cmd}"
         if extra_args is not None:
             cmd += f" {extra_args}"
+        logger.debug(f'About to execute {cmd}')
         return tplink_light_command(cmd)
 
     def turn_on(self, child: str = None) -> bool:
@@ -287,6 +358,65 @@ class TPLinkLight(Light):
         return tplink_light_command(cmd)
 
 
+class GoogleLightGroup(GoogleLight):
+    def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
+        if len(members) < 1:
+            raise Exception("There must be at least one light in the group.")
+        self.members = members
+        mac = GoogleLightGroup.make_up_mac(members)
+        super().__init__(name, mac, keywords)
+
+    @staticmethod
+    def make_up_mac(members: List[GoogleLight]):
+        mac = members[0].get_mac()
+        b = mac.split(':')
+        b[5] = int(b[5], 16) + 1
+        if b[5] > 255:
+            b[5] = 0
+        b[5] = str(b[5])
+        return ":".join(b)
+
+    def is_on(self) -> bool:
+        r = ask_google(f"are {self.goog_name()} on?")
+        if not r.success:
+            return False
+        return 'is on' in r.audio_transcription
+
+    def get_dimmer_level(self) -> Optional[int]:
+        if not self.has_keyword("dimmer"):
+            return False
+        r = ask_google(f'how bright are {self.goog_name()}?')
+        if not r.success:
+            return None
+
+        # four lights are set to 100% brightness
+        txt = r.audio_transcription
+        m = re.search(r"(\d+)% bright", txt)
+        if m is not None:
+            return int(m.group(1))
+        if "is off" in txt:
+            return 0
+        return None
+
+    def set_dimmer_level(self, level: int) -> bool:
+        if not self.has_keyword("dimmer"):
+            return False
+        if 0 <= level <= 100:
+            was_on = self.is_on()
+            r = ask_google(f"set {self.goog_name()} to {level} percent")
+            if not r.success:
+                return False
+            if not was_on:
+                self.turn_off()
+            return True
+        return False
+
+    def make_color(self, color: str) -> bool:
+        return GoogleLight.parse_google_response(
+            ask_google(f"make {self.goog_name()} {color}")
+        )
+
+
 class LightingConfig(object):
     """Representation of the smart light device config."""
 
@@ -305,6 +435,9 @@ class LightingConfig(object):
         self.corpus = logical_search.Corpus()
         with open(config_file, "r") as f:
             contents = f.readlines()
+
+        diningroom_lights = []
+        bookcase_lights = []
         for line in contents:
             line = line.rstrip("\n")
             line = re.sub(r"#.*$", r"", line)
@@ -317,27 +450,67 @@ class LightingConfig(object):
             keywords = keywords.strip()
             if "perm" not in keywords:
                 continue
-            properties = [("name", name)]
-            tags = set()
-            for kw in keywords.split():
-                if ":" in kw:
-                    key, value = kw.split(":")
-                    properties.append((key, value))
-                else:
-                    tags.add(kw)
-            properties.append(("name", name))
             self.macs_by_name[name] = mac
             self._keywords_by_name[name] = keywords
             self.keywords_by_mac[mac] = keywords
             self.names_by_mac[mac] = name
-            self.corpus.add_doc(
-                logical_search.Document(
-                    docid=mac,
-                    tags=tags,
-                    properties=properties,
-                    reference=None,
-                )
-            )
+
+#            if "bookcase_light_" in name:
+#                bookcase_lights.append(mac)
+#            elif "diningroom_light_" in name:
+#                diningroom_lights.append(mac)
+#            else:
+            self.index_light(name, keywords, mac)
+
+        # name = 'bookcase_lights'
+        # group = []
+        # keywords = 'perm wifi light smart goog dimmer'
+        # for b in bookcase_lights:
+        #     group.append(self.get_light_by_mac(b))
+        # self.bookcase_group = GoogleLightGroup(
+        #     name,
+        #     group,
+        #     keywords,
+        # )
+        # mac = self.bookcase_group.get_mac()
+        # self.macs_by_name[name] = mac
+        # self._keywords_by_name[name] = keywords
+        # self.keywords_by_mac[mac] = keywords
+        # self.names_by_mac[mac] = name
+        # self.index_light(name, keywords, mac)
+
+        # name = 'dining_room_lights'
+        # group = []
+        # for b in diningroom_lights:
+        #     group.append(self.get_light_by_mac(b))
+        # self.diningroom_group = GoogleLightGroup(
+        #     name,
+        #     group,
+        #     keywords,
+        # )
+        # mac = self.diningroom_group.get_mac()
+        # self.macs_by_name[name] = mac
+        # self._keywords_by_name[name] = keywords
+        # self.keywords_by_mac[mac] = keywords
+        # self.names_by_mac[mac] = name
+        # self.index_light(name, keywords, mac)
+
+    def index_light(self, name: str, keywords: str, mac: str) -> None:
+        properties = [("name", name)]
+        tags = set()
+        for kw in keywords.split():
+            if ":" in kw:
+                key, value = kw.split(":")
+                properties.append((key, value))
+            else:
+                tags.add(kw)
+        light = logical_search.Document(
+            docid=mac,
+            tags=tags,
+            properties=properties,
+            reference=None,
+        )
+        self.corpus.add_doc(light)
 
     def __repr__(self) -> str:
         s = "Known devices:\n"
@@ -381,8 +554,14 @@ class LightingConfig(object):
         if mac in self.keywords_by_mac:
             name = self.names_by_mac[mac]
             kws = self.keywords_by_mac[mac]
-            if "tplink" in kws.lower():
+            if name == 'bookcase_lights':
+                return self.bookcase_group
+            elif name == 'dining_room_lights':
+                return self.diningroom_group
+            elif 'tplink' in kws.lower():
                 return TPLinkLight(name, mac, kws)
+            elif 'tuya' in kws.lower():
+                return TuyaLight(name, mac, kws)
             else:
                 return GoogleLight(name, mac, kws)
         return None