#!/usr/bin/env python3

"""Utilities for dealing with the smart lights."""

from abc import abstractmethod
import datetime
import json
import logging
import os
import re
import subprocess
import sys
from typing import Any, Dict, List, Optional, Tuple

from overrides import overrides
import tinytuya as tt

import ansi
import argparse_utils
import arper
import config
import logging_utils
import smart_home.device as dev
from google_assistant import ask_google, GoogleResponse
from decorator_utils import timeout, memoized

logger = logging.getLogger(__name__)

args = config.add_commandline_args(
    f"Smart Lights ({__file__})",
    "Args related to smart lights.",
)
args.add_argument(
    '--smart_lights_tplink_location',
    default='/home/scott/bin/tplink.py',
    metavar='FILENAME',
    help='The location of the tplink.py helper',
    type=argparse_utils.valid_filename,
)


@timeout(
    5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
)
def tplink_light_command(command: str) -> bool:
    """Run a tplink.py shell command and report whether it succeeded.

    Args:
        command: the complete shell command line to execute.

    Returns:
        True if the command exited with status zero; False if it was
        killed by a signal or exited non-zero.  Failures are logged both
        via the module logger and via logging_utils.hlog.
    """
    result = os.system(command)

    # The value from os.system is decoded as a wait status here: the low
    # byte is treated as the terminating signal, the next byte as the
    # exit code.
    signal = result & 0xFF
    if signal != 0:
        logger.warning(f'{command} died with signal {signal}')
        logging_utils.hlog(f"{command} died with signal {signal}")
        return False

    exit_value = result >> 8
    if exit_value != 0:
        logger.warning(f'{command} failed, exited {exit_value}')
        logging_utils.hlog(f"{command} failed, exit {exit_value}")
        return False

    logger.debug(f'{command} succeeded.')
    return True


class BaseLight(dev.Device):
    """Common interface shared by every kind of smart light."""

    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
        """Initialize the underlying device with a whitespace-stripped
        name and MAC address."""
        super().__init__(name.strip(), mac.strip(), keywords)

    @staticmethod
    def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]:
        """Convert a color description into an (red, green, blue) tuple.

        Args:
            color: either six hex digits with an optional leading '#'
                (e.g. "#ff8800" or "FF8800") or a color name known to
                ansi.COLOR_NAMES_TO_RGB (looked up case-insensitively).

        Returns:
            A tuple of three ints in 0..255 for a hex string, whatever
            ansi.COLOR_NAMES_TO_RGB holds for a known name, or None when
            the string is neither.
        """
        color = color.strip()

        # fullmatch so that trailing junk ("#ff0000zz") is not silently
        # accepted as a hex color.
        m = re.fullmatch(
            r'#?([0-9a-fA-F]{2})([0-9a-fA-F]{2})([0-9a-fA-F]{2})', color
        )
        if m is not None:
            # Group 0 is the whole match; the channels are groups 1..3.
            red = int(m.group(1), 16)
            green = int(m.group(2), 16)
            blue = int(m.group(3), 16)
            return (red, green, blue)
        return ansi.COLOR_NAMES_TO_RGB.get(color.lower(), None)

    @abstractmethod
    def status(self) -> str:
        """Return a human-readable description of the light's state."""
        pass

    @abstractmethod
    def turn_on(self) -> bool:
        """Turn the light on; return True on success."""
        pass

    @abstractmethod
    def turn_off(self) -> bool:
        """Turn the light off; return True on success."""
        pass

    @abstractmethod
    def is_on(self) -> bool:
        """Return True if the light is currently on."""
        pass

    @abstractmethod
    def is_off(self) -> bool:
        """Return True if the light is currently off."""
        pass

    @abstractmethod
    def get_dimmer_level(self) -> Optional[int]:
        """Return the current dimmer level, or None if it is unknown or
        the light is not dimmable."""
        pass

    @abstractmethod
    def set_dimmer_level(self, level: int) -> bool:
        """Set the dimmer level; return True on success."""
        pass

    @abstractmethod
    def make_color(self, color: str) -> bool:
        """Change the light's color; return True on success."""
        pass


