#!/usr/bin/env python3

"""Utilities for dealing with the smart lights."""

from abc import abstractmethod
import datetime
import json
import logging
import os
import re
import subprocess
import sys
from typing import Any, Dict, List, Optional, Tuple

import tinytuya as tt

import argparse_utils
import config
import logging_utils
import smart_home.device as dev
from google_assistant import ask_google, GoogleResponse
from decorator_utils import timeout, memoized

logger = logging.getLogger(__name__)

parser = config.add_commandline_args(
    f"Smart Lights ({__file__})",
    "Args related to smart lights.",
)
parser.add_argument(
    '--smart_lights_tplink_location',
    default='/home/scott/bin/tplink.py',
    metavar='FILENAME',
    help='The location of the tplink.py helper',
    type=argparse_utils.valid_filename,
)


@timeout(
    5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
)
def tplink_light_command(command: str) -> bool:
    """Run a tplink.py shell command and report whether it succeeded.

    Args:
        command: the complete shell command line to execute.

    Returns:
        True if the command exited with status zero; False if it was
        killed by a signal or exited non-zero (both cases are logged).
    """
    result = os.system(command)

    # os.system hands back a wait()-style status on POSIX: the low byte
    # carries the terminating signal, the next byte the exit code.
    signal = result & 0xFF
    if signal != 0:
        message = f'{command} died with signal {signal}'
        logger.warning(message)
        logging_utils.hlog(message)
        return False

    exit_value = result >> 8
    if exit_value != 0:
        logger.warning(f'{command} failed, exited {exit_value}')
        logging_utils.hlog(f'{command} failed, exit {exit_value}')
        return False

    logger.debug(f'{command} succeeded.')
    return True


class BaseLight(dev.Device):
    """Common interface implemented by every kind of smart light."""

    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
        """Initialize the device with whitespace-stripped name and MAC."""
        super().__init__(name.strip(), mac.strip(), keywords)

    @abstractmethod
    def turn_on(self) -> bool:
        """Turn the light on; return True on success."""
        pass

    @abstractmethod
    def turn_off(self) -> bool:
        """Turn the light off; return True on success."""
        pass

    @abstractmethod
    def is_on(self) -> bool:
        """Return True if the light is currently on."""
        pass

    @abstractmethod
    def is_off(self) -> bool:
        """Return True if the light is currently off."""
        pass

    @abstractmethod
    def get_dimmer_level(self) -> Optional[int]:
        """Return the current brightness, or None if it is unavailable."""
        pass

    @abstractmethod
    def set_dimmer_level(self, level: int) -> bool:
        """Set the brightness to level; return True on success."""
        pass

    @abstractmethod
    def make_color(self, color: str) -> bool:
        """Change the light's color; return True on success."""
        pass


class GoogleLight(BaseLight):
    """A light that is controlled by talking to the Google Assistant."""

    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
        super().__init__(name, mac, keywords)

    def goog_name(self) -> str:
        """Return the device name with underscores replaced by spaces."""
        name = self.get_name()
        return name.replace("_", " ")

    @staticmethod
    def parse_google_response(response: GoogleResponse) -> bool:
        """Reduce a GoogleResponse to its success flag."""
        return response.success

    def turn_on(self) -> bool:
        """Ask Google to turn this light on."""
        return GoogleLight.parse_google_response(
            ask_google(f"turn {self.goog_name()} on")
        )

    def turn_off(self) -> bool:
        """Ask Google to turn this light off."""
        return GoogleLight.parse_google_response(
            ask_google(f"turn {self.goog_name()} off")
        )

    def is_on(self) -> bool:
        """Ask Google whether this light is on.

        Returns False when the request fails or when the transcribed
        answer does not contain 'is on'.
        """
        r = ask_google(f"is {self.goog_name()} on?")
        if not r.success:
            return False
        return 'is on' in r.audio_transcription

    def is_off(self) -> bool:
        """Return the negation of is_on()."""
        return not self.is_on()

    def get_dimmer_level(self) -> Optional[int]:
        """Ask Google how bright this light is.

        Returns:
            The brightness percentage parsed from the answer, 0 if the
            answer says the light is off, or None if the light has no
            "dimmer" keyword, the request failed, or the answer could
            not be parsed.
        """
        if not self.has_keyword("dimmer"):
            # No dimmer means no level to report.  (This previously
            # returned False, which contradicts the Optional[int]
            # contract and is indistinguishable from a level of 0.)
            return None

        r = ask_google(f'how bright is {self.goog_name()}?')
        if not r.success:
            return None

        # the bookcase one is set to 40% bright
        txt = r.audio_transcription
        m = re.search(r"(\d+)% bright", txt)
        if m is not None:
            return int(m.group(1))
        if "is off" in txt:
            return 0
        return None

    def set_dimmer_level(self, level: int) -> bool:
        """Ask Google to set this light's brightness.

        Args:
            level: the brightness percentage; must be within 0..100.

        Returns:
            True if Google reported success; False if the light has no
            "dimmer" keyword, level is out of range, or the request
            failed.
        """
        if not self.has_keyword("dimmer"):
            return False
        if not 0 <= level <= 100:
            return False

        # Remember the prior state; if the light was not on before the
        # brightness request, it is switched back off afterwards.
        was_on = self.is_on()
        r = ask_google(f"set {self.goog_name()} to {level} percent")
        if not r.success:
            return False
        if not was_on:
            self.turn_off()
        return True

    def make_color(self, color: str) -> bool:
        """Ask Google to change this light to the given color."""
        return GoogleLight.parse_google_response(
            ask_google(f"make {self.goog_name()} {color}")
        )


