#!/usr/bin/env python3

"""Utilities for dealing with the smart lights."""

from abc import ABC, abstractmethod
import datetime
import json
import logging
import os
import re
import subprocess
import sys
from typing import Dict, List, Optional, Set

import argparse_utils
import config
import logging_utils
from google_assistant import ask_google, GoogleResponse
from decorator_utils import timeout, memoized

logger = logging.getLogger(__name__)

parser = config.add_commandline_args(
    f"Light Utils ({__file__})", "Args related to light utilities."
)
parser.add_argument(
    '--light_utils_tplink_location',
    default='/home/scott/bin/tplink.py',
    metavar='FILENAME',
    help='The location of the tplink.py helper',
    type=argparse_utils.valid_filename,
)
parser.add_argument(
    '--light_utils_network_mac_addresses_location',
    default='/home/scott/bin/network_mac_addresses.txt',
    metavar='FILENAME',
    help='The location of network_mac_addresses.txt',
    type=argparse_utils.valid_filename,
)


@timeout(
    5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
)
def tplink_light_command(command: str) -> bool:
    """Run a tplink.py command line through the shell and report success.

    Args:
        command: the complete shell command line to execute.

    Returns:
        True if the command exited with status zero; False if it was
        killed by a signal or exited non-zero (the failure is reported
        via logging_utils.hlog).
    """
    # NOTE(review): the command is a shell string assembled by callers
    # from config-file values; confirm those values are trusted.
    result = os.system(command)

    # On Unix os.system returns a wait()-style status: the low byte
    # carries the terminating signal, the next byte the exit code.
    signal_number = result & 0xFF
    if signal_number != 0:
        logging_utils.hlog(
            "%s died with signal %d" % (command, signal_number)
        )
        return False

    exit_value = result >> 8
    if exit_value != 0:
        logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
        return False
    return True


class Light(ABC):
    """Abstract interface shared by every kind of smart light."""

    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
        """Store the light's identity.

        Args:
            name: the light's name (surrounding whitespace is stripped).
            mac: the light's MAC address (surrounding whitespace is
                stripped).
            keywords: whitespace-separated keywords describing the light.
        """
        self.name = name.strip()
        self.mac = mac.strip()
        self.keywords = keywords.strip()
        # The same keywords, pre-split for exact-match lookups.
        self.kws = keywords.split()

    def get_name(self) -> str:
        """Return the light's name."""
        return self.name

    def get_mac(self) -> str:
        """Return the light's MAC address."""
        return self.mac

    @abstractmethod
    def turn_on(self) -> bool:
        """Turn the light on; return True on success."""

    @abstractmethod
    def turn_off(self) -> bool:
        """Turn the light off; return True on success."""

    @abstractmethod
    def is_on(self) -> bool:
        """Return True if the light is currently on."""

    @abstractmethod
    def is_off(self) -> bool:
        """Return True if the light is currently off."""

    @abstractmethod
    def get_dimmer_level(self) -> Optional[int]:
        """Return the brightness level, or None if it is unavailable."""

    @abstractmethod
    def set_dimmer_level(self, level: int) -> bool:
        """Set the brightness level; return True on success."""

    @abstractmethod
    def make_color(self, color: str) -> bool:
        """Change the light's color; return True on success."""

    def get_keywords(self) -> List[str]:
        """Return the light's keywords as a list."""
        return self.kws

    def has_keyword(self, keyword: str) -> bool:
        """Return True if keyword exactly matches one of the keywords."""
        return keyword in self.kws