class GoogleLight(BaseLight):
    """A light controlled by sending spoken-style requests through
    ask_google."""

    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
        super().__init__(name, mac, keywords)

    def goog_name(self) -> str:
        """Return the device name with underscores replaced by spaces,
        which is the form used in the requests sent to ask_google."""
        name = self.get_name()
        return name.replace("_", " ")

    @staticmethod
    def parse_google_response(response: GoogleResponse) -> bool:
        """Return the success flag of a GoogleResponse."""
        return response.success

    @overrides
    def turn_on(self) -> bool:
        return GoogleLight.parse_google_response(
            ask_google(f"turn {self.goog_name()} on")
        )

    @overrides
    def turn_off(self) -> bool:
        return GoogleLight.parse_google_response(
            ask_google(f"turn {self.goog_name()} off")
        )

    @overrides
    def status(self) -> str:
        """Return 'ON' when the light reports being on, else 'off'."""
        if self.is_on():
            return 'ON'
        return 'off'

    @overrides
    def is_on(self) -> bool:
        """Ask whether the light is on.

        Returns False both when the light is off and when the request
        itself fails.
        """
        r = ask_google(f"is {self.goog_name()} on?")
        if not r.success:
            return False
        return 'is on' in r.audio_transcription

    @overrides
    def is_off(self) -> bool:
        return not self.is_on()

    @overrides
    def get_dimmer_level(self) -> Optional[int]:
        """Ask how bright the light is.

        Returns:
            The brightness percentage parsed from the transcription, 0
            if the transcription says the light is off, or None when the
            light lacks the "dimmer" keyword, the request fails, or the
            answer cannot be parsed.
        """
        if not self.has_keyword("dimmer"):
            # Not dimmable: None (not False) to honor Optional[int].
            return None
        r = ask_google(f'how bright is {self.goog_name()}?')
        if not r.success:
            return None

        # Expected answer looks like:
        #   "the bookcase one is set to 40% bright"
        txt = r.audio_transcription
        m = re.search(r"(\d+)% bright", txt)
        if m is not None:
            return int(m.group(1))
        if "is off" in txt:
            return 0
        return None

    @overrides
    def set_dimmer_level(self, level: int) -> bool:
        """Set the brightness to a percentage in 0..100.

        Returns:
            True if the request succeeded; False when the light lacks
            the "dimmer" keyword, level is out of range, or the request
            fails.
        """
        if not self.has_keyword("dimmer"):
            return False
        if not 0 <= level <= 100:
            return False

        was_on = self.is_on()
        r = ask_google(f"set {self.goog_name()} to {level} percent")
        if not r.success:
            return False
        # Restore the previous power state if the light was off before
        # (presumably because setting a brightness switches it on --
        # TODO confirm).
        if not was_on:
            self.turn_off()
        return True

    @overrides
    def make_color(self, color: str) -> bool:
        return GoogleLight.parse_google_response(
            ask_google(f"make {self.goog_name()} {color}")
        )


class TuyaLight(BaseLight):
    """A bulb controlled locally through tinytuya's BulbDevice."""

    # NOTE(review): device ids and local keys are hard-coded in source;
    # consider moving them to configuration / a secrets store.
    ids_by_mac = {
        '68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
        '68:C6:3A:DE:27:1A': '8844664268c63ade271a',
        '68:C6:3A:DE:1D:95': '8844664268c63ade1d95',
        '68:C6:3A:DE:19:B3': '8844664268c63ade19b3',
        '80:7D:3A:77:3B:F5': '07445340807d3a773bf5',
        '80:7D:3A:58:37:02': '07445340807d3a583702',
    }

    keys_by_mac = {
        '68:C6:3A:DE:1A:94': '237f19b1b3d49c36',
        '68:C6:3A:DE:27:1A': '237f19b1b3d49c36',
        '68:C6:3A:DE:1D:95': '04b90fc5cd7625d8',
        '68:C6:3A:DE:19:B3': '2d601f2892f1aefd',
        '80:7D:3A:77:3B:F5': '27ab921fe4633519',
        '80:7D:3A:58:37:02': '8559b5416bfa0c05',
    }

    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
        """Look up the device id and key for this MAC and create the
        tinytuya bulb handle.

        Raises:
            Exception: if the MAC is missing from ids_by_mac or
                keys_by_mac.
        """
        super().__init__(name, mac, keywords)
        # The base class strips the MAC it stores; normalize the same
        # way here so the table lookup is not defeated by whitespace.
        mac = mac.strip().upper()
        if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
            raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
        self.devid = TuyaLight.ids_by_mac[mac]
        self.key = TuyaLight.keys_by_mac[mac]
        self.arper = arper.Arper()
        ip = self.get_ip()
        self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)

    def get_status(self) -> Dict[str, Any]:
        """Return the raw status dictionary reported by the bulb."""
        return self.bulb.status()

    def _get_dps(self) -> Dict[str, Any]:
        """Return the 'dps' mapping from the bulb status, or an empty
        dict (with a warning logged) when the status has none."""
        status = self.get_status()
        dps = status.get('dps') if isinstance(status, dict) else None
        if not isinstance(dps, dict):
            logger.warning(f'No dps data in status of {self.devid}: {status}')
            return {}
        return dps

    @overrides
    def status(self) -> str:
        """Return the bulb status as "key = value" lines."""
        return ''.join(f'{k} = {v}\n' for k, v in self.get_status().items())

    @overrides
    def turn_on(self) -> bool:
        self.bulb.turn_on()
        return True

    @overrides
    def turn_off(self) -> bool:
        self.bulb.turn_off()
        return True

    @overrides
    def is_on(self) -> bool:
        """Return the truthiness of data point '1' of the bulb status;
        False if the status carries no such data point."""
        return bool(self._get_dps().get('1', False))

    @overrides
    def is_off(self) -> bool:
        return not self.is_on()

    @overrides
    def get_dimmer_level(self) -> Optional[int]:
        """Return data point '3' of the bulb status, or None if the
        status carries no such data point."""
        return self._get_dps().get('3')

    @overrides
    def set_dimmer_level(self, level: int) -> bool:
        logger.debug(f'Setting brightness to {level}')
        self.bulb.set_brightness(level)
        return True

    @overrides
    def make_color(self, color: str) -> bool:
        """Set the bulb color from a hex string or color name.

        Returns:
            True if the color could be parsed and was sent to the bulb,
            False if the color string was not understood.
        """
        rgb = BaseLight.parse_color_string(color)
        logger.debug(f'Light color: {color} -> {rgb}')
        if rgb is None:
            return False
        self.bulb.set_colour(rgb[0], rgb[1], rgb[2])
        return True


