#!/usr/bin/env python3

"""Utilities for dealing with the smart lights."""

from abc import ABC, abstractmethod
import datetime
import json
import logging
import os
import re
import subprocess
import sys
from typing import Any, Dict, List, Optional, Set

import tinytuya as tt

import argparse_utils
import config
import logging_utils
import logical_search
from google_assistant import ask_google, GoogleResponse
from decorator_utils import timeout, memoized

logger = logging.getLogger(__name__)

parser = config.add_commandline_args(
    f"Light Utils ({__file__})", "Args related to light utilities."
)
parser.add_argument(
    '--light_utils_tplink_location',
    default='/home/scott/bin/tplink.py',
    metavar='FILENAME',
    help='The location of the tplink.py helper',
    type=argparse_utils.valid_filename,
)
parser.add_argument(
    '--light_utils_network_mac_addresses_location',
    default='/home/scott/bin/network_mac_addresses.txt',
    metavar='FILENAME',
    help='The location of network_mac_addresses.txt',
    type=argparse_utils.valid_filename,
)


@timeout(
    5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
)
def tplink_light_command(command: str) -> bool:
    """Run a shell command line and report whether it succeeded.

    Args:
        command: the complete command line; it is handed to os.system().

    Returns:
        True if the command exited with status zero; False if it was
        killed by a signal or exited with a non-zero status.
    """
    result = os.system(command)

    # The low byte of the os.system() status is treated as the signal
    # number and the next byte up as the exit code.
    signum = result & 0xFF
    if signum != 0:
        logger.warning('%s died with signal %d', command, signum)
        logging_utils.hlog("%s died with signal %d" % (command, signum))
        return False

    exit_value = result >> 8
    if exit_value != 0:
        logger.warning('%s failed, exited %d', command, exit_value)
        logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
        return False

    logger.debug('%s succeeded.', command)
    return True


class Light(ABC):
    """Abstract interface shared by every kind of light in this module."""

    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
        """Remember the light's identity.

        Args:
            name: the light's name (surrounding whitespace is stripped).
            mac: the light's MAC address (surrounding whitespace is
                stripped).
            keywords: a whitespace-separated string of keywords.
        """
        self.name = name.strip()
        self.mac = mac.strip()
        self.keywords = keywords.strip()
        self.kws = keywords.split()

    def get_name(self) -> str:
        """Return the light's name."""
        return self.name

    def get_mac(self) -> str:
        """Return the light's MAC address."""
        return self.mac

    @abstractmethod
    def turn_on(self) -> bool:
        pass

    @abstractmethod
    def turn_off(self) -> bool:
        pass

    @abstractmethod
    def is_on(self) -> bool:
        pass

    @abstractmethod
    def is_off(self) -> bool:
        pass

    @abstractmethod
    def get_dimmer_level(self) -> Optional[int]:
        pass

    @abstractmethod
    def set_dimmer_level(self, level: int) -> bool:
        pass

    @abstractmethod
    def make_color(self, color: str) -> bool:
        pass

    def get_keywords(self) -> List[str]:
        """Return the light's keywords as a list."""
        return self.kws

    def has_keyword(self, keyword: str) -> bool:
        """Return True iff keyword exactly equals one of this light's
        keywords."""
        return keyword in self.kws


class GoogleLight(Light):
    """A light that is controlled by sending phrases to ask_google()."""

    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
        super().__init__(name, mac, keywords)

    def goog_name(self) -> str:
        """Return the light's name with underscores replaced by spaces."""
        return self.get_name().replace("_", " ")

    @staticmethod
    def parse_google_response(response: GoogleResponse) -> bool:
        """Return the success flag of a GoogleResponse."""
        return response.success

    def turn_on(self) -> bool:
        return GoogleLight.parse_google_response(
            ask_google(f"turn {self.goog_name()} on")
        )

    def turn_off(self) -> bool:
        return GoogleLight.parse_google_response(
            ask_google(f"turn {self.goog_name()} off")
        )

    def is_on(self) -> bool:
        """Return True iff the query succeeded and the transcribed reply
        contains 'is on'."""
        r = ask_google(f"is {self.goog_name()} on?")
        if not r.success:
            return False
        return 'is on' in r.audio_transcription

    def is_off(self) -> bool:
        return not self.is_on()

    def get_dimmer_level(self) -> Optional[int]:
        """Return the brightness percentage parsed from the reply.

        Returns:
            The parsed percentage; 0 if the reply contains 'is off';
            None if the query failed or the reply could not be parsed.
        """
        if not self.has_keyword("dimmer"):
            # NOTE(review): this returns False (== 0) rather than None
            # for lights without the "dimmer" keyword.  Left as-is
            # because callers may rely on it; confirm before changing.
            return False
        r = ask_google(f'how bright is {self.goog_name()}?')
        if not r.success:
            return None

        # Example reply: "the bookcase one is set to 40% bright"
        txt = r.audio_transcription
        m = re.search(r"(\d+)% bright", txt)
        if m is not None:
            return int(m.group(1))
        if "is off" in txt:
            return 0
        return None

    def set_dimmer_level(self, level: int) -> bool:
        """Set the brightness to level percent.

        Returns:
            False if the light lacks the "dimmer" keyword, level is
            outside 0..100, or the request failed; True otherwise.
        """
        if not self.has_keyword("dimmer"):
            return False
        if not 0 <= level <= 100:
            return False

        was_on = self.is_on()
        r = ask_google(f"set {self.goog_name()} to {level} percent")
        if not r.success:
            return False
        # If the light was not on beforehand, switch it back off after
        # the brightness request.
        if not was_on:
            self.turn_off()
        return True

    def make_color(self, color: str) -> bool:
        return GoogleLight.parse_google_response(
            ask_google(f"make {self.goog_name()} {color}")
        )