class TuyaLight(BaseLight):
    """A Tuya bulb that is controlled locally via tinytuya."""

    # NOTE(review): device ids and local keys are hard-coded here;
    # consider moving them into configuration.
    ids_by_mac = {
        '68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
        '68:C6:3A:DE:27:1A': '8844664268c63ade271a',
        '68:C6:3A:DE:1D:95': '8844664268c63ade1d95',
        '68:C6:3A:DE:19:B3': '8844664268c63ade19b3',
        '80:7D:3A:77:3B:F5': '07445340807d3a773bf5',
        '80:7D:3A:58:37:02': '07445340807d3a583702',
    }

    keys_by_mac = {
        '68:C6:3A:DE:1A:94': '237f19b1b3d49c36',
        '68:C6:3A:DE:27:1A': '237f19b1b3d49c36',
        '68:C6:3A:DE:1D:95': '04b90fc5cd7625d8',
        '68:C6:3A:DE:19:B3': '2d601f2892f1aefd',
        '80:7D:3A:77:3B:F5': '27ab921fe4633519',
        '80:7D:3A:58:37:02': '8559b5416bfa0c05',
    }

    # Color names understood by make_color, as (red, green, blue).
    _NAMED_COLORS: Dict[str, Tuple[int, int, int]] = {
        'red': (255, 0, 0),
        'green': (0, 255, 0),
        'blue': (0, 0, 255),
        'white': (255, 255, 255),
        'yellow': (255, 255, 0),
        'cyan': (0, 255, 255),
        'magenta': (255, 0, 255),
        'orange': (255, 165, 0),
        'purple': (128, 0, 128),
        'pink': (255, 192, 203),
    }

    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
        """Look up the bulb's id, key and IP address and connect to it.

        Raises:
            Exception: if mac is missing from ids_by_mac or keys_by_mac.
        """
        super().__init__(name, mac, keywords)
        mac = mac.strip().upper()
        if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
            raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
        self.devid = TuyaLight.ids_by_mac[mac]
        self.key = TuyaLight.keys_by_mac[mac]

        # Resolve the MAC to an IP with the external maclookup tool.  Its
        # output is bytes with a trailing newline, so decode and strip it
        # before handing it on as the bulb's address.
        try:
            raw = subprocess.check_output(['maclookup', mac])
            ip = raw.decode('utf-8', errors='replace').strip()
        except (OSError, subprocess.SubprocessError):
            ip = ''
        if not ip:
            # Lookup failed or printed nothing; use the placeholder.
            ip = '0.0.0.0'
        self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)

    def turn_on(self) -> bool:
        """Turn the bulb on; always returns True."""
        self.bulb.turn_on()
        return True

    def turn_off(self) -> bool:
        """Turn the bulb off; always returns True."""
        self.bulb.turn_off()
        return True

    def get_status(self) -> Dict[str, Any]:
        """Return the bulb's status as reported by tinytuya."""
        return self.bulb.status()

    def _get_dps(self) -> Dict[str, Any]:
        """Return the 'dps' mapping from the status, or {} if absent."""
        # NOTE(review): presumably a failed status query yields a dict
        # without 'dps' -- confirm against tinytuya.
        status = self.get_status() or {}
        return status.get('dps') or {}

    def is_on(self) -> bool:
        """Return the bulb's on/off state (dps '1'); False if unknown."""
        return bool(self._get_dps().get('1', False))

    def is_off(self) -> bool:
        """Return the negation of is_on()."""
        return not self.is_on()

    def get_dimmer_level(self) -> Optional[int]:
        """Return the bulb's brightness (dps '3'); None if unknown."""
        return self._get_dps().get('3')

    def set_dimmer_level(self, level: int) -> bool:
        """Pass level to the bulb's set_brightness; always returns True."""
        self.bulb.set_brightness(level)
        return True

    @staticmethod
    def _parse_color(color: str) -> Optional[Tuple[int, int, int]]:
        """Turn a color description into an (r, g, b) tuple.

        Accepts a name from _NAMED_COLORS, six hex digits with an
        optional leading '#', or three comma-separated decimal
        components.  Returns None if the text matches none of these or a
        component exceeds 255.
        """
        text = color.strip().lower()
        if text in TuyaLight._NAMED_COLORS:
            return TuyaLight._NAMED_COLORS[text]

        m = re.fullmatch(r'#?([0-9a-f]{6})', text)
        if m is not None:
            digits = m.group(1)
            return (
                int(digits[0:2], 16),
                int(digits[2:4], 16),
                int(digits[4:6], 16),
            )

        m = re.fullmatch(r'(\d{1,3})\s*,\s*(\d{1,3})\s*,\s*(\d{1,3})', text)
        if m is not None:
            red, green, blue = (int(part) for part in m.groups())
            if max(red, green, blue) <= 255:
                return (red, green, blue)
        return None

    def make_color(self, color: str) -> bool:
        """Set the bulb to the requested color.

        Previously the color argument was ignored and the bulb was always
        set to red.

        Returns:
            True once the color has been sent to the bulb; False if the
            color could not be understood (the bulb is left untouched).
        """
        rgb = TuyaLight._parse_color(color)
        if rgb is None:
            logger.warning(f'Unrecognized color {color!r}; not changing the bulb.')
            return False
        self.bulb.set_colour(*rgb)
        return True