class TPLinkLight(BaseLight):
    """A light controlled by shelling out to the tplink.py helper."""

    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
        """Initialize the light and, if the keywords mention "children",
        record the ids of its child devices from the sysinfo reply."""
        super().__init__(name, mac, keywords)
        self.children: List[str] = []
        self.info: Optional[Dict] = None
        self.info_ts: Optional[datetime.datetime] = None
        if "children" in self.keywords:
            self.info = self.get_info()
            if self.info is not None:
                self.children = [
                    child["id"] for child in self.info.get("children", [])
                ]

    @memoized
    def get_tplink_name(self) -> Optional[str]:
        """Return the "alias" field of the device's sysinfo, or None if
        the info could not be fetched or has no alias."""
        self.info = self.get_info()
        if self.info is not None:
            return self.info.get("alias")
        return None

    def get_cmdline(self, child: Optional[str] = None) -> str:
        """Build the common prefix of a tplink.py command line.

        Args:
            child: optional child id, passed to the helper via -x.

        Returns:
            The command prefix, ending with a trailing space so that
            further arguments can be appended directly.
        """
        cmd = (
            f"{config.config['smart_lights_tplink_location']} -m {self.mac} "
            "--no_logging_console "
        )
        if child is not None:
            cmd += f"-x {child} "
        return cmd

    def get_children(self) -> List[str]:
        """Return the child ids recorded at construction time."""
        return self.children

    def command(
        self,
        cmd: str,
        child: Optional[str] = None,
        extra_args: Optional[str] = None,
    ) -> bool:
        """Run a tplink.py "-c <cmd>" command against this device.

        Args:
            cmd: the command name handed to the helper's -c flag.
            child: optional child id to address.
            extra_args: optional extra text appended to the command line.

        Returns:
            True if the helper ran successfully.
        """
        cmd = self.get_cmdline(child) + f"-c {cmd}"
        if extra_args is not None:
            cmd += f" {extra_args}"
        logger.debug(f'About to execute {cmd}')
        return tplink_light_command(cmd)

    @overrides
    def turn_on(self, child: Optional[str] = None) -> bool:
        return self.command("on", child)

    @overrides
    def turn_off(self, child: Optional[str] = None) -> bool:
        return self.command("off", child)

    @overrides
    def is_on(self) -> bool:
        """Return True if the device reports a positive on-time."""
        return self.get_on_duration_seconds() > 0

    @overrides
    def is_off(self) -> bool:
        return not self.is_on()

    @overrides
    def make_color(self, color: str) -> bool:
        raise NotImplementedError

    @timeout(
        10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
    )
    def get_info(self) -> Optional[Dict]:
        """Fetch the device's sysinfo by running "tplink.py -c info".

        Returns:
            The ["system"]["get_sysinfo"] dictionary parsed from the
            helper's output, or None if the output could not be parsed
            (the raw output is then echoed to stderr).

        NOTE(review): callers re-assign the return value to self.info
        even though it is also set here; presumably the timeout wrapper
        may run this method where the assignment is not visible to the
        caller -- confirm before removing either.
        """
        cmd = self.get_cmdline() + "-c info"
        out = subprocess.getoutput(cmd)
        logger.debug(f'RAW OUT> {out}')

        # Strip the helper's "Sent: ..." line and "Received: " prefix so
        # that only the JSON payload remains.
        out = re.sub("Sent:.*\n", "", out)
        out = re.sub("Received: *", "", out)
        try:
            self.info = json.loads(out)["system"]["get_sysinfo"]
        except (ValueError, KeyError, TypeError) as e:
            logger.exception(e)
            print(out, file=sys.stderr)
            self.info = None
            self.info_ts = None
            return None
        self.info_ts = datetime.datetime.now()
        return self.info

    @overrides
    def status(self) -> str:
        """Return the sysinfo as "key = value" lines, or an empty string
        if the info could not be fetched."""
        info = self.get_info()
        if info is None:
            return ''
        return ''.join(f'{k} = {v}\n' for k, v in info.items())

    def get_on_duration_seconds(self, child: Optional[str] = None) -> int:
        """Return the "on_time" value reported by the device.

        Args:
            child: optional child id; when given, the on_time of the
                matching entry in the sysinfo "children" list is used.

        Returns:
            The on_time as an int, or 0 if the info is unavailable, the
            field is missing, or the child is not found.
        """
        self.info = self.get_info()
        if self.info is None:
            return 0
        if child is None:
            return int(self.info.get("on_time", "0"))
        for chi in self.info.get("children", {}):
            if chi["id"] == child:
                return int(chi.get("on_time", "0"))
        return 0

    @overrides
    def get_dimmer_level(self) -> Optional[int]:
        """Return the "brightness" value from the sysinfo.

        Returns:
            The brightness as an int (0 if the field is missing), or
            None when the light lacks the "dimmer" keyword or the info
            could not be fetched.
        """
        if not self.has_keyword("dimmer"):
            # Not dimmable: None (not False) to honor Optional[int].
            return None
        self.info = self.get_info()
        if self.info is None:
            return None
        return int(self.info.get("brightness", "0"))

    @overrides
    def set_dimmer_level(self, level: int) -> bool:
        """Send a smartlife.iot.dimmer set_brightness request.

        Returns:
            True if the helper ran successfully; False if it failed or
            the light lacks the "dimmer" keyword.
        """
        if not self.has_keyword("dimmer"):
            return False
        cmd = (
            self.get_cmdline()
            + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
        )
        return tplink_light_command(cmd)