class TuyaLight(Light):
    """A light controlled through a tinytuya BulbDevice."""

    # NOTE(review): device ids and local keys are hard-coded in source;
    # consider moving them to private configuration.
    ids_by_mac = {
        '68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
        '68:C6:3A:DE:27:1A': '8844664268c63ade271a',
        '68:C6:3A:DE:1D:95': '8844664268c63ade1d95',
        '68:C6:3A:DE:19:B3': '8844664268c63ade19b3',
        '80:7D:3A:77:3B:F5': '07445340807d3a773bf5',
        '80:7D:3A:58:37:02': '07445340807d3a583702',
    }

    keys_by_mac = {
        '68:C6:3A:DE:1A:94': '237f19b1b3d49c36',
        '68:C6:3A:DE:27:1A': '237f19b1b3d49c36',
        '68:C6:3A:DE:1D:95': '04b90fc5cd7625d8',
        '68:C6:3A:DE:19:B3': '2d601f2892f1aefd',
        '80:7D:3A:77:3B:F5': '27ab921fe4633519',
        '80:7D:3A:58:37:02': '8559b5416bfa0c05',
    }

    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
        """Look up the device id, key and IP address and build the bulb.

        Raises:
            Exception: if mac is missing from ids_by_mac or keys_by_mac.
        """
        super().__init__(name, mac, keywords)

        # Use the stripped MAC so stray whitespace cannot defeat the
        # table lookups below.
        mac = self.mac.upper()
        if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
            raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
        self.devid = TuyaLight.ids_by_mac[mac]
        self.key = TuyaLight.keys_by_mac[mac]

        # Best effort: run the external `maclookup` command and use its
        # output as the address.  The output is decoded to str and
        # stripped so that the address is not passed along as bytes.
        try:
            lookup = subprocess.run(
                ['maclookup', mac], stdout=subprocess.PIPE, check=False
            )
            ip = lookup.stdout.decode('utf-8', errors='replace').strip()
        except Exception as e:
            logger.warning('maclookup failed for %s: %s', mac, e)
            ip = ''
        if not ip:
            ip = '0.0.0.0'
        self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)

    def turn_on(self) -> bool:
        self.bulb.turn_on()
        return True

    def turn_off(self) -> bool:
        self.bulb.turn_off()
        return True

    def get_status(self) -> Dict[str, Any]:
        """Return whatever the bulb's status() call returns."""
        return self.bulb.status()

    def is_on(self) -> bool:
        # Reads entry '1' of the status 'dps' mapping.
        s = self.get_status()
        return s['dps']['1']

    def is_off(self) -> bool:
        return not self.is_on()

    def get_dimmer_level(self) -> Optional[int]:
        # Reads entry '3' of the status 'dps' mapping.
        s = self.get_status()
        return s['dps']['3']

    def set_dimmer_level(self, level: int) -> bool:
        self.bulb.set_brightness(level)
        return True

    def make_color(self, color: str) -> bool:
        # NOTE(review): the color argument is ignored; the bulb is
        # always set to (255, 0, 0).  TODO: map color names to RGB.
        self.bulb.set_colour(255, 0, 0)
        return True


