X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;ds=inline;f=light_utils.py;h=6ca6e71db2329a65402116a5cf769048023521f6;hb=ed47f1a0c31184280a303563237e34c0e53437d7;hp=8c21b86312dcd25c5771fcd2be44c8ec0ca81dc4;hpb=497fb9e21f45ec08e1486abaee6dfa7b20b8a691;p=python_utils.git diff --git a/light_utils.py b/light_utils.py index 8c21b86..6ca6e71 100644 --- a/light_utils.py +++ b/light_utils.py @@ -10,13 +10,15 @@ import os import re import subprocess import sys -from typing import Dict, List, Optional, Set +from typing import Any, Dict, List, Optional, Set + +import tinytuya as tt import argparse_utils import config -import logical_search import logging_utils -import google_assistant as goog +import logical_search +from google_assistant import ask_google, GoogleResponse from decorator_utils import timeout, memoized logger = logging.getLogger(__name__) @@ -48,13 +50,16 @@ def tplink_light_command(command: str) -> bool: result = os.system(command) signal = result & 0xFF if signal != 0: + logger.warning(f'{command} died with signal {signal}') logging_utils.hlog("%s died with signal %d" % (command, signal)) return False else: exit_value = result >> 8 if exit_value != 0: + logger.warning(f'{command} failed, exited {exit_value}') logging_utils.hlog("%s failed, exit %d" % (command, exit_value)) return False + logger.debug(f'{command} succeeded.') return True @@ -79,6 +84,18 @@ class Light(ABC): def turn_off(self) -> bool: pass + @abstractmethod + def is_on(self) -> bool: + pass + + @abstractmethod + def is_off(self) -> bool: + pass + + @abstractmethod + def get_dimmer_level(self) -> Optional[int]: + pass + @abstractmethod def set_dimmer_level(self, level: int) -> bool: pass @@ -106,32 +123,128 @@ class GoogleLight(Light): return name.replace("_", " ") @staticmethod - def parse_google_response(response: goog.GoogleResponse) -> bool: + def parse_google_response(response: GoogleResponse) -> bool: return response.success def turn_on(self) -> bool: return GoogleLight.parse_google_response( - goog.ask_google(f"turn {self.goog_name()} on") + ask_google(f"turn {self.goog_name()} on") ) def turn_off(self) -> bool: return GoogleLight.parse_google_response( - goog.ask_google(f"turn {self.goog_name()} off") + ask_google(f"turn {self.goog_name()} off") ) + def is_on(self) -> bool: + r = ask_google(f"is {self.goog_name()} on?") + if not r.success: + return False + return 'is on' in r.audio_transcription + + def is_off(self) -> bool: + return not self.is_on() + + def get_dimmer_level(self) -> Optional[int]: + if not self.has_keyword("dimmer"): + return False + r = ask_google(f'how bright is {self.goog_name()}?') + if not r.success: + return None + + # the bookcase one is set to 40% bright + txt = r.audio_transcription + m = re.search(r"(\d+)% bright", txt) + if m is not None: + return int(m.group(1)) + if "is off" in txt: + return 0 + return None + def set_dimmer_level(self, level: int) -> bool: + if not self.has_keyword("dimmer"): + return False if 0 <= level <= 100: - return GoogleLight.parse_google_response( - goog.ask_google(f"set {self.goog_name()} to {level} percent") - ) + was_on = self.is_on() + r = ask_google(f"set {self.goog_name()} to {level} percent") + if not r.success: + return False + if not was_on: + self.turn_off() + return True return False def make_color(self, color: str) -> bool: return GoogleLight.parse_google_response( - goog.ask_google(f"make {self.goog_name()} {color}") + ask_google(f"make {self.goog_name()} {color}") ) +class TuyaLight(Light): + ids_by_mac = { + '68:C6:3A:DE:1A:94': '8844664268c63ade1a94', + '68:C6:3A:DE:27:1A': '8844664268c63ade271a', + '68:C6:3A:DE:1D:95': '8844664268c63ade1d95', + '68:C6:3A:DE:19:B3': '8844664268c63ade19b3', + '80:7D:3A:77:3B:F5': '07445340807d3a773bf5', + '80:7D:3A:58:37:02': '07445340807d3a583702', + } + keys_by_mac = { + '68:C6:3A:DE:1A:94': '237f19b1b3d49c36', + '68:C6:3A:DE:27:1A': '237f19b1b3d49c36', + '68:C6:3A:DE:1D:95': '04b90fc5cd7625d8', + '68:C6:3A:DE:19:B3': '2d601f2892f1aefd', + '80:7D:3A:77:3B:F5': '27ab921fe4633519', + '80:7D:3A:58:37:02': '8559b5416bfa0c05', + } + + def __init__(self, name: str, mac: str, keywords: str = "") -> None: + from subprocess import Popen, PIPE + super().__init__(name, mac, keywords) + mac = mac.upper() + if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac: + raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac') + self.devid = TuyaLight.ids_by_mac[mac] + self.key = TuyaLight.keys_by_mac[mac] + try: + pid = Popen(['maclookup', mac], stdout=PIPE) + ip = pid.communicate()[0] + ip = ip[:-1] + except Exception: + ip = '0.0.0.0' + self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key) + + def turn_on(self) -> bool: + self.bulb.turn_on() + return True + + def turn_off(self) -> bool: + self.bulb.turn_off() + return True + + def get_status(self) -> Dict[str, Any]: + return self.bulb.status() + + def is_on(self) -> bool: + s = self.get_status() + return s['dps']['1'] + + def is_off(self) -> bool: + return not self.is_on() + + def get_dimmer_level(self) -> Optional[int]: + s = self.get_status() + return s['dps']['3'] + + def set_dimmer_level(self, level: int) -> bool: + self.bulb.set_brightness(level) + return True + + def make_color(self, color: str) -> bool: + self.bulb.set_colour(255,0,0) + return True + + class TPLinkLight(Light): def __init__(self, name: str, mac: str, keywords: str = "") -> None: super().__init__(name, mac, keywords) @@ -169,6 +282,7 @@ class TPLinkLight(Light): cmd = self.get_cmdline(child) + f"-c {cmd}" if extra_args is not None: cmd += f" {extra_args}" + logger.debug(f'About to execute {cmd}') return tplink_light_command(cmd) def turn_on(self, child: str = None) -> bool: @@ -177,6 +291,12 @@ class TPLinkLight(Light): def turn_off(self, child: str = None) -> bool: return self.command("off", child) + def is_on(self) -> bool: + return self.get_on_duration_seconds() > 0 + + def is_off(self) -> bool: + return not self.is_on() + def make_color(self, color: str) -> bool: raise NotImplementedError @@ -220,6 +340,14 @@ class TPLinkLight(Light): return int(m.group(1)) * 60 return None + def get_dimmer_level(self) -> Optional[int]: + if not self.has_keyword("dimmer"): + return False + self.info = self.get_info() + if self.info is None: + return None + return int(self.info.get("brightness", "0")) + def set_dimmer_level(self, level: int) -> bool: if not self.has_keyword("dimmer"): return False @@ -230,6 +358,65 @@ class TPLinkLight(Light): return tplink_light_command(cmd) +class GoogleLightGroup(GoogleLight): + def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None: + if len(members) < 1: + raise Exception("There must be at least one light in the group.") + self.members = members + mac = GoogleLightGroup.make_up_mac(members) + super().__init__(name, mac, keywords) + + @staticmethod + def make_up_mac(members: List[GoogleLight]): + mac = members[0].get_mac() + b = mac.split(':') + b[5] = int(b[5], 16) + 1 + if b[5] > 255: + b[5] = 0 + b[5] = str(b[5]) + return ":".join(b) + + def is_on(self) -> bool: + r = ask_google(f"are {self.goog_name()} on?") + if not r.success: + return False + return 'is on' in r.audio_transcription + + def get_dimmer_level(self) -> Optional[int]: + if not self.has_keyword("dimmer"): + return False + r = ask_google(f'how bright are {self.goog_name()}?') + if not r.success: + return None + + # four lights are set to 100% brightness + txt = r.audio_transcription + m = re.search(r"(\d+)% bright", txt) + if m is not None: + return int(m.group(1)) + if "is off" in txt: + return 0 + return None + + def set_dimmer_level(self, level: int) -> bool: + if not self.has_keyword("dimmer"): + return False + if 0 <= level <= 100: + was_on = self.is_on() + r = ask_google(f"set {self.goog_name()} to {level} percent") + if not r.success: + return False + if not was_on: + self.turn_off() + return True + return False + + def make_color(self, color: str) -> bool: + return GoogleLight.parse_google_response( + ask_google(f"make {self.goog_name()} {color}") + ) + + class LightingConfig(object): """Representation of the smart light device config.""" @@ -248,6 +435,9 @@ class LightingConfig(object): self.corpus = logical_search.Corpus() with open(config_file, "r") as f: contents = f.readlines() + + diningroom_lights = [] + bookcase_lights = [] for line in contents: line = line.rstrip("\n") line = re.sub(r"#.*$", r"", line) @@ -260,27 +450,67 @@ class LightingConfig(object): keywords = keywords.strip() if "perm" not in keywords: continue - properties = [("name", name)] - tags = set() - for kw in keywords.split(): - if ":" in kw: - key, value = kw.split(":") - properties.append((key, value)) - else: - tags.add(kw) - properties.append(("name", name)) self.macs_by_name[name] = mac self._keywords_by_name[name] = keywords self.keywords_by_mac[mac] = keywords self.names_by_mac[mac] = name - self.corpus.add_doc( - logical_search.Document( - docid=mac, - tags=tags, - properties=properties, - reference=None, - ) - ) + +# if "bookcase_light_" in name: +# bookcase_lights.append(mac) +# elif "diningroom_light_" in name: +# diningroom_lights.append(mac) +# else: + self.index_light(name, keywords, mac) + + # name = 'bookcase_lights' + # group = [] + # keywords = 'perm wifi light smart goog dimmer' + # for b in bookcase_lights: + # group.append(self.get_light_by_mac(b)) + # self.bookcase_group = GoogleLightGroup( + # name, + # group, + # keywords, + # ) + # mac = self.bookcase_group.get_mac() + # self.macs_by_name[name] = mac + # self._keywords_by_name[name] = keywords + # self.keywords_by_mac[mac] = keywords + # self.names_by_mac[mac] = name + # self.index_light(name, keywords, mac) + + # name = 'dining_room_lights' + # group = [] + # for b in diningroom_lights: + # group.append(self.get_light_by_mac(b)) + # self.diningroom_group = GoogleLightGroup( + # name, + # group, + # keywords, + # ) + # mac = self.diningroom_group.get_mac() + # self.macs_by_name[name] = mac + # self._keywords_by_name[name] = keywords + # self.keywords_by_mac[mac] = keywords + # self.names_by_mac[mac] = name + # self.index_light(name, keywords, mac) + + def index_light(self, name: str, keywords: str, mac: str) -> None: + properties = [("name", name)] + tags = set() + for kw in keywords.split(): + if ":" in kw: + key, value = kw.split(":") + properties.append((key, value)) + else: + tags.add(kw) + light = logical_search.Document( + docid=mac, + tags=tags, + properties=properties, + reference=None, + ) + self.corpus.add_doc(light) def __repr__(self) -> str: s = "Known devices:\n" @@ -324,8 +554,14 @@ class LightingConfig(object): if mac in self.keywords_by_mac: name = self.names_by_mac[mac] kws = self.keywords_by_mac[mac] - if "tplink" in kws.lower(): + if name == 'bookcase_lights': + return self.bookcase_group + elif name == 'dining_room_lights': + return self.diningroom_group + elif 'tplink' in kws.lower(): return TPLinkLight(name, mac, kws) + elif 'tuya' in kws.lower(): + return TuyaLight(name, mac, kws) else: return GoogleLight(name, mac, kws) return None