# class GoogleLightGroup(GoogleLight):
#     def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
#         if len(members) < 1:
#             raise Exception("There must be at least one light in the group.")
#         self.members = members
#         mac = GoogleLightGroup.make_up_mac(members)
#         super().__init__(name, mac, keywords)

#     @staticmethod
#     def make_up_mac(members: List[GoogleLight]):
#         mac = members[0].get_mac()
#         b = mac.split(':')
#         b[5] = int(b[5], 16) + 1
#         if b[5] > 255:
#             b[5] = 0
#         b[5] = str(b[5])
#         return ":".join(b)

#     def is_on(self) -> bool:
#         r = ask_google(f"are {self.goog_name()} on?")
#         if not r.success:
#             return False
#         return 'is on' in r.audio_transcription

#     def get_dimmer_level(self) -> Optional[int]:
#         if not self.has_keyword("dimmer"):
#             return False
#         r = ask_google(f'how bright are {self.goog_name()}?')
#         if not r.success:
#             return None

#         # four lights are set to 100% brightness
#         txt = r.audio_transcription
#         m = re.search(r"(\d+)% bright", txt)
#         if m is not None:
#             return int(m.group(1))
#         if "is off" in txt:
#             return 0
#         return None

#     def set_dimmer_level(self, level: int) -> bool:
#         if not self.has_keyword("dimmer"):
#             return False
#         if 0 <= level <= 100:
#             was_on = self.is_on()
#             r = ask_google(f"set {self.goog_name()} to {level} percent")
#             if not r.success:
#                 return False
#             if not was_on:
#                 self.turn_off()
#             return True
#         return False

#     def make_color(self, color: str) -> bool:
#         return GoogleLight.parse_google_response(
#             ask_google(f"make {self.goog_name()} {color}")
#         )