class GoogleLight(Light):
    """A light controlled by sending phrases to the Google assistant."""

    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
        super().__init__(name, mac, keywords)

    def goog_name(self) -> str:
        """Return the light's name with underscores replaced by spaces."""
        return self.get_name().replace("_", " ")

    @staticmethod
    def parse_google_response(response: GoogleResponse) -> bool:
        """Reduce an assistant response to its success flag."""
        return response.success

    def turn_on(self) -> bool:
        """Ask the assistant to turn the light on."""
        return GoogleLight.parse_google_response(
            ask_google(f"turn {self.goog_name()} on")
        )

    def turn_off(self) -> bool:
        """Ask the assistant to turn the light off."""
        return GoogleLight.parse_google_response(
            ask_google(f"turn {self.goog_name()} off")
        )

    def is_on(self) -> bool:
        """Ask the assistant whether the light is on.

        Returns False when the request itself fails.
        """
        r = ask_google(f"is {self.goog_name()} on?")
        if not r.success:
            return False
        return 'is on' in r.audio_transcription

    def is_off(self) -> bool:
        """Return the negation of is_on()."""
        return not self.is_on()

    def get_dimmer_level(self) -> Optional[int]:
        """Ask the assistant how bright the light is.

        Returns:
            The brightness percentage, 0 if the reply says the light is
            off, or None if the light lacks the "dimmer" keyword, the
            request fails, or the reply cannot be parsed.
        """
        if not self.has_keyword("dimmer"):
            # None rather than False: False == 0 would be
            # indistinguishable from a real "brightness 0" answer.
            return None

        r = ask_google(f'how bright is {self.goog_name()}?')
        if not r.success:
            return None

        # the bookcase one is set to 40% bright
        txt = r.audio_transcription
        m = re.search(r"(\d+)% bright", txt)
        if m is not None:
            return int(m.group(1))
        if "is off" in txt:
            return 0
        return None

    def set_dimmer_level(self, level: int) -> bool:
        """Ask the assistant to set the brightness to level percent.

        Returns False if the light lacks the "dimmer" keyword, level is
        outside 0..100, or the request fails.
        """
        if not self.has_keyword("dimmer"):
            return False
        if not 0 <= level <= 100:
            return False

        was_on = self.is_on()
        r = ask_google(f"set {self.goog_name()} to {level} percent")
        if not r.success:
            return False
        # Put the light back to off if it was off before the request.
        if not was_on:
            self.turn_off()
        return True

    def make_color(self, color: str) -> bool:
        """Ask the assistant to change the light to color."""
        return GoogleLight.parse_google_response(
            ask_google(f"make {self.goog_name()} {color}")
        )