class TPLinkLight(Light):
    """A light controlled by shelling out to the tplink.py helper."""

    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
        """Set up the light; if the keyword string contains "children",
        fetch the device info and record the id of each child."""
        super().__init__(name, mac, keywords)
        self.children: List[str] = []
        self.info: Optional[Dict] = None
        self.info_ts: Optional[datetime.datetime] = None
        if "children" in self.keywords:
            self.info = self.get_info()
            if self.info is not None:
                for child in self.info["children"]:
                    self.children.append(child["id"])

    @memoized
    def get_tplink_name(self) -> Optional[str]:
        """Return the "alias" field of the device info, or None if the
        info could not be fetched."""
        self.info = self.get_info()
        if self.info is not None:
            return self.info["alias"]
        return None

    def get_cmdline(self, child: Optional[str] = None) -> str:
        """Return the helper command-line prefix for this light.

        Args:
            child: optional child id; when given, "-x <child>" is added.
        """
        cmd = (
            f"{config.config['light_utils_tplink_location']} -m {self.mac} "
            f"--no_logging_console "
        )
        if child is not None:
            cmd += f"-x {child} "
        return cmd

    def get_children(self) -> List[str]:
        """Return the child ids collected by the constructor."""
        return self.children

    def command(
        self,
        cmd: str,
        child: Optional[str] = None,
        extra_args: Optional[str] = None,
    ) -> bool:
        """Run the helper with "-c <cmd>" and report success.

        Args:
            cmd: the command name passed after -c.
            child: optional child id to address.
            extra_args: optional text appended to the command line.
        """
        cmd = self.get_cmdline(child) + f"-c {cmd}"
        if extra_args is not None:
            cmd += f" {extra_args}"
        logger.debug('About to execute %s', cmd)
        return tplink_light_command(cmd)

    def turn_on(self, child: Optional[str] = None) -> bool:
        return self.command("on", child)

    def turn_off(self, child: Optional[str] = None) -> bool:
        return self.command("off", child)

    def is_on(self) -> bool:
        """Return True iff the reported on-duration is greater than zero."""
        return self.get_on_duration_seconds() > 0

    def is_off(self) -> bool:
        return not self.is_on()

    def make_color(self, color: str) -> bool:
        raise NotImplementedError

    @timeout(
        10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
    )
    def get_info(self) -> Optional[Dict]:
        """Run the helper's "info" command and parse its JSON output.

        Returns:
            The ["system"]["get_sysinfo"] part of the parsed output, or
            None if the output could not be parsed.  self.info and
            self.info_ts are updated either way.
        """
        cmd = self.get_cmdline() + "-c info"
        out = subprocess.getoutput(cmd)
        # Remove the "Sent:" line and the "Received:" prefix so that
        # what remains can be parsed as JSON.
        out = re.sub("Sent:.*\n", "", out)
        out = re.sub("Received: *", "", out)
        try:
            self.info = json.loads(out)["system"]["get_sysinfo"]
            self.info_ts = datetime.datetime.now()
            return self.info
        except Exception as e:
            logger.exception(e)
            print(out, file=sys.stderr)
            self.info = None
            self.info_ts = None
            return None

    def get_on_duration_seconds(self, child: Optional[str] = None) -> int:
        """Return the "on_time" value from freshly fetched device info.

        Args:
            child: optional child id; when given, that child's
                "on_time" is returned instead of the device's.

        Returns:
            The on_time as an int, or 0 if the info is unavailable or
            the child is not found.
        """
        self.info = self.get_info()
        if self.info is None:
            return 0
        if child is None:
            return int(self.info.get("on_time", "0"))
        for chi in self.info.get("children", {}):
            if chi["id"] == child:
                return int(chi.get("on_time", "0"))
        return 0

    def get_on_limit_seconds(self) -> Optional[int]:
        """Return 60 times the N of the first "timeout:N" keyword, or
        None if there is no such keyword."""
        for kw in self.kws:
            m = re.search(r"timeout:(\d+)", kw)
            if m is not None:
                return int(m.group(1)) * 60
        return None

    def get_dimmer_level(self) -> Optional[int]:
        """Return the "brightness" field of the device info (0 if the
        field is absent), or None if the info is unavailable."""
        if not self.has_keyword("dimmer"):
            # NOTE(review): this returns False (== 0) rather than None
            # for lights without the "dimmer" keyword.  Left as-is
            # because callers may rely on it; confirm before changing.
            return False
        self.info = self.get_info()
        if self.info is None:
            return None
        return int(self.info.get("brightness", "0"))

    def set_dimmer_level(self, level: int) -> bool:
        """Send a set_brightness request through the helper.

        Returns:
            False if the light lacks the "dimmer" keyword; otherwise
            whether the helper command succeeded.
        """
        if not self.has_keyword("dimmer"):
            return False
        cmd = (
            self.get_cmdline()
            + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
        )
        return tplink_light_command(cmd)


