X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=smart_home%2Flights.py;h=ee23fb08f4e60ef4664338cde7e53bdefd340544;hb=532df2c5b57c7517dfb3dddd8c1358fbadf8baf3;hp=6ca6e71db2329a65402116a5cf769048023521f6;hpb=7e6972bc7c8e891dc669645fa5969ed76fe38314;p=python_utils.git diff --git a/smart_home/lights.py b/smart_home/lights.py index 6ca6e71..ee23fb0 100644 --- a/smart_home/lights.py +++ b/smart_home/lights.py @@ -1,80 +1,65 @@ #!/usr/bin/env python3 +# © Copyright 2021-2022, Scott Gasch + """Utilities for dealing with the smart lights.""" -from abc import ABC, abstractmethod import datetime -import json import logging -import os import re -import subprocess -import sys -from typing import Any, Dict, List, Optional, Set +from abc import abstractmethod +from typing import Any, Dict, List, Optional, Tuple import tinytuya as tt +from overrides import overrides +import ansi import argparse_utils +import arper import config -import logging_utils -import logical_search -from google_assistant import ask_google, GoogleResponse -from decorator_utils import timeout, memoized +import smart_home.device as dev +import smart_home.tplink_utils as tplink +from decorator_utils import memoized +from google_assistant import GoogleResponse, ask_google logger = logging.getLogger(__name__) -parser = config.add_commandline_args( - f"Light Utils ({__file__})", - "Args related to light utilities." +args = config.add_commandline_args( + f"Smart Lights ({__file__})", + "Args related to smart lights.", ) -parser.add_argument( - '--light_utils_tplink_location', +args.add_argument( + '--smart_lights_tplink_location', default='/home/scott/bin/tplink.py', metavar='FILENAME', help='The location of the tplink.py helper', type=argparse_utils.valid_filename, ) -parser.add_argument( - '--light_utils_network_mac_addresses_location', - default='/home/scott/bin/network_mac_addresses.txt', - metavar='FILENAME', - help='The location of network_mac_addresses.txt', - type=argparse_utils.valid_filename, -) - -@timeout( - 5.0, use_signals=False, error_message="Timed out waiting for tplink.py" -) -def tplink_light_command(command: str) -> bool: - result = os.system(command) - signal = result & 0xFF - if signal != 0: - logger.warning(f'{command} died with signal {signal}') - logging_utils.hlog("%s died with signal %d" % (command, signal)) - return False - else: - exit_value = result >> 8 - if exit_value != 0: - logger.warning(f'{command} failed, exited {exit_value}') - logging_utils.hlog("%s failed, exit %d" % (command, exit_value)) - return False - logger.debug(f'{command} succeeded.') - return True +class BaseLight(dev.Device): + """A base class representing a smart light.""" -class Light(ABC): def __init__(self, name: str, mac: str, keywords: str = "") -> None: - self.name = name.strip() - self.mac = mac.strip() - self.keywords = keywords.strip() - self.kws = keywords.split() + super().__init__(name.strip(), mac.strip(), keywords) - def get_name(self) -> str: - return self.name + @staticmethod + def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]: + m = re.match( + 'r#?([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])', + color, + ) + if m is not None and len(m.groups()) == 3: + red = int(m.group(0), 16) + green = int(m.group(1), 16) + blue = int(m.group(2), 16) + return (red, green, blue) + color = color.lower() + return ansi.COLOR_NAMES_TO_RGB.get(color, None) - def get_mac(self) -> str: - return self.mac + @abstractmethod + def status(self) -> str: + pass @abstractmethod def turn_on(self) -> bool: @@ -104,19 +89,9 @@ class Light(ABC): def make_color(self, color: str) -> bool: pass - def get_keywords(self) -> List[str]: - return self.kws - def has_keyword(self, keyword: str) -> bool: - for kw in self.kws: - if kw == keyword: - return True - return False - - -class GoogleLight(Light): - def __init__(self, name: str, mac: str, keywords: str = "") -> None: - super().__init__(name, mac, keywords) +class GoogleLight(BaseLight): + """A smart light controlled by talking to Google.""" def goog_name(self) -> str: name = self.get_name() @@ -126,25 +101,34 @@ class GoogleLight(Light): def parse_google_response(response: GoogleResponse) -> bool: return response.success + @overrides def turn_on(self) -> bool: - return GoogleLight.parse_google_response( - ask_google(f"turn {self.goog_name()} on") - ) + return GoogleLight.parse_google_response(ask_google(f"turn {self.goog_name()} on")) + @overrides def turn_off(self) -> bool: - return GoogleLight.parse_google_response( - ask_google(f"turn {self.goog_name()} off") - ) + return GoogleLight.parse_google_response(ask_google(f"turn {self.goog_name()} off")) + + @overrides + def status(self) -> str: + if self.is_on(): + return 'ON' + return 'off' + @overrides def is_on(self) -> bool: r = ask_google(f"is {self.goog_name()} on?") if not r.success: return False - return 'is on' in r.audio_transcription + if r.audio_transcription is not None: + return 'is on' in r.audio_transcription + raise Exception("Can't reach Google?!") + @overrides def is_off(self) -> bool: return not self.is_on() + @overrides def get_dimmer_level(self) -> Optional[int]: if not self.has_keyword("dimmer"): return False @@ -154,13 +138,15 @@ class GoogleLight(Light): # the bookcase one is set to 40% bright txt = r.audio_transcription - m = re.search(r"(\d+)% bright", txt) - if m is not None: - return int(m.group(1)) - if "is off" in txt: - return 0 + if txt is not None: + m = re.search(r"(\d+)% bright", txt) + if m is not None: + return int(m.group(1)) + if "is off" in txt: + return 0 return None + @overrides def set_dimmer_level(self, level: int) -> bool: if not self.has_keyword("dimmer"): return False @@ -174,13 +160,14 @@ class GoogleLight(Light): return True return False + @overrides def make_color(self, color: str) -> bool: - return GoogleLight.parse_google_response( - ask_google(f"make {self.goog_name()} {color}") - ) + return GoogleLight.parse_google_response(ask_google(f"make {self.goog_name()} {color}")) + +class TuyaLight(BaseLight): + """A Tuya smart light.""" -class TuyaLight(Light): ids_by_mac = { '68:C6:3A:DE:1A:94': '8844664268c63ade1a94', '68:C6:3A:DE:27:1A': '8844664268c63ade271a', @@ -199,63 +186,80 @@ class TuyaLight(Light): } def __init__(self, name: str, mac: str, keywords: str = "") -> None: - from subprocess import Popen, PIPE super().__init__(name, mac, keywords) mac = mac.upper() if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac: raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac') self.devid = TuyaLight.ids_by_mac[mac] self.key = TuyaLight.keys_by_mac[mac] - try: - pid = Popen(['maclookup', mac], stdout=PIPE) - ip = pid.communicate()[0] - ip = ip[:-1] - except Exception: - ip = '0.0.0.0' + self.arper = arper.Arper() + ip = self.get_ip() self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key) + def get_status(self) -> Dict[str, Any]: + return self.bulb.status() + + @overrides + def status(self) -> str: + ret = '' + for k, v in self.bulb.status().items(): + ret += f'{k} = {v}\n' + return ret + + @overrides def turn_on(self) -> bool: self.bulb.turn_on() return True + @overrides def turn_off(self) -> bool: self.bulb.turn_off() return True - def get_status(self) -> Dict[str, Any]: - return self.bulb.status() - + @overrides def is_on(self) -> bool: s = self.get_status() return s['dps']['1'] + @overrides def is_off(self) -> bool: return not self.is_on() + @overrides def get_dimmer_level(self) -> Optional[int]: s = self.get_status() return s['dps']['3'] + @overrides def set_dimmer_level(self, level: int) -> bool: + logger.debug('Setting brightness to %d', level) self.bulb.set_brightness(level) return True + @overrides def make_color(self, color: str) -> bool: - self.bulb.set_colour(255,0,0) - return True + rgb = BaseLight.parse_color_string(color) + logger.debug('Light color: %s -> %s', color, rgb) + if rgb is not None: + self.bulb.set_colour(rgb[0], rgb[1], rgb[2]) + return True + return False + +class TPLinkLight(BaseLight): + """A TPLink smart light.""" -class TPLinkLight(Light): def __init__(self, name: str, mac: str, keywords: str = "") -> None: super().__init__(name, mac, keywords) self.children: List[str] = [] self.info: Optional[Dict] = None self.info_ts: Optional[datetime.datetime] = None - if "children" in self.keywords: - self.info = self.get_info() - if self.info is not None: - for child in self.info["children"]: - self.children.append(child["id"]) + if self.keywords is not None: + if "children" in self.keywords: + self.info = self.get_info() + if self.info is not None: + for child in self.info["children"]: + self.children.append(child["id"]) @memoized def get_tplink_name(self) -> Optional[str]: @@ -266,7 +270,7 @@ class TPLinkLight(Light): def get_cmdline(self, child: str = None) -> str: cmd = ( - f"{config.config['light_utils_tplink_location']} -m {self.mac} " + f"{config.config['smart_lights_tplink_location']} -m {self.mac} " f"--no_logging_console " ) if child is not None: @@ -276,48 +280,61 @@ class TPLinkLight(Light): def get_children(self) -> List[str]: return self.children - def command( - self, cmd: str, child: str = None, extra_args: str = None - ) -> bool: + def command(self, cmd: str, child: str = None, extra_args: str = None) -> bool: cmd = self.get_cmdline(child) + f"-c {cmd}" if extra_args is not None: cmd += f" {extra_args}" - logger.debug(f'About to execute {cmd}') - return tplink_light_command(cmd) + logger.debug('About to execute: %s', cmd) + return tplink.tplink_command_wrapper(cmd) - def turn_on(self, child: str = None) -> bool: + @overrides + def turn_on(self) -> bool: + return self.command("on", None) + + @overrides + def turn_off(self) -> bool: + return self.command("off", None) + + def turn_on_child(self, child: str = None) -> bool: return self.command("on", child) - def turn_off(self, child: str = None) -> bool: + def turn_off_child(self, child: str = None) -> bool: return self.command("off", child) + @overrides def is_on(self) -> bool: - return self.get_on_duration_seconds() > 0 + self.info = self.get_info() + if self.info is None: + raise Exception('Unable to get info?') + return self.info.get("relay_state", 0) == 1 + @overrides def is_off(self) -> bool: return not self.is_on() + @overrides def make_color(self, color: str) -> bool: raise NotImplementedError - @timeout( - 10.0, use_signals=False, error_message="Timed out waiting for tplink.py" - ) def get_info(self) -> Optional[Dict]: - cmd = self.get_cmdline() + "-c info" - out = subprocess.getoutput(cmd) - out = re.sub("Sent:.*\n", "", out) - out = re.sub("Received: *", "", out) - try: - self.info = json.loads(out)["system"]["get_sysinfo"] - self.info_ts = datetime.datetime.now() + ip = self.get_ip() + if ip is not None: + self.info = tplink.tplink_get_info(ip) + if self.info is not None: + self.info_ts = datetime.datetime.now() + else: + self.info_ts = None return self.info - except Exception as e: - logger.exception(e) - print(out, file=sys.stderr) - self.info = None - self.info_ts = None - return None + return None + + @overrides + def status(self) -> str: + ret = '' + info = self.get_info() + if info is not None: + for k, v in info: + ret += f'{k} = {v}\n' + return ret def get_on_duration_seconds(self, child: str = None) -> int: self.info = self.get_info() @@ -333,13 +350,7 @@ class TPLinkLight(Light): return int(chi.get("on_time", "0")) return 0 - def get_on_limit_seconds(self) -> Optional[int]: - for kw in self.kws: - m = re.search(r"timeout:(\d+)", kw) - if m is not None: - return int(m.group(1)) * 60 - return None - + @overrides def get_dimmer_level(self) -> Optional[int]: if not self.has_keyword("dimmer"): return False @@ -348,235 +359,71 @@ class TPLinkLight(Light): return None return int(self.info.get("brightness", "0")) + @overrides def set_dimmer_level(self, level: int) -> bool: if not self.has_keyword("dimmer"): return False cmd = ( self.get_cmdline() - + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\'' - ) - return tplink_light_command(cmd) - - -class GoogleLightGroup(GoogleLight): - def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None: - if len(members) < 1: - raise Exception("There must be at least one light in the group.") - self.members = members - mac = GoogleLightGroup.make_up_mac(members) - super().__init__(name, mac, keywords) - - @staticmethod - def make_up_mac(members: List[GoogleLight]): - mac = members[0].get_mac() - b = mac.split(':') - b[5] = int(b[5], 16) + 1 - if b[5] > 255: - b[5] = 0 - b[5] = str(b[5]) - return ":".join(b) - - def is_on(self) -> bool: - r = ask_google(f"are {self.goog_name()} on?") - if not r.success: - return False - return 'is on' in r.audio_transcription - - def get_dimmer_level(self) -> Optional[int]: - if not self.has_keyword("dimmer"): - return False - r = ask_google(f'how bright are {self.goog_name()}?') - if not r.success: - return None - - # four lights are set to 100% brightness - txt = r.audio_transcription - m = re.search(r"(\d+)% bright", txt) - if m is not None: - return int(m.group(1)) - if "is off" in txt: - return 0 - return None - - def set_dimmer_level(self, level: int) -> bool: - if not self.has_keyword("dimmer"): - return False - if 0 <= level <= 100: - was_on = self.is_on() - r = ask_google(f"set {self.goog_name()} to {level} percent") - if not r.success: - return False - if not was_on: - self.turn_off() - return True - return False - - def make_color(self, color: str) -> bool: - return GoogleLight.parse_google_response( - ask_google(f"make {self.goog_name()} {color}") - ) - - -class LightingConfig(object): - """Representation of the smart light device config.""" - - def __init__( - self, - config_file: str = None, - ) -> None: - if config_file is None: - config_file = config.config[ - 'light_utils_network_mac_addresses_location' - ] - self.macs_by_name = {} - self._keywords_by_name = {} - self.keywords_by_mac = {} - self.names_by_mac = {} - self.corpus = logical_search.Corpus() - with open(config_file, "r") as f: - contents = f.readlines() - - diningroom_lights = [] - bookcase_lights = [] - for line in contents: - line = line.rstrip("\n") - line = re.sub(r"#.*$", r"", line) - line = line.strip() - if line == "": - continue - (mac, name, keywords) = line.split(",") - mac = mac.strip() - name = name.strip() - keywords = keywords.strip() - if "perm" not in keywords: - continue - self.macs_by_name[name] = mac - self._keywords_by_name[name] = keywords - self.keywords_by_mac[mac] = keywords - self.names_by_mac[mac] = name - -# if "bookcase_light_" in name: -# bookcase_lights.append(mac) -# elif "diningroom_light_" in name: -# diningroom_lights.append(mac) -# else: - self.index_light(name, keywords, mac) - - # name = 'bookcase_lights' - # group = [] - # keywords = 'perm wifi light smart goog dimmer' - # for b in bookcase_lights: - # group.append(self.get_light_by_mac(b)) - # self.bookcase_group = GoogleLightGroup( - # name, - # group, - # keywords, - # ) - # mac = self.bookcase_group.get_mac() - # self.macs_by_name[name] = mac - # self._keywords_by_name[name] = keywords - # self.keywords_by_mac[mac] = keywords - # self.names_by_mac[mac] = name - # self.index_light(name, keywords, mac) - - # name = 'dining_room_lights' - # group = [] - # for b in diningroom_lights: - # group.append(self.get_light_by_mac(b)) - # self.diningroom_group = GoogleLightGroup( - # name, - # group, - # keywords, - # ) - # mac = self.diningroom_group.get_mac() - # self.macs_by_name[name] = mac - # self._keywords_by_name[name] = keywords - # self.keywords_by_mac[mac] = keywords - # self.names_by_mac[mac] = name - # self.index_light(name, keywords, mac) - - def index_light(self, name: str, keywords: str, mac: str) -> None: - properties = [("name", name)] - tags = set() - for kw in keywords.split(): - if ":" in kw: - key, value = kw.split(":") - properties.append((key, value)) - else: - tags.add(kw) - light = logical_search.Document( - docid=mac, - tags=tags, - properties=properties, - reference=None, + + '-j \'{"smartlife.iot.dimmer":{"set_brightness":{"brightness":%d}}}\'' % level ) - self.corpus.add_doc(light) - - def __repr__(self) -> str: - s = "Known devices:\n" - for name, keywords in self._keywords_by_name.items(): - mac = self.macs_by_name[name] - s += f" {name} ({mac}) => {keywords}\n" - return s - - def get_keywords_by_name(self, name: str) -> Optional[str]: - return self._keywords_by_name.get(name, None) - - def get_macs_by_name(self, name: str) -> Set[str]: - retval = set() - for (mac, lname) in self.names_by_mac.items(): - if name in lname: - retval.add(mac) - return retval - - def get_macs_by_keyword(self, keyword: str) -> Set[str]: - retval = set() - for (mac, keywords) in self.keywords_by_mac.items(): - if keyword in keywords: - retval.add(mac) - return retval - - def get_light_by_name(self, name: str) -> Optional[Light]: - if name in self.macs_by_name: - return self.get_light_by_mac(self.macs_by_name[name]) - return None - - def get_all_lights(self) -> List[Light]: - retval = [] - for (mac, kws) in self.keywords_by_mac.items(): - if mac is not None: - light = self.get_light_by_mac(mac) - if light is not None: - retval.append(light) - return retval - - def get_light_by_mac(self, mac: str) -> Optional[Light]: - if mac in self.keywords_by_mac: - name = self.names_by_mac[mac] - kws = self.keywords_by_mac[mac] - if name == 'bookcase_lights': - return self.bookcase_group - elif name == 'dining_room_lights': - return self.diningroom_group - elif 'tplink' in kws.lower(): - return TPLinkLight(name, mac, kws) - elif 'tuya' in kws.lower(): - return TuyaLight(name, mac, kws) - else: - return GoogleLight(name, mac, kws) - return None - - def query(self, query: str) -> List[Light]: - """Evaluates a lighting query expression formed of keywords to search - for, logical operators (and, or, not), and parenthesis. - Returns a list of matching lights. - """ - retval = [] - results = self.corpus.query(query) - if results is not None: - for mac in results: - if mac is not None: - light = self.get_light_by_mac(mac) - if light is not None: - retval.append(light) - return retval + return tplink.tplink_command_wrapper(cmd) + + +# class GoogleLightGroup(GoogleLight): +# def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None: +# if len(members) < 1: +# raise Exception("There must be at least one light in the group.") +# self.members = members +# mac = GoogleLightGroup.make_up_mac(members) +# super().__init__(name, mac, keywords) + +# @staticmethod +# def make_up_mac(members: List[GoogleLight]): +# mac = members[0].get_mac() +# b = mac.split(':') +# b[5] = int(b[5], 16) + 1 +# if b[5] > 255: +# b[5] = 0 +# b[5] = str(b[5]) +# return ":".join(b) + +# def is_on(self) -> bool: +# r = ask_google(f"are {self.goog_name()} on?") +# if not r.success: +# return False +# return 'is on' in r.audio_transcription + +# def get_dimmer_level(self) -> Optional[int]: +# if not self.has_keyword("dimmer"): +# return False +# r = ask_google(f'how bright are {self.goog_name()}?') +# if not r.success: +# return None + +# # four lights are set to 100% brightness +# txt = r.audio_transcription +# m = re.search(r"(\d+)% bright", txt) +# if m is not None: +# return int(m.group(1)) +# if "is off" in txt: +# return 0 +# return None + +# def set_dimmer_level(self, level: int) -> bool: +# if not self.has_keyword("dimmer"): +# return False +# if 0 <= level <= 100: +# was_on = self.is_on() +# r = ask_google(f"set {self.goog_name()} to {level} percent") +# if not r.success: +# return False +# if not was_on: +# self.turn_off() +# return True +# return False + +# def make_color(self, color: str) -> bool: +# return GoogleLight.parse_google_response( +# ask_google(f"make {self.goog_name()} {color}") +# )