class TPLinkLight(Light):
    """A light controlled by shelling out to the tplink.py helper."""

    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
        super().__init__(name, mac, keywords)
        self.children: List[str] = []
        self.info: Optional[Dict] = None
        self.info_ts: Optional[datetime.datetime] = None
        # Devices tagged with "children" report per-child ids in their
        # info; collect them up front.
        if "children" in self.keywords:
            self.info = self.get_info()
            if self.info is not None:
                for child in self.info.get("children", []):
                    self.children.append(child["id"])

    @memoized
    def get_tplink_name(self) -> Optional[str]:
        """Return the "alias" reported by the device, or None."""
        self.info = self.get_info()
        if self.info is not None:
            return self.info.get("alias")
        return None

    def get_cmdline(self, child: Optional[str] = None) -> str:
        """Build the tplink.py command-line prefix for this light.

        Args:
            child: optional child id, passed to the helper via -x.

        Returns:
            The command prefix, ending in a space so that further
            arguments can be appended directly.
        """
        cmd = (
            f"{config.config['light_utils_tplink_location']} -m {self.mac} "
            f"--no_logging_console "
        )
        if child is not None:
            cmd += f"-x {child} "
        return cmd

    def get_children(self) -> List[str]:
        """Return the child ids collected at construction time."""
        return self.children

    def command(
        self,
        cmd: str,
        child: Optional[str] = None,
        extra_args: Optional[str] = None,
    ) -> bool:
        """Run the helper with "-c cmd" and return whether it succeeded.

        Args:
            cmd: the helper command name (e.g. "on" or "off").
            child: optional child id to address.
            extra_args: optional text appended to the command line.
        """
        cmd = self.get_cmdline(child) + f"-c {cmd}"
        if extra_args is not None:
            cmd += f" {extra_args}"
        return tplink_light_command(cmd)

    def turn_on(self, child: Optional[str] = None) -> bool:
        """Turn the light (or the given child) on."""
        return self.command("on", child)

    def turn_off(self, child: Optional[str] = None) -> bool:
        """Turn the light (or the given child) off."""
        return self.command("off", child)

    def is_on(self) -> bool:
        """Return True if the device reports a positive on-duration."""
        return self.get_on_duration_seconds() > 0

    def is_off(self) -> bool:
        """Return the negation of is_on()."""
        return not self.is_on()

    def make_color(self, color: str) -> bool:
        """Not supported for this kind of light.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError

    @timeout(
        10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
    )
    def get_info(self) -> Optional[Dict]:
        """Query the device via "tplink.py -c info" and parse the reply.

        Returns:
            The ["system"]["get_sysinfo"] dict from the helper's JSON
            output, or None if the output cannot be parsed (the raw
            output is then printed to stderr).
        """
        # NOTE(review): callers re-assign the return value to self.info;
        # presumably the timeout wrapper may run this where attribute
        # writes do not persist -- confirm against decorator_utils.
        cmd = self.get_cmdline() + "-c info"
        out = subprocess.getoutput(cmd)
        # Strip the "Sent:" line and "Received:" label around the JSON.
        out = re.sub("Sent:.*\n", "", out)
        out = re.sub("Received: *", "", out)
        try:
            self.info = json.loads(out)["system"]["get_sysinfo"]
            self.info_ts = datetime.datetime.now()
            return self.info
        except (ValueError, KeyError, TypeError) as e:
            # ValueError covers json.JSONDecodeError; KeyError/TypeError
            # cover a reply that parsed but has an unexpected shape.
            logger.exception(e)
            print(out, file=sys.stderr)
            self.info = None
            self.info_ts = None
            return None

    def get_on_duration_seconds(self, child: Optional[str] = None) -> int:
        """Return the "on_time" reported by the device.

        Args:
            child: optional child id; when given, that child's on_time
                is returned instead of the device's own.

        Returns:
            The on_time as an int, or 0 if the info is unavailable, the
            field is missing, or the child is not found.
        """
        self.info = self.get_info()
        if self.info is None:
            return 0
        if child is None:
            return int(self.info.get("on_time", "0"))
        for chi in self.info.get("children", {}):
            if chi["id"] == child:
                return int(chi.get("on_time", "0"))
        return 0

    def get_on_limit_seconds(self) -> Optional[int]:
        """Return the limit from a "timeout:N" keyword as N * 60.

        Returns None when no keyword matches.
        """
        for kw in self.kws:
            m = re.search(r"timeout:(\d+)", kw)
            if m is not None:
                return int(m.group(1)) * 60
        return None

    def get_dimmer_level(self) -> Optional[int]:
        """Return the "brightness" reported by the device.

        Returns:
            The brightness as an int (0 if the field is missing), or
            None if the light lacks the "dimmer" keyword or the info is
            unavailable.
        """
        if not self.has_keyword("dimmer"):
            # None rather than False: False == 0 would be
            # indistinguishable from a real "brightness 0" reading.
            return None
        self.info = self.get_info()
        if self.info is None:
            return None
        return int(self.info.get("brightness", "0"))

    def set_dimmer_level(self, level: int) -> bool:
        """Set the brightness by sending a set_brightness JSON command.

        Returns False if the light lacks the "dimmer" keyword, level is
        outside 0..100, or the helper command fails.
        """
        if not self.has_keyword("dimmer"):
            return False
        # Same bounds as GoogleLight.set_dimmer_level; this also keeps
        # nonsensical values out of the shell command line.
        if not 0 <= level <= 100:
            return False
        cmd = (
            self.get_cmdline()
            + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
        )
        return tplink_light_command(cmd)


class LightingConfig(object):
    """Representation of the smart light device config."""

    def __init__(
        self,
        config_file: Optional[str] = None,
    ) -> None:
        """Load the device config.

        Each non-blank line, after "#" comments are removed, must have
        the form "mac, name, keywords".  Only devices whose keywords
        contain "perm" are kept.

        Args:
            config_file: path of the file to read; defaults to the
                --light_utils_network_mac_addresses_location setting.
        """
        import logical_search

        if config_file is None:
            config_file = config.config[
                'light_utils_network_mac_addresses_location'
            ]
        self.macs_by_name: Dict[str, str] = {}
        self._keywords_by_name: Dict[str, str] = {}
        self.keywords_by_mac: Dict[str, str] = {}
        self.names_by_mac: Dict[str, str] = {}
        self.corpus = logical_search.Corpus()

        with open(config_file, "r") as f:
            contents = f.readlines()

        for line in contents:
            line = line.rstrip("\n")
            line = re.sub(r"#.*$", r"", line)
            line = line.strip()
            if line == "":
                continue

            fields = line.split(",")
            if len(fields) != 3:
                # A malformed line should not take the whole config down.
                logger.warning(
                    "Skipping malformed line in %s: %r", config_file, line
                )
                continue
            mac, name, keywords = (field.strip() for field in fields)
            if "perm" not in keywords:
                continue

            properties = [("name", name)]
            tags: Set[str] = set()
            for kw in keywords.split():
                if ":" in kw:
                    # Split on the first colon only so that a value may
                    # itself contain colons.
                    key, value = kw.split(":", 1)
                    properties.append((key, value))
                else:
                    tags.add(kw)

            self.macs_by_name[name] = mac
            self._keywords_by_name[name] = keywords
            self.keywords_by_mac[mac] = keywords
            self.names_by_mac[mac] = name
            self.corpus.add_doc(
                logical_search.Document(
                    docid=mac,
                    tags=tags,
                    properties=properties,
                    reference=None,
                )
            )

    def __repr__(self) -> str:
        s = "Known devices:\n"
        for name, keywords in self._keywords_by_name.items():
            mac = self.macs_by_name[name]
            s += f"  {name} ({mac}) => {keywords}\n"
        return s

    def get_keywords_by_name(self, name: str) -> Optional[str]:
        """Return the keyword string for the exact name, or None."""
        return self._keywords_by_name.get(name, None)

    def get_macs_by_name(self, name: str) -> Set[str]:
        """Return the MACs of devices whose name contains name."""
        return {
            mac for (mac, lname) in self.names_by_mac.items() if name in lname
        }

    def get_macs_by_keyword(self, keyword: str) -> Set[str]:
        """Return the MACs of devices whose keyword string contains keyword.

        This is a substring test against the whole keyword string, not
        an exact per-keyword match.
        """
        return {
            mac
            for (mac, keywords) in self.keywords_by_mac.items()
            if keyword in keywords
        }

    def get_light_by_name(self, name: str) -> Optional[Light]:
        """Return a Light for the exact device name, or None."""
        if name in self.macs_by_name:
            return self.get_light_by_mac(self.macs_by_name[name])
        return None

    def get_all_lights(self) -> List[Light]:
        """Return a Light for every known device."""
        retval = []
        for mac in self.keywords_by_mac:
            if mac is not None:
                light = self.get_light_by_mac(mac)
                if light is not None:
                    retval.append(light)
        return retval

    def get_light_by_mac(self, mac: str) -> Optional[Light]:
        """Return a Light for the given MAC, or None if it is unknown.

        Devices whose keywords contain "tplink" (case-insensitive)
        become TPLinkLight instances; all others become GoogleLight.
        """
        if mac in self.keywords_by_mac:
            name = self.names_by_mac[mac]
            kws = self.keywords_by_mac[mac]
            if "tplink" in kws.lower():
                return TPLinkLight(name, mac, kws)
            else:
                return GoogleLight(name, mac, kws)
        return None

    def query(self, query: str) -> List[Light]:
        """Evaluates a lighting query expression formed of keywords to search
        for, logical operators (and, or, not), and parenthesis.
        Returns a list of matching lights.
        """
        retval = []
        results = self.corpus.query(query)
        if results is not None:
            for mac in results:
                if mac is not None:
                    light = self.get_light_by_mac(mac)
                    if light is not None:
                        retval.append(light)
        return retval