class GoogleLightGroup(GoogleLight):
    """Several GoogleLights addressed together under one name.

    set_dimmer_level() and make_color() are inherited from GoogleLight
    unchanged; only the status queries are phrased differently here.
    """

    def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
        """Create the group.

        Raises:
            Exception: if members is empty.
        """
        if len(members) < 1:
            raise Exception("There must be at least one light in the group.")
        self.members = members
        mac = GoogleLightGroup.make_up_mac(members)
        super().__init__(name, mac, keywords)

    @staticmethod
    def make_up_mac(members: List[GoogleLight]) -> str:
        """Invent a MAC for the group from the first member's MAC.

        The last octet of the first member's MAC is incremented by one,
        wrapping from 0xff to 0x00, and written back as two hex digits.
        """
        mac = members[0].get_mac()
        octets = mac.split(':')
        bumped = (int(octets[5], 16) + 1) % 256
        # Write the octet back as hex (not decimal), matching the
        # letter case of the source MAC.
        hex_format = "02X" if mac == mac.upper() else "02x"
        octets[5] = format(bumped, hex_format)
        return ":".join(octets)

    def is_on(self) -> bool:
        """Return True iff the query succeeded and the transcribed reply
        contains 'is on'."""
        r = ask_google(f"are {self.goog_name()} on?")
        if not r.success:
            return False
        return 'is on' in r.audio_transcription

    def get_dimmer_level(self) -> Optional[int]:
        """Return the brightness percentage parsed from the reply.

        Returns:
            The parsed percentage; 0 if the reply contains 'is off';
            None if the query failed or the reply could not be parsed.
        """
        if not self.has_keyword("dimmer"):
            # NOTE(review): this returns False (== 0) rather than None
            # for groups without the "dimmer" keyword.  Left as-is
            # because callers may rely on it; confirm before changing.
            return False
        r = ask_google(f'how bright are {self.goog_name()}?')
        if not r.success:
            return None

        # Example reply: "four lights are set to 100% brightness"
        txt = r.audio_transcription
        m = re.search(r"(\d+)% bright", txt)
        if m is not None:
            return int(m.group(1))
        if "is off" in txt:
            return 0
        return None


