X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=smart_home%2Flights.py;h=2a0b1cd802f500eae7caa94d82d32dd283904403;hb=c6fca944b41f292b66efaba27ebf3fd0a37956b8;hp=54467223b9b1cc36c5edabe9d81df187cc1a409c;hpb=29ee1f98654a689e9cab76b0c7c68428faa43a8c;p=python_utils.git diff --git a/smart_home/lights.py b/smart_home/lights.py index 5446722..2a0b1cd 100644 --- a/smart_home/lights.py +++ b/smart_home/lights.py @@ -2,32 +2,31 @@ """Utilities for dealing with the smart lights.""" -from abc import abstractmethod import datetime -import json import logging -import os import re -import subprocess -import sys -from typing import Any, Dict, List, Optional +from abc import abstractmethod +from typing import Any, Dict, List, Optional, Tuple import tinytuya as tt +from overrides import overrides +import ansi import argparse_utils +import arper import config -import logging_utils import smart_home.device as dev -from google_assistant import ask_google, GoogleResponse -from decorator_utils import timeout, memoized +import smart_home.tplink_utils as tplink +from decorator_utils import memoized +from google_assistant import GoogleResponse, ask_google logger = logging.getLogger(__name__) -parser = config.add_commandline_args( +args = config.add_commandline_args( f"Smart Lights ({__file__})", "Args related to smart lights.", ) -parser.add_argument( +args.add_argument( '--smart_lights_tplink_location', default='/home/scott/bin/tplink.py', metavar='FILENAME', @@ -36,30 +35,30 @@ parser.add_argument( ) -@timeout( - 5.0, use_signals=False, error_message="Timed out waiting for tplink.py" -) -def tplink_light_command(command: str) -> bool: - result = os.system(command) - signal = result & 0xFF - if signal != 0: - logger.warning(f'{command} died with signal {signal}') - logging_utils.hlog("%s died with signal %d" % (command, signal)) - return False - else: - exit_value = result >> 8 - if exit_value != 0: - logger.warning(f'{command} failed, exited {exit_value}') - logging_utils.hlog("%s failed, exit %d" % (command, exit_value)) - return False - logger.debug(f'{command} succeeded.') - return True - - class BaseLight(dev.Device): + """A base class representing a smart light.""" + def __init__(self, name: str, mac: str, keywords: str = "") -> None: super().__init__(name.strip(), mac.strip(), keywords) + @staticmethod + def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]: + m = re.match( + 'r#?([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])', + color, + ) + if m is not None and len(m.groups()) == 3: + red = int(m.group(0), 16) + green = int(m.group(1), 16) + blue = int(m.group(2), 16) + return (red, green, blue) + color = color.lower() + return ansi.COLOR_NAMES_TO_RGB.get(color, None) + + @abstractmethod + def status(self) -> str: + pass + @abstractmethod def turn_on(self) -> bool: pass @@ -90,8 +89,7 @@ class BaseLight(dev.Device): class GoogleLight(BaseLight): - def __init__(self, name: str, mac: str, keywords: str = "") -> None: - super().__init__(name, mac, keywords) + """A smart light controlled by talking to Google.""" def goog_name(self) -> str: name = self.get_name() @@ -101,25 +99,34 @@ class GoogleLight(BaseLight): def parse_google_response(response: GoogleResponse) -> bool: return response.success + @overrides def turn_on(self) -> bool: - return GoogleLight.parse_google_response( - ask_google(f"turn {self.goog_name()} on") - ) + return GoogleLight.parse_google_response(ask_google(f"turn {self.goog_name()} on")) + @overrides def turn_off(self) -> bool: - return GoogleLight.parse_google_response( - ask_google(f"turn {self.goog_name()} off") - ) + return GoogleLight.parse_google_response(ask_google(f"turn {self.goog_name()} off")) + @overrides + def status(self) -> str: + if self.is_on(): + return 'ON' + return 'off' + + @overrides def is_on(self) -> bool: r = ask_google(f"is {self.goog_name()} on?") if not r.success: return False - return 'is on' in r.audio_transcription + if r.audio_transcription is not None: + return 'is on' in r.audio_transcription + raise Exception("Can't reach Google?!") + @overrides def is_off(self) -> bool: return not self.is_on() + @overrides def get_dimmer_level(self) -> Optional[int]: if not self.has_keyword("dimmer"): return False @@ -129,13 +136,15 @@ class GoogleLight(BaseLight): # the bookcase one is set to 40% bright txt = r.audio_transcription - m = re.search(r"(\d+)% bright", txt) - if m is not None: - return int(m.group(1)) - if "is off" in txt: - return 0 + if txt is not None: + m = re.search(r"(\d+)% bright", txt) + if m is not None: + return int(m.group(1)) + if "is off" in txt: + return 0 return None + @overrides def set_dimmer_level(self, level: int) -> bool: if not self.has_keyword("dimmer"): return False @@ -149,13 +158,14 @@ class GoogleLight(BaseLight): return True return False + @overrides def make_color(self, color: str) -> bool: - return GoogleLight.parse_google_response( - ask_google(f"make {self.goog_name()} {color}") - ) + return GoogleLight.parse_google_response(ask_google(f"make {self.goog_name()} {color}")) class TuyaLight(BaseLight): + """A Tuya smart light.""" + ids_by_mac = { '68:C6:3A:DE:1A:94': '8844664268c63ade1a94', '68:C6:3A:DE:27:1A': '8844664268c63ade271a', @@ -174,63 +184,80 @@ class TuyaLight(BaseLight): } def __init__(self, name: str, mac: str, keywords: str = "") -> None: - from subprocess import Popen, PIPE super().__init__(name, mac, keywords) mac = mac.upper() if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac: raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac') self.devid = TuyaLight.ids_by_mac[mac] self.key = TuyaLight.keys_by_mac[mac] - try: - pid = Popen(['maclookup', mac], stdout=PIPE) - ip = pid.communicate()[0] - ip = ip[:-1] - except Exception: - ip = '0.0.0.0' + self.arper = arper.Arper() + ip = self.get_ip() self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key) + def get_status(self) -> Dict[str, Any]: + return self.bulb.status() + + @overrides + def status(self) -> str: + ret = '' + for k, v in self.bulb.status().items(): + ret += f'{k} = {v}\n' + return ret + + @overrides def turn_on(self) -> bool: self.bulb.turn_on() return True + @overrides def turn_off(self) -> bool: self.bulb.turn_off() return True - def get_status(self) -> Dict[str, Any]: - return self.bulb.status() - + @overrides def is_on(self) -> bool: s = self.get_status() return s['dps']['1'] + @overrides def is_off(self) -> bool: return not self.is_on() + @overrides def get_dimmer_level(self) -> Optional[int]: s = self.get_status() return s['dps']['3'] + @overrides def set_dimmer_level(self, level: int) -> bool: + logger.debug('Setting brightness to %d', level) self.bulb.set_brightness(level) return True + @overrides def make_color(self, color: str) -> bool: - self.bulb.set_colour(255,0,0) - return True + rgb = BaseLight.parse_color_string(color) + logger.debug('Light color: %s -> %s', color, rgb) + if rgb is not None: + self.bulb.set_colour(rgb[0], rgb[1], rgb[2]) + return True + return False class TPLinkLight(BaseLight): + """A TPLink smart light.""" + def __init__(self, name: str, mac: str, keywords: str = "") -> None: super().__init__(name, mac, keywords) self.children: List[str] = [] self.info: Optional[Dict] = None self.info_ts: Optional[datetime.datetime] = None - if "children" in self.keywords: - self.info = self.get_info() - if self.info is not None: - for child in self.info["children"]: - self.children.append(child["id"]) + if self.keywords is not None: + if "children" in self.keywords: + self.info = self.get_info() + if self.info is not None: + for child in self.info["children"]: + self.children.append(child["id"]) @memoized def get_tplink_name(self) -> Optional[str]: @@ -251,48 +278,61 @@ class TPLinkLight(BaseLight): def get_children(self) -> List[str]: return self.children - def command( - self, cmd: str, child: str = None, extra_args: str = None - ) -> bool: + def command(self, cmd: str, child: str = None, extra_args: str = None) -> bool: cmd = self.get_cmdline(child) + f"-c {cmd}" if extra_args is not None: cmd += f" {extra_args}" - logger.debug(f'About to execute {cmd}') - return tplink_light_command(cmd) + logger.debug('About to execute: %s', cmd) + return tplink.tplink_command_wrapper(cmd) + + @overrides + def turn_on(self) -> bool: + return self.command("on", None) + + @overrides + def turn_off(self) -> bool: + return self.command("off", None) - def turn_on(self, child: str = None) -> bool: + def turn_on_child(self, child: str = None) -> bool: return self.command("on", child) - def turn_off(self, child: str = None) -> bool: + def turn_off_child(self, child: str = None) -> bool: return self.command("off", child) + @overrides def is_on(self) -> bool: - return self.get_on_duration_seconds() > 0 + self.info = self.get_info() + if self.info is None: + raise Exception('Unable to get info?') + return self.info.get("relay_state", 0) == 1 + @overrides def is_off(self) -> bool: return not self.is_on() + @overrides def make_color(self, color: str) -> bool: raise NotImplementedError - @timeout( - 10.0, use_signals=False, error_message="Timed out waiting for tplink.py" - ) def get_info(self) -> Optional[Dict]: - cmd = self.get_cmdline() + "-c info" - out = subprocess.getoutput(cmd) - out = re.sub("Sent:.*\n", "", out) - out = re.sub("Received: *", "", out) - try: - self.info = json.loads(out)["system"]["get_sysinfo"] - self.info_ts = datetime.datetime.now() + ip = self.get_ip() + if ip is not None: + self.info = tplink.tplink_get_info(ip) + if self.info is not None: + self.info_ts = datetime.datetime.now() + else: + self.info_ts = None return self.info - except Exception as e: - logger.exception(e) - print(out, file=sys.stderr) - self.info = None - self.info_ts = None - return None + return None + + @overrides + def status(self) -> str: + ret = '' + info = self.get_info() + if info is not None: + for k, v in info: + ret += f'{k} = {v}\n' + return ret def get_on_duration_seconds(self, child: str = None) -> int: self.info = self.get_info() @@ -308,13 +348,7 @@ class TPLinkLight(BaseLight): return int(chi.get("on_time", "0")) return 0 - def get_on_limit_seconds(self) -> Optional[int]: - for kw in self.kws: - m = re.search(r"timeout:(\d+)", kw) - if m is not None: - return int(m.group(1)) * 60 - return None - + @overrides def get_dimmer_level(self) -> Optional[int]: if not self.has_keyword("dimmer"): return False @@ -323,14 +357,15 @@ class TPLinkLight(BaseLight): return None return int(self.info.get("brightness", "0")) + @overrides def set_dimmer_level(self, level: int) -> bool: if not self.has_keyword("dimmer"): return False cmd = ( self.get_cmdline() - + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\'' + + '-j \'{"smartlife.iot.dimmer":{"set_brightness":{"brightness":%d}}}\'' % level ) - return tplink_light_command(cmd) + return tplink.tplink_command_wrapper(cmd) # class GoogleLightGroup(GoogleLight):