From: Scott Gasch Date: Fri, 29 Oct 2021 03:50:36 +0000 (-0700) Subject: Smart outlets X-Git-Url: https://wannabe.guru.org/gitweb/?a=commitdiff_plain;h=2a9cbfa6e97a8cb5ed68c838f5ec09bef654c37f;p=python_utils.git Smart outlets --- diff --git a/smart_home/config.py b/smart_home/config.py new file mode 100644 index 0000000..723e097 --- /dev/null +++ b/smart_home/config.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python3 + +import re +from typing import List, Optional, Set + +import argparse_utils +import config +import file_utils +import logical_search +import smart_home.device as device +import smart_home.lights as lights +import smart_home.outlets as outlets + +parser = config.add_commandline_args( + f"Smart Home Config ({__file__})", + "Args related to the smart home config." +) +parser.add_argument( + '--smart_home_config_file_location', + default='/home/scott/bin/network_mac_addresses.txt', + metavar='FILENAME', + help='The location of network_mac_addresses.txt', + type=argparse_utils.valid_filename, +) + + +class SmartHomeConfig(object): + def __init__( + self, + config_file: Optional[str] = None, + filters: List[str] = ['smart'], + ) -> None: + if config_file is None: + config_file = config.config[ + 'smart_home_config_file_location' + ] + assert file_utils.does_file_exist(config_file) + with open(config_file, "r") as f: + contents = f.readlines() + + self._macs_by_name = {} + self._keywords_by_name = {} + self._keywords_by_mac = {} + self._names_by_mac = {} + self._corpus = logical_search.Corpus() + + for line in contents: + line = line.rstrip("\n") + line = re.sub(r"#.*$", r"", line) + line = line.strip() + if line == "": + continue + (mac, name, keywords) = line.split(",") + mac = mac.strip() + name = name.strip() + keywords = keywords.strip() + + if filters is not None: + for f in filters: + if not f in keywords: + continue + + self._macs_by_name[name] = mac + self._keywords_by_name[name] = keywords + self._keywords_by_mac[mac] = keywords + self._names_by_mac[mac] = name + self.index_device(name, keywords, mac) + + def index_device(self, name: str, keywords: str, mac: str) -> None: + properties = [("name", name)] + tags = set() + for kw in keywords.split(): + if ":" in kw: + key, value = kw.split(":") + properties.append((key, value)) + else: + tags.add(kw) + device = logical_search.Document( + docid=mac, + tags=tags, + properties=properties, + reference=None, + ) + self._corpus.add_doc(device) + + def __repr__(self) -> str: + s = "Known devices:\n" + for name, keywords in self._keywords_by_name.items(): + mac = self._macs_by_name[name] + s += f" {name} ({mac}) => {keywords}\n" + return s + + def get_keywords_by_name(self, name: str) -> Optional[device.Device]: + return self._keywords_by_name.get(name, None) + + def get_macs_by_name(self, name: str) -> Set[str]: + retval = set() + for (mac, lname) in self._names_by_mac.items(): + if name in lname: + retval.add(mac) + return retval + + def get_macs_by_keyword(self, keyword: str) -> Set[str]: + retval = set() + for (mac, keywords) in self._keywords_by_mac.items(): + if keyword in keywords: + retval.add(mac) + return retval + + def get_device_by_name(self, name: str) -> Optional[device.Device]: + if name in self._macs_by_name: + return self.get_device_by_mac(self._macs_by_name[name]) + return None + + def get_all_devices(self) -> List[device.Device]: + retval = [] + for (mac, kws) in self._keywords_by_mac.items(): + if mac is not None: + device = self.get_device_by_mac(mac) + if device is not None: + retval.append(device) + return retval + + def get_device_by_mac(self, mac: str) -> Optional[device.Device]: + if mac in self._keywords_by_mac: + name = self._names_by_mac[mac] + kws = self._keywords_by_mac[mac] + if 'light' in kws.lower(): + if 'tplink' in kws.lower(): + return lights.TPLinkLight(name, mac, kws) + elif 'tuya' in kws.lower(): + return lights.TuyaLight(name, mac, kws) + elif 'goog' in kws.lower(): + return lights.GoogleLight(name, mac, kws) + else: + raise Exception(f'Unknown light device: {name}, {mac}, {kws}') + elif 'outlet' in kws.lower(): + if 'tplink' in kws.lower(): + if 'children' in kws.lower(): + return outlets.TPLinkOutletWithChildren(name, mac, kws) + else: + return outlets.TPLinkOutlet(name, mac, kws) + elif 'goog' in kws.lower(): + return outlets.GoogleOutlet(name, mac, kws) + else: + raise Exception(f'Unknown outlet device: {name}, {mac}, {kws}') + else: + return device.Device(name, mac, kws) + return None + + def query(self, query: str) -> List[device.Device]: + """Evaluates a lighting query expression formed of keywords to search + for, logical operators (and, or, not), and parenthesis. + Returns a list of matching lights. + """ + retval = [] + results = self._corpus.query(query) + if results is not None: + for mac in results: + if mac is not None: + device = self.get_device_by_mac(mac) + if device is not None: + retval.append(device) + return retval diff --git a/smart_home/device.py b/smart_home/device.py new file mode 100644 index 0000000..27860c5 --- /dev/null +++ b/smart_home/device.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 + +import re +from typing import Any, List, Optional, Tuple + +class Device(object): + def __init__( + self, + name: str, + mac: str, + keywords: Optional[List[str]], + ): + self.name = name + self.mac = mac + self.keywords = keywords + if keywords is not None: + self.kws = keywords.split() + else: + self.kws = [] + + def get_name(self) -> str: + return self.name + + def get_mac(self) -> str: + return self.mac + + def get_keywords(self) -> Optional[List[str]]: + return self.kws + + def has_keyword(self, keyword: str) -> bool: + for kw in self.kws: + if kw == keyword: + return True + return False + + def get_on_limit_seconds(self) -> Optional[int]: + for kw in self.kws: + m = re.search(r"timeout:(\d+)", kw) + if m is not None: + return int(m.group(1)) * 60 + return None diff --git a/smart_home/lights.py b/smart_home/lights.py index 6ca6e71..0752139 100644 --- a/smart_home/lights.py +++ b/smart_home/lights.py @@ -17,30 +17,23 @@ import tinytuya as tt import argparse_utils import config import logging_utils -import logical_search +import smart_home.device as dev from google_assistant import ask_google, GoogleResponse from decorator_utils import timeout, memoized logger = logging.getLogger(__name__) parser = config.add_commandline_args( - f"Light Utils ({__file__})", - "Args related to light utilities." + f"Smart Lights ({__file__})", + "Args related to smart lights.", ) parser.add_argument( - '--light_utils_tplink_location', + '--smart_lights_tplink_location', default='/home/scott/bin/tplink.py', metavar='FILENAME', help='The location of the tplink.py helper', type=argparse_utils.valid_filename, ) -parser.add_argument( - '--light_utils_network_mac_addresses_location', - default='/home/scott/bin/network_mac_addresses.txt', - metavar='FILENAME', - help='The location of network_mac_addresses.txt', - type=argparse_utils.valid_filename, -) @timeout( @@ -63,18 +56,9 @@ def tplink_light_command(command: str) -> bool: return True -class Light(ABC): +class BaseLight(dev.Device): def __init__(self, name: str, mac: str, keywords: str = "") -> None: - self.name = name.strip() - self.mac = mac.strip() - self.keywords = keywords.strip() - self.kws = keywords.split() - - def get_name(self) -> str: - return self.name - - def get_mac(self) -> str: - return self.mac + super().__init__(name.strip(), mac.strip(), keywords) @abstractmethod def turn_on(self) -> bool: @@ -104,17 +88,8 @@ class Light(ABC): def make_color(self, color: str) -> bool: pass - def get_keywords(self) -> List[str]: - return self.kws - def has_keyword(self, keyword: str) -> bool: - for kw in self.kws: - if kw == keyword: - return True - return False - - -class GoogleLight(Light): +class GoogleLight(BaseLight): def __init__(self, name: str, mac: str, keywords: str = "") -> None: super().__init__(name, mac, keywords) @@ -180,7 +155,7 @@ class GoogleLight(Light): ) -class TuyaLight(Light): +class TuyaLight(BaseLight): ids_by_mac = { '68:C6:3A:DE:1A:94': '8844664268c63ade1a94', '68:C6:3A:DE:27:1A': '8844664268c63ade271a', @@ -245,7 +220,7 @@ class TuyaLight(Light): return True -class TPLinkLight(Light): +class TPLinkLight(BaseLight): def __init__(self, name: str, mac: str, keywords: str = "") -> None: super().__init__(name, mac, keywords) self.children: List[str] = [] @@ -266,7 +241,7 @@ class TPLinkLight(Light): def get_cmdline(self, child: str = None) -> str: cmd = ( - f"{config.config['light_utils_tplink_location']} -m {self.mac} " + f"{config.config['smart_lights_tplink_location']} -m {self.mac} " f"--no_logging_console " ) if child is not None: @@ -358,225 +333,60 @@ class TPLinkLight(Light): return tplink_light_command(cmd) -class GoogleLightGroup(GoogleLight): - def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None: - if len(members) < 1: - raise Exception("There must be at least one light in the group.") - self.members = members - mac = GoogleLightGroup.make_up_mac(members) - super().__init__(name, mac, keywords) - - @staticmethod - def make_up_mac(members: List[GoogleLight]): - mac = members[0].get_mac() - b = mac.split(':') - b[5] = int(b[5], 16) + 1 - if b[5] > 255: - b[5] = 0 - b[5] = str(b[5]) - return ":".join(b) - - def is_on(self) -> bool: - r = ask_google(f"are {self.goog_name()} on?") - if not r.success: - return False - return 'is on' in r.audio_transcription - - def get_dimmer_level(self) -> Optional[int]: - if not self.has_keyword("dimmer"): - return False - r = ask_google(f'how bright are {self.goog_name()}?') - if not r.success: - return None - - # four lights are set to 100% brightness - txt = r.audio_transcription - m = re.search(r"(\d+)% bright", txt) - if m is not None: - return int(m.group(1)) - if "is off" in txt: - return 0 - return None - - def set_dimmer_level(self, level: int) -> bool: - if not self.has_keyword("dimmer"): - return False - if 0 <= level <= 100: - was_on = self.is_on() - r = ask_google(f"set {self.goog_name()} to {level} percent") - if not r.success: - return False - if not was_on: - self.turn_off() - return True - return False - - def make_color(self, color: str) -> bool: - return GoogleLight.parse_google_response( - ask_google(f"make {self.goog_name()} {color}") - ) - - -class LightingConfig(object): - """Representation of the smart light device config.""" - - def __init__( - self, - config_file: str = None, - ) -> None: - if config_file is None: - config_file = config.config[ - 'light_utils_network_mac_addresses_location' - ] - self.macs_by_name = {} - self._keywords_by_name = {} - self.keywords_by_mac = {} - self.names_by_mac = {} - self.corpus = logical_search.Corpus() - with open(config_file, "r") as f: - contents = f.readlines() - - diningroom_lights = [] - bookcase_lights = [] - for line in contents: - line = line.rstrip("\n") - line = re.sub(r"#.*$", r"", line) - line = line.strip() - if line == "": - continue - (mac, name, keywords) = line.split(",") - mac = mac.strip() - name = name.strip() - keywords = keywords.strip() - if "perm" not in keywords: - continue - self.macs_by_name[name] = mac - self._keywords_by_name[name] = keywords - self.keywords_by_mac[mac] = keywords - self.names_by_mac[mac] = name - -# if "bookcase_light_" in name: -# bookcase_lights.append(mac) -# elif "diningroom_light_" in name: -# diningroom_lights.append(mac) -# else: - self.index_light(name, keywords, mac) - - # name = 'bookcase_lights' - # group = [] - # keywords = 'perm wifi light smart goog dimmer' - # for b in bookcase_lights: - # group.append(self.get_light_by_mac(b)) - # self.bookcase_group = GoogleLightGroup( - # name, - # group, - # keywords, - # ) - # mac = self.bookcase_group.get_mac() - # self.macs_by_name[name] = mac - # self._keywords_by_name[name] = keywords - # self.keywords_by_mac[mac] = keywords - # self.names_by_mac[mac] = name - # self.index_light(name, keywords, mac) - - # name = 'dining_room_lights' - # group = [] - # for b in diningroom_lights: - # group.append(self.get_light_by_mac(b)) - # self.diningroom_group = GoogleLightGroup( - # name, - # group, - # keywords, - # ) - # mac = self.diningroom_group.get_mac() - # self.macs_by_name[name] = mac - # self._keywords_by_name[name] = keywords - # self.keywords_by_mac[mac] = keywords - # self.names_by_mac[mac] = name - # self.index_light(name, keywords, mac) - - def index_light(self, name: str, keywords: str, mac: str) -> None: - properties = [("name", name)] - tags = set() - for kw in keywords.split(): - if ":" in kw: - key, value = kw.split(":") - properties.append((key, value)) - else: - tags.add(kw) - light = logical_search.Document( - docid=mac, - tags=tags, - properties=properties, - reference=None, - ) - self.corpus.add_doc(light) - - def __repr__(self) -> str: - s = "Known devices:\n" - for name, keywords in self._keywords_by_name.items(): - mac = self.macs_by_name[name] - s += f" {name} ({mac}) => {keywords}\n" - return s - - def get_keywords_by_name(self, name: str) -> Optional[str]: - return self._keywords_by_name.get(name, None) - - def get_macs_by_name(self, name: str) -> Set[str]: - retval = set() - for (mac, lname) in self.names_by_mac.items(): - if name in lname: - retval.add(mac) - return retval - - def get_macs_by_keyword(self, keyword: str) -> Set[str]: - retval = set() - for (mac, keywords) in self.keywords_by_mac.items(): - if keyword in keywords: - retval.add(mac) - return retval - - def get_light_by_name(self, name: str) -> Optional[Light]: - if name in self.macs_by_name: - return self.get_light_by_mac(self.macs_by_name[name]) - return None - - def get_all_lights(self) -> List[Light]: - retval = [] - for (mac, kws) in self.keywords_by_mac.items(): - if mac is not None: - light = self.get_light_by_mac(mac) - if light is not None: - retval.append(light) - return retval - - def get_light_by_mac(self, mac: str) -> Optional[Light]: - if mac in self.keywords_by_mac: - name = self.names_by_mac[mac] - kws = self.keywords_by_mac[mac] - if name == 'bookcase_lights': - return self.bookcase_group - elif name == 'dining_room_lights': - return self.diningroom_group - elif 'tplink' in kws.lower(): - return TPLinkLight(name, mac, kws) - elif 'tuya' in kws.lower(): - return TuyaLight(name, mac, kws) - else: - return GoogleLight(name, mac, kws) - return None - - def query(self, query: str) -> List[Light]: - """Evaluates a lighting query expression formed of keywords to search - for, logical operators (and, or, not), and parenthesis. - Returns a list of matching lights. - """ - retval = [] - results = self.corpus.query(query) - if results is not None: - for mac in results: - if mac is not None: - light = self.get_light_by_mac(mac) - if light is not None: - retval.append(light) - return retval +# class GoogleLightGroup(GoogleLight): +# def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None: +# if len(members) < 1: +# raise Exception("There must be at least one light in the group.") +# self.members = members +# mac = GoogleLightGroup.make_up_mac(members) +# super().__init__(name, mac, keywords) + +# @staticmethod +# def make_up_mac(members: List[GoogleLight]): +# mac = members[0].get_mac() +# b = mac.split(':') +# b[5] = int(b[5], 16) + 1 +# if b[5] > 255: +# b[5] = 0 +# b[5] = str(b[5]) +# return ":".join(b) + +# def is_on(self) -> bool: +# r = ask_google(f"are {self.goog_name()} on?") +# if not r.success: +# return False +# return 'is on' in r.audio_transcription + +# def get_dimmer_level(self) -> Optional[int]: +# if not self.has_keyword("dimmer"): +# return False +# r = ask_google(f'how bright are {self.goog_name()}?') +# if not r.success: +# return None + +# # four lights are set to 100% brightness +# txt = r.audio_transcription +# m = re.search(r"(\d+)% bright", txt) +# if m is not None: +# return int(m.group(1)) +# if "is off" in txt: +# return 0 +# return None + +# def set_dimmer_level(self, level: int) -> bool: +# if not self.has_keyword("dimmer"): +# return False +# if 0 <= level <= 100: +# was_on = self.is_on() +# r = ask_google(f"set {self.goog_name()} to {level} percent") +# if not r.success: +# return False +# if not was_on: +# self.turn_off() +# return True +# return False + +# def make_color(self, color: str) -> bool: +# return GoogleLight.parse_google_response( +# ask_google(f"make {self.goog_name()} {color}") +# ) diff --git a/smart_home/outlets.py b/smart_home/outlets.py new file mode 100644 index 0000000..527c52c --- /dev/null +++ b/smart_home/outlets.py @@ -0,0 +1,229 @@ +#!/usr/bin/env python3 + +"""Utilities for dealing with the smart outlets.""" + +from abc import abstractmethod +import datetime +import json +import logging +import os +import re +import subprocess +import sys +from typing import Any, Dict, List, Optional, Set + +import argparse_utils +import config +import logging_utils +import smart_home.device as dev +from google_assistant import ask_google, GoogleResponse +from decorator_utils import timeout, memoized + +logger = logging.getLogger(__name__) + +parser = config.add_commandline_args( + f"Outlet Utils ({__file__})", + "Args related to smart outlets.", +) +parser.add_argument( + '--smart_outlets_tplink_location', + default='/home/scott/bin/tplink.py', + metavar='FILENAME', + help='The location of the tplink.py helper', + type=argparse_utils.valid_filename, +) + + +@timeout( + 5.0, use_signals=False, error_message="Timed out waiting for tplink.py" +) +def tplink_outlet_command(command: str) -> bool: + result = os.system(command) + signal = result & 0xFF + if signal != 0: + logger.warning(f'{command} died with signal {signal}') + logging_utils.hlog("%s died with signal %d" % (command, signal)) + return False + else: + exit_value = result >> 8 + if exit_value != 0: + logger.warning(f'{command} failed, exited {exit_value}') + logging_utils.hlog("%s failed, exit %d" % (command, exit_value)) + return False + logger.debug(f'{command} succeeded.') + return True + + +class BaseOutlet(dev.Device): + def __init__(self, name: str, mac: str, keywords: str = "") -> None: + super().__init__(name.strip(), mac.strip(), keywords) + self.info = None + + @abstractmethod + def turn_on(self) -> bool: + pass + + @abstractmethod + def turn_off(self) -> bool: + pass + + @abstractmethod + def is_on(self) -> bool: + pass + + @abstractmethod + def is_off(self) -> bool: + pass + + +class TPLinkOutlet(BaseOutlet): + def __init__(self, name: str, mac: str, keywords: str = '') -> None: + super().__init__(name, mac, keywords) + self.info: Optional[Dict] = None + self.info_ts: Optional[datetime.datetime] = None + + @memoized + def get_tplink_name(self) -> Optional[str]: + self.info = self.get_info() + if self.info is not None: + return self.info["alias"] + return None + + def get_cmdline(self) -> str: + cmd = ( + f"{config.config['smart_outlets_tplink_location']} -m {self.mac} " + f"--no_logging_console " + ) + return cmd + + def command(self, cmd: str, extra_args: str = None) -> bool: + cmd = self.get_cmdline() + f"-c {cmd}" + if extra_args is not None: + cmd += f" {extra_args}" + return tplink_outlet_command(cmd) + + def turn_on(self) -> bool: + return self.command('on') + + def turn_off(self) -> bool: + return self.command('off') + + def is_on(self) -> bool: + return self.get_on_duration_seconds() > 0 + + def is_off(self) -> bool: + return not self.is_on() + + @timeout( + 10.0, use_signals=False, error_message="Timed out waiting for tplink.py" + ) + def get_info(self) -> Optional[Dict]: + cmd = self.get_cmdline() + "-c info" + out = subprocess.getoutput(cmd) + out = re.sub("Sent:.*\n", "", out) + out = re.sub("Received: *", "", out) + try: + self.info = json.loads(out)["system"]["get_sysinfo"] + self.info_ts = datetime.datetime.now() + return self.info + except Exception as e: + logger.exception(e) + print(out, file=sys.stderr) + self.info = None + self.info_ts = None + return None + + def get_on_duration_seconds(self) -> int: + self.info = self.get_info() + if self.info is None: + return 0 + return int(self.info.get("on_time", "0")) + + +class TPLinkOutletWithChildren(TPLinkOutlet): + def __init__(self, name: str, mac: str, keywords: str = "") -> None: + super().__init__(name, mac, keywords) + self.children: List[str] = [] + self.info: Optional[Dict] = None + self.info_ts: Optional[datetime.datetime] = None + assert "children" in self.keywords + self.info = self.get_info() + if self.info is not None: + for child in self.info["children"]: + self.children.append(child["id"]) + + # override + def get_cmdline(self, child: Optional[str] = None) -> str: + cmd = ( + f"{config.config['smart_outlets_tplink_location']} -m {self.mac} " + f"--no_logging_console " + ) + if child is not None: + cmd += f"-x {child} " + return cmd + + # override + def command( + self, cmd: str, child: str = None, extra_args: str = None + ) -> bool: + cmd = self.get_cmdline(child) + f"-c {cmd}" + if extra_args is not None: + cmd += f" {extra_args}" + logger.debug(f'About to execute {cmd}') + return tplink_light_command(cmd) + + def get_children(self) -> List[str]: + return self.children + + def turn_on(self, child: str = None) -> bool: + return self.command("on", child) + + def turn_off(self, child: str = None) -> bool: + return self.command("off", child) + + def get_on_duration_seconds(self, child: str = None) -> int: + self.info = self.get_info() + if child is None: + if self.info is None: + return 0 + return int(self.info.get("on_time", "0")) + else: + if self.info is None: + return 0 + for chi in self.info.get("children", {}): + if chi["id"] == child: + return int(chi.get("on_time", "0")) + return 0 + + +class GoogleOutlet(BaseOutlet): + def __init__(self, name: str, mac: str, keywords: str = "") -> None: + super().__init__(name.strip(), mac.strip(), keywords) + self.info = None + + def goog_name(self) -> str: + name = self.get_name() + return name.replace('_', ' ') + + @staticmethod + def parse_google_response(response: GoogleResponse) -> bool: + return response.success + + def turn_on(self) -> bool: + return GoogleOutlet.parse_google_response( + ask_google('turn {self.goog_name()} on') + ) + + def turn_off(self) -> bool: + return GoogleOutlet.parse_google_response( + ask_google('turn {self.goog_name()} off') + ) + + def is_on(self) -> bool: + r = ask_google(f'is {self.goog_name()} on?') + if not r.success: + return False + return 'is on' in r.audio_transcription + + def is_off(self) -> bool: + return not self.is_on()