class LightingConfig(object):
    """Representation of the smart light device config."""

    def __init__(
        self,
        config_file: Optional[str] = None,
    ) -> None:
        """Load the device list and index every entry.

        Each non-blank line of the file (after '#' comments are
        removed) is "mac, name, keywords".  Only entries whose keywords
        contain "perm" are kept.

        Args:
            config_file: path of the file to read; defaults to the
                light_utils_network_mac_addresses_location setting.
        """
        if config_file is None:
            config_file = config.config[
                'light_utils_network_mac_addresses_location'
            ]
        self.macs_by_name: Dict[str, str] = {}
        self._keywords_by_name: Dict[str, str] = {}
        self.keywords_by_mac: Dict[str, str] = {}
        self.names_by_mac: Dict[str, str] = {}
        self.corpus = logical_search.Corpus()

        # get_light_by_mac() returns these for the group names.  The
        # code that would build them is disabled (see the end of this
        # method), so give them a value to avoid an AttributeError.
        self.bookcase_group: Optional[GoogleLightGroup] = None
        self.diningroom_group: Optional[GoogleLightGroup] = None

        with open(config_file, "r") as f:
            for raw_line in f:
                line = raw_line.rstrip("\n")
                line = line.partition("#")[0].strip()
                if line == "":
                    continue

                fields = line.split(",")
                if len(fields) != 3:
                    # Skip a malformed entry rather than abort the load.
                    logger.warning(
                        'Skipping malformed line in %s: %s', config_file, line
                    )
                    continue
                mac, name, keywords = (field.strip() for field in fields)
                if "perm" not in keywords:
                    continue

                self.macs_by_name[name] = mac
                self._keywords_by_name[name] = keywords
                self.keywords_by_mac[mac] = keywords
                self.names_by_mac[mac] = name

                # if "bookcase_light_" in name:
                #     bookcase_lights.append(mac)
                # elif "diningroom_light_" in name:
                #     diningroom_lights.append(mac)
                # else:
                self.index_light(name, keywords, mac)

        # NOTE: light-group support is currently disabled.  The code is
        # kept for reference; it also needs two lists, diningroom_lights
        # and bookcase_lights, created empty before the loop above.
        #
        # name = 'bookcase_lights'
        # group = []
        # keywords = 'perm wifi light smart goog dimmer'
        # for b in bookcase_lights:
        #     group.append(self.get_light_by_mac(b))
        # self.bookcase_group = GoogleLightGroup(
        #     name,
        #     group,
        #     keywords,
        # )
        # mac = self.bookcase_group.get_mac()
        # self.macs_by_name[name] = mac
        # self._keywords_by_name[name] = keywords
        # self.keywords_by_mac[mac] = keywords
        # self.names_by_mac[mac] = name
        # self.index_light(name, keywords, mac)
        #
        # name = 'dining_room_lights'
        # group = []
        # for b in diningroom_lights:
        #     group.append(self.get_light_by_mac(b))
        # self.diningroom_group = GoogleLightGroup(
        #     name,
        #     group,
        #     keywords,
        # )
        # mac = self.diningroom_group.get_mac()
        # self.macs_by_name[name] = mac
        # self._keywords_by_name[name] = keywords
        # self.keywords_by_mac[mac] = keywords
        # self.names_by_mac[mac] = name
        # self.index_light(name, keywords, mac)

    def index_light(self, name: str, keywords: str, mac: str) -> None:
        """Add one light to the search corpus.

        Keywords containing ':' become (key, value) properties, split at
        the first colon; all other keywords become tags.  The name is
        stored as the "name" property and the MAC is the document id.
        """
        properties = [("name", name)]
        tags = set()
        for kw in keywords.split():
            if ":" in kw:
                key, _, value = kw.partition(":")
                properties.append((key, value))
            else:
                tags.add(kw)
        light = logical_search.Document(
            docid=mac,
            tags=tags,
            properties=properties,
            reference=None,
        )
        self.corpus.add_doc(light)

    def __repr__(self) -> str:
        lines = ["Known devices:\n"]
        for name, keywords in self._keywords_by_name.items():
            mac = self.macs_by_name[name]
            lines.append(f" {name} ({mac}) => {keywords}\n")
        return "".join(lines)

    def get_keywords_by_name(self, name: str) -> Optional[str]:
        """Return the keyword string of the light called name, or None."""
        return self._keywords_by_name.get(name, None)

    def get_macs_by_name(self, name: str) -> Set[str]:
        """Return the MACs of all lights whose name contains name."""
        return {
            mac for (mac, lname) in self.names_by_mac.items() if name in lname
        }

    def get_macs_by_keyword(self, keyword: str) -> Set[str]:
        """Return the MACs of all lights whose keyword string contains
        keyword (a substring match)."""
        return {
            mac
            for (mac, keywords) in self.keywords_by_mac.items()
            if keyword in keywords
        }

    def get_light_by_name(self, name: str) -> Optional[Light]:
        """Return the light whose name is exactly name, or None."""
        if name in self.macs_by_name:
            return self.get_light_by_mac(self.macs_by_name[name])
        return None

    def get_all_lights(self) -> List[Light]:
        """Return a Light object for every known MAC."""
        retval = []
        for mac in self.keywords_by_mac:
            if mac is not None:
                light = self.get_light_by_mac(mac)
                if light is not None:
                    retval.append(light)
        return retval

    def get_light_by_mac(self, mac: str) -> Optional[Light]:
        """Build the Light object for a MAC.

        The two group names return the stored group objects.  Otherwise
        the class is chosen from the keyword string: TPLinkLight if it
        contains "tplink", TuyaLight if it contains "tuya", else
        GoogleLight.

        Returns:
            The light, or None if the MAC is unknown (or the group has
            not been built).
        """
        if mac not in self.keywords_by_mac:
            return None

        name = self.names_by_mac[mac]
        kws = self.keywords_by_mac[mac]
        if name == 'bookcase_lights':
            return self.bookcase_group
        if name == 'dining_room_lights':
            return self.diningroom_group

        lowered = kws.lower()
        if 'tplink' in lowered:
            return TPLinkLight(name, mac, kws)
        if 'tuya' in lowered:
            return TuyaLight(name, mac, kws)
        return GoogleLight(name, mac, kws)

    def query(self, query: str) -> List[Light]:
        """Evaluates a lighting query expression formed of keywords to search
        for, logical operators (and, or, not), and parentheses.
        Returns a list of matching lights.
        """
        retval = []
        results = self.corpus.query(query)
        if results is not None:
            for mac in results:
                if mac is not None:
                    light = self.get_light_by_mac(mac)
                    if light is not None:
                        retval.append(light)
        return retval