X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=smart_home%2Flights.py;h=240e7da84412f089702b7b50bd4ce5b5080ca0eb;hb=7ff2af6fe7bffea90dc4a31c93140c189917c659;hp=07521398307a9234eaac8d2e977b30def714a40d;hpb=2a9cbfa6e97a8cb5ed68c838f5ec09bef654c37f;p=python_utils.git diff --git a/smart_home/lights.py b/smart_home/lights.py index 0752139..240e7da 100644 --- a/smart_home/lights.py +++ b/smart_home/lights.py @@ -2,7 +2,6 @@ """Utilities for dealing with the smart lights.""" -from abc import ABC, abstractmethod import datetime import json import logging @@ -10,24 +9,28 @@ import os import re import subprocess import sys -from typing import Any, Dict, List, Optional, Set +from abc import abstractmethod +from typing import Any, Dict, List, Optional, Tuple import tinytuya as tt +from overrides import overrides +import ansi import argparse_utils +import arper import config import logging_utils import smart_home.device as dev -from google_assistant import ask_google, GoogleResponse -from decorator_utils import timeout, memoized +from decorator_utils import memoized, timeout +from google_assistant import GoogleResponse, ask_google logger = logging.getLogger(__name__) -parser = config.add_commandline_args( +args = config.add_commandline_args( f"Smart Lights ({__file__})", "Args related to smart lights.", ) -parser.add_argument( +args.add_argument( '--smart_lights_tplink_location', default='/home/scott/bin/tplink.py', metavar='FILENAME', @@ -36,21 +39,21 @@ parser.add_argument( ) -@timeout( - 5.0, use_signals=False, error_message="Timed out waiting for tplink.py" -) +@timeout(5.0, use_signals=False, error_message="Timed out waiting for tplink.py") def tplink_light_command(command: str) -> bool: result = os.system(command) signal = result & 0xFF if signal != 0: - logger.warning(f'{command} died with signal {signal}') - logging_utils.hlog("%s died with signal %d" % (command, signal)) + msg = f'{command} died with signal {signal}' + logger.warning(msg) + logging_utils.hlog(msg) return False else: exit_value = result >> 8 if exit_value != 0: - logger.warning(f'{command} failed, exited {exit_value}') - logging_utils.hlog("%s failed, exit %d" % (command, exit_value)) + msg = f'{command} failed, exited {exit_value}' + logger.warning(msg) + logging_utils.hlog(msg) return False logger.debug(f'{command} succeeded.') return True @@ -60,6 +63,24 @@ class BaseLight(dev.Device): def __init__(self, name: str, mac: str, keywords: str = "") -> None: super().__init__(name.strip(), mac.strip(), keywords) + @staticmethod + def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]: + m = re.match( + 'r#?([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])', + color, + ) + if m is not None and len(m.groups()) == 3: + red = int(m.group(0), 16) + green = int(m.group(1), 16) + blue = int(m.group(2), 16) + return (red, green, blue) + color = color.lower() + return ansi.COLOR_NAMES_TO_RGB.get(color, None) + + @abstractmethod + def status(self) -> str: + pass + @abstractmethod def turn_on(self) -> bool: pass @@ -101,25 +122,38 @@ class GoogleLight(BaseLight): def parse_google_response(response: GoogleResponse) -> bool: return response.success + @overrides def turn_on(self) -> bool: return GoogleLight.parse_google_response( ask_google(f"turn {self.goog_name()} on") ) + @overrides def turn_off(self) -> bool: return GoogleLight.parse_google_response( ask_google(f"turn {self.goog_name()} off") ) + @overrides + def status(self) -> str: + if self.is_on(): + return 'ON' + return 'off' + + @overrides def is_on(self) -> bool: r = ask_google(f"is {self.goog_name()} on?") if not r.success: return False - return 'is on' in r.audio_transcription + if r.audio_transcription is not None: + return 'is on' in r.audio_transcription + raise Exception("Can't reach Google?!") + @overrides def is_off(self) -> bool: return not self.is_on() + @overrides def get_dimmer_level(self) -> Optional[int]: if not self.has_keyword("dimmer"): return False @@ -129,13 +163,15 @@ class GoogleLight(BaseLight): # the bookcase one is set to 40% bright txt = r.audio_transcription - m = re.search(r"(\d+)% bright", txt) - if m is not None: - return int(m.group(1)) - if "is off" in txt: - return 0 + if txt is not None: + m = re.search(r"(\d+)% bright", txt) + if m is not None: + return int(m.group(1)) + if "is off" in txt: + return 0 return None + @overrides def set_dimmer_level(self, level: int) -> bool: if not self.has_keyword("dimmer"): return False @@ -149,6 +185,7 @@ class GoogleLight(BaseLight): return True return False + @overrides def make_color(self, color: str) -> bool: return GoogleLight.parse_google_response( ask_google(f"make {self.goog_name()} {color}") @@ -174,50 +211,64 @@ class TuyaLight(BaseLight): } def __init__(self, name: str, mac: str, keywords: str = "") -> None: - from subprocess import Popen, PIPE super().__init__(name, mac, keywords) mac = mac.upper() if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac: raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac') self.devid = TuyaLight.ids_by_mac[mac] self.key = TuyaLight.keys_by_mac[mac] - try: - pid = Popen(['maclookup', mac], stdout=PIPE) - ip = pid.communicate()[0] - ip = ip[:-1] - except Exception: - ip = '0.0.0.0' + self.arper = arper.Arper() + ip = self.get_ip() self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key) + def get_status(self) -> Dict[str, Any]: + return self.bulb.status() + + @overrides + def status(self) -> str: + ret = '' + for k, v in self.bulb.status().items(): + ret += f'{k} = {v}\n' + return ret + + @overrides def turn_on(self) -> bool: self.bulb.turn_on() return True + @overrides def turn_off(self) -> bool: self.bulb.turn_off() return True - def get_status(self) -> Dict[str, Any]: - return self.bulb.status() - + @overrides def is_on(self) -> bool: s = self.get_status() return s['dps']['1'] + @overrides def is_off(self) -> bool: return not self.is_on() + @overrides def get_dimmer_level(self) -> Optional[int]: s = self.get_status() return s['dps']['3'] + @overrides def set_dimmer_level(self, level: int) -> bool: + logger.debug(f'Setting brightness to {level}') self.bulb.set_brightness(level) return True + @overrides def make_color(self, color: str) -> bool: - self.bulb.set_colour(255,0,0) - return True + rgb = BaseLight.parse_color_string(color) + logger.debug(f'Light color: {color} -> {rgb}') + if rgb is not None: + self.bulb.set_colour(rgb[0], rgb[1], rgb[2]) + return True + return False class TPLinkLight(BaseLight): @@ -251,40 +302,46 @@ class TPLinkLight(BaseLight): def get_children(self) -> List[str]: return self.children - def command( - self, cmd: str, child: str = None, extra_args: str = None - ) -> bool: + def command(self, cmd: str, child: str = None, extra_args: str = None) -> bool: cmd = self.get_cmdline(child) + f"-c {cmd}" if extra_args is not None: cmd += f" {extra_args}" logger.debug(f'About to execute {cmd}') return tplink_light_command(cmd) + @overrides def turn_on(self, child: str = None) -> bool: return self.command("on", child) + @overrides def turn_off(self, child: str = None) -> bool: return self.command("off", child) + @overrides def is_on(self) -> bool: - return self.get_on_duration_seconds() > 0 + self.info = self.get_info() + if self.info is None: + raise Exception('Unable to get info?') + return self.info.get("relay_state", 0) == 1 + @overrides def is_off(self) -> bool: return not self.is_on() + @overrides def make_color(self, color: str) -> bool: raise NotImplementedError - @timeout( - 10.0, use_signals=False, error_message="Timed out waiting for tplink.py" - ) + @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py") def get_info(self) -> Optional[Dict]: cmd = self.get_cmdline() + "-c info" out = subprocess.getoutput(cmd) + logger.debug(f'RAW OUT> {out}') out = re.sub("Sent:.*\n", "", out) out = re.sub("Received: *", "", out) try: self.info = json.loads(out)["system"]["get_sysinfo"] + logger.debug(json.dumps(self.info, indent=4, sort_keys=True)) self.info_ts = datetime.datetime.now() return self.info except Exception as e: @@ -294,6 +351,13 @@ class TPLinkLight(BaseLight): self.info_ts = None return None + @overrides + def status(self) -> str: + ret = '' + for k, v in self.get_info().items(): + ret += f'{k} = {v}\n' + return ret + def get_on_duration_seconds(self, child: str = None) -> int: self.info = self.get_info() if child is None: @@ -308,13 +372,7 @@ class TPLinkLight(BaseLight): return int(chi.get("on_time", "0")) return 0 - def get_on_limit_seconds(self) -> Optional[int]: - for kw in self.kws: - m = re.search(r"timeout:(\d+)", kw) - if m is not None: - return int(m.group(1)) * 60 - return None - + @overrides def get_dimmer_level(self) -> Optional[int]: if not self.has_keyword("dimmer"): return False @@ -323,6 +381,7 @@ class TPLinkLight(BaseLight): return None return int(self.info.get("brightness", "0")) + @overrides def set_dimmer_level(self, level: int) -> bool: if not self.has_keyword("dimmer"): return False