class TPLinkLight(BaseLight):
    """A TP-Link light that is controlled via the tplink.py helper."""

    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
        """Initialize and, for a "children" device, collect child ids."""
        super().__init__(name, mac, keywords)
        self.children: List[str] = []
        self.info: Optional[Dict] = None
        self.info_ts: Optional[datetime.datetime] = None
        if "children" in self.keywords:
            self.info = self.get_info()
            if self.info is not None:
                # Tolerate sysinfo that lacks a "children" entry.
                self.children = [
                    child["id"] for child in self.info.get("children", [])
                ]

    @memoized
    def get_tplink_name(self) -> Optional[str]:
        """Return the device's "alias" from sysinfo, or None on failure."""
        self.info = self.get_info()
        if self.info is not None:
            return self.info["alias"]
        return None

    def get_cmdline(self, child: Optional[str] = None) -> str:
        """Build the tplink.py command prefix for this device.

        Args:
            child: optional child id, passed to the helper via -x.

        Returns:
            The command prefix, ending in a space so that further
            arguments can be appended directly.
        """
        cmd = (
            f"{config.config['smart_lights_tplink_location']} -m {self.mac} "
            f"--no_logging_console "
        )
        if child is not None:
            cmd += f"-x {child} "
        return cmd

    def get_children(self) -> List[str]:
        """Return the child ids collected at construction time."""
        return self.children

    def command(
        self,
        cmd: str,
        child: Optional[str] = None,
        extra_args: Optional[str] = None,
    ) -> bool:
        """Run a named tplink.py command (-c) against this device.

        Args:
            cmd: the command name handed to the helper's -c flag.
            child: optional child id to target.
            extra_args: optional text appended to the command line.

        Returns:
            True if the helper ran successfully.
        """
        cmd = self.get_cmdline(child) + f"-c {cmd}"
        if extra_args is not None:
            cmd += f" {extra_args}"
        logger.debug(f'About to execute {cmd}')
        return tplink_light_command(cmd)

    def turn_on(self, child: Optional[str] = None) -> bool:
        """Turn the light (or the given child) on."""
        return self.command("on", child)

    def turn_off(self, child: Optional[str] = None) -> bool:
        """Turn the light (or the given child) off."""
        return self.command("off", child)

    def is_on(self) -> bool:
        """Return True if the reported on-duration is greater than zero."""
        return self.get_on_duration_seconds() > 0

    def is_off(self) -> bool:
        """Return the negation of is_on()."""
        return not self.is_on()

    def make_color(self, color: str) -> bool:
        """Not supported for this kind of light.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError

    @timeout(
        10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
    )
    def get_info(self) -> Optional[Dict]:
        """Fetch the device's sysinfo via 'tplink.py -c info'.

        On success the result is also cached in self.info and
        self.info_ts is set to the current time.

        Returns:
            The ["system"]["get_sysinfo"] dict from the helper's JSON
            output, or None if the output could not be parsed (in which
            case the error is logged, the raw output is written to
            stderr and the cached info is cleared).
        """
        cmd = self.get_cmdline() + "-c info"
        out = subprocess.getoutput(cmd)

        # Drop the helper's "Sent:" line and "Received:" prefix so that
        # only the JSON payload remains.
        out = re.sub("Sent:.*\n", "", out)
        out = re.sub("Received: *", "", out)
        try:
            self.info = json.loads(out)["system"]["get_sysinfo"]
            self.info_ts = datetime.datetime.now()
            return self.info
        except Exception as e:
            logger.exception(e)
            print(out, file=sys.stderr)
            self.info = None
            self.info_ts = None
            return None

    def get_on_duration_seconds(self, child: Optional[str] = None) -> int:
        """Return the "on_time" reported for the device or one child.

        Args:
            child: optional child id; when given, that child's on_time
                is returned instead of the device's own.

        Returns:
            The on_time as an int, or 0 if sysinfo is unavailable, the
            field is missing, or no child has the given id.
        """
        self.info = self.get_info()
        if self.info is None:
            return 0
        if child is None:
            return int(self.info.get("on_time", "0"))
        for chi in self.info.get("children", []):
            if chi["id"] == child:
                return int(chi.get("on_time", "0"))
        return 0

    def get_on_limit_seconds(self) -> Optional[int]:
        """Return the on-time limit from a "timeout:N" keyword.

        Returns:
            N * 60 for the first keyword matching timeout:N, or None if
            no keyword matches.
        """
        for kw in self.kws:
            m = re.search(r"timeout:(\d+)", kw)
            if m is not None:
                return int(m.group(1)) * 60
        return None

    def get_dimmer_level(self) -> Optional[int]:
        """Return the "brightness" reported in the device's sysinfo.

        Returns:
            The brightness as an int (0 if the field is missing), or
            None if the light has no "dimmer" keyword or sysinfo is
            unavailable.
        """
        if not self.has_keyword("dimmer"):
            # No dimmer means no level to report.  (This previously
            # returned False, which contradicts the Optional[int]
            # contract and is indistinguishable from a level of 0.)
            return None
        self.info = self.get_info()
        if self.info is None:
            return None
        return int(self.info.get("brightness", "0"))

    def set_dimmer_level(self, level: int) -> bool:
        """Send a set_brightness request for level via tplink.py -j.

        Returns:
            True if the helper ran successfully; False if it failed or
            the light has no "dimmer" keyword.
        """
        if not self.has_keyword("dimmer"):
            return False
        cmd = (
            self.get_cmdline()
            + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
        )
        return tplink_light_command(cmd)


# class GoogleLightGroup(GoogleLight):
#     def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
#         if len(members) < 1:
#             raise Exception("There must be at least one light in the group.")
#         self.members = members
#         mac = GoogleLightGroup.make_up_mac(members)
#         super().__init__(name, mac, keywords)

#     @staticmethod
#     def make_up_mac(members: List[GoogleLight]):
#         mac = members[0].get_mac()
#         b = mac.split(':')
#         b[5] = int(b[5], 16) + 1
#         if b[5] > 255:
#             b[5] = 0
#         b[5] = str(b[5])
#         return ":".join(b)

#     def is_on(self) -> bool:
#         r = ask_google(f"are {self.goog_name()} on?")
#         if not r.success:
#             return False
#         return 'is on' in r.audio_transcription

#     def get_dimmer_level(self) -> Optional[int]:
#         if not self.has_keyword("dimmer"):
#             return False
#         r = ask_google(f'how bright are {self.goog_name()}?')
#         if not r.success:
#             return None

#         # four lights are set to 100% brightness
#         txt = r.audio_transcription
#         m = re.search(r"(\d+)% bright", txt)
#         if m is not None:
#             return int(m.group(1))
#         if "is off" in txt:
#             return 0
#         return None

#     def set_dimmer_level(self, level: int) -> bool:
#         if not self.has_keyword("dimmer"):
#             return False
#         if 0 <= level <= 100:
#             was_on = self.is_on()
#             r = ask_google(f"set {self.goog_name()} to {level} percent")
#             if not r.success:
#                 return False
#             if not was_on:
#                 self.turn_off()
#             return True
#         return False

#     def make_color(self, color: str) -> bool:
#         return GoogleLight.parse_google_response(
#             ask_google(f"make {self.goog_name()} {color}")
#         )