#!/usr/bin/env python3
+# © Copyright 2021-2022, Scott Gasch
+
"""Utilities for dealing with the smart lights."""
-from abc import ABC, abstractmethod
import datetime
-import json
import logging
-import os
import re
-import subprocess
-import sys
-from typing import Any, Dict, List, Optional, Set
+from abc import abstractmethod
+from typing import Any, Dict, List, Optional, Tuple
import tinytuya as tt
+from overrides import overrides
+import ansi
import argparse_utils
+import arper
import config
-import logging_utils
-import logical_search
-from google_assistant import ask_google, GoogleResponse
-from decorator_utils import timeout, memoized
+import smart_home.device as dev
+import smart_home.tplink_utils as tplink
+from decorator_utils import memoized
+from google_assistant import GoogleResponse, ask_google
logger = logging.getLogger(__name__)
-parser = config.add_commandline_args(
- f"Light Utils ({__file__})",
- "Args related to light utilities."
+args = config.add_commandline_args(
+ f"Smart Lights ({__file__})",
+ "Args related to smart lights.",
)
-parser.add_argument(
- '--light_utils_tplink_location',
+args.add_argument(
+ '--smart_lights_tplink_location',
default='/home/scott/bin/tplink.py',
metavar='FILENAME',
help='The location of the tplink.py helper',
type=argparse_utils.valid_filename,
)
-parser.add_argument(
- '--light_utils_network_mac_addresses_location',
- default='/home/scott/bin/network_mac_addresses.txt',
- metavar='FILENAME',
- help='The location of network_mac_addresses.txt',
- type=argparse_utils.valid_filename,
-)
-
-@timeout(
- 5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
-)
-def tplink_light_command(command: str) -> bool:
- result = os.system(command)
- signal = result & 0xFF
- if signal != 0:
- logger.warning(f'{command} died with signal {signal}')
- logging_utils.hlog("%s died with signal %d" % (command, signal))
- return False
- else:
- exit_value = result >> 8
- if exit_value != 0:
- logger.warning(f'{command} failed, exited {exit_value}')
- logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
- return False
- logger.debug(f'{command} succeeded.')
- return True
+class BaseLight(dev.Device):
+ """A base class representing a smart light."""
-class Light(ABC):
def __init__(self, name: str, mac: str, keywords: str = "") -> None:
- self.name = name.strip()
- self.mac = mac.strip()
- self.keywords = keywords.strip()
- self.kws = keywords.split()
+ super().__init__(name.strip(), mac.strip(), keywords)
- def get_name(self) -> str:
- return self.name
+ @staticmethod
+ def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]:
+ m = re.match(
+ 'r#?([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])',
+ color,
+ )
+ if m is not None and len(m.groups()) == 3:
+ red = int(m.group(0), 16)
+ green = int(m.group(1), 16)
+ blue = int(m.group(2), 16)
+ return (red, green, blue)
+ color = color.lower()
+ return ansi.COLOR_NAMES_TO_RGB.get(color, None)
- def get_mac(self) -> str:
- return self.mac
+ @abstractmethod
+ def status(self) -> str:
+ pass
@abstractmethod
def turn_on(self) -> bool:
def make_color(self, color: str) -> bool:
pass
- def get_keywords(self) -> List[str]:
- return self.kws
- def has_keyword(self, keyword: str) -> bool:
- for kw in self.kws:
- if kw == keyword:
- return True
- return False
-
-
-class GoogleLight(Light):
- def __init__(self, name: str, mac: str, keywords: str = "") -> None:
- super().__init__(name, mac, keywords)
+class GoogleLight(BaseLight):
+ """A smart light controlled by talking to Google."""
def goog_name(self) -> str:
name = self.get_name()
def parse_google_response(response: GoogleResponse) -> bool:
return response.success
+ @overrides
def turn_on(self) -> bool:
- return GoogleLight.parse_google_response(
- ask_google(f"turn {self.goog_name()} on")
- )
+ return GoogleLight.parse_google_response(ask_google(f"turn {self.goog_name()} on"))
+ @overrides
def turn_off(self) -> bool:
- return GoogleLight.parse_google_response(
- ask_google(f"turn {self.goog_name()} off")
- )
+ return GoogleLight.parse_google_response(ask_google(f"turn {self.goog_name()} off"))
+
+ @overrides
+ def status(self) -> str:
+ if self.is_on():
+ return 'ON'
+ return 'off'
+ @overrides
def is_on(self) -> bool:
r = ask_google(f"is {self.goog_name()} on?")
if not r.success:
return False
- return 'is on' in r.audio_transcription
+ if r.audio_transcription is not None:
+ return 'is on' in r.audio_transcription
+ raise Exception("Can't reach Google?!")
+ @overrides
def is_off(self) -> bool:
return not self.is_on()
+ @overrides
def get_dimmer_level(self) -> Optional[int]:
if not self.has_keyword("dimmer"):
return False
# the bookcase one is set to 40% bright
txt = r.audio_transcription
- m = re.search(r"(\d+)% bright", txt)
- if m is not None:
- return int(m.group(1))
- if "is off" in txt:
- return 0
+ if txt is not None:
+ m = re.search(r"(\d+)% bright", txt)
+ if m is not None:
+ return int(m.group(1))
+ if "is off" in txt:
+ return 0
return None
+ @overrides
def set_dimmer_level(self, level: int) -> bool:
if not self.has_keyword("dimmer"):
return False
return True
return False
+ @overrides
def make_color(self, color: str) -> bool:
- return GoogleLight.parse_google_response(
- ask_google(f"make {self.goog_name()} {color}")
- )
+ return GoogleLight.parse_google_response(ask_google(f"make {self.goog_name()} {color}"))
+
+class TuyaLight(BaseLight):
+ """A Tuya smart light."""
-class TuyaLight(Light):
ids_by_mac = {
'68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
'68:C6:3A:DE:27:1A': '8844664268c63ade271a',
}
def __init__(self, name: str, mac: str, keywords: str = "") -> None:
- from subprocess import Popen, PIPE
super().__init__(name, mac, keywords)
mac = mac.upper()
if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
self.devid = TuyaLight.ids_by_mac[mac]
self.key = TuyaLight.keys_by_mac[mac]
- try:
- pid = Popen(['maclookup', mac], stdout=PIPE)
- ip = pid.communicate()[0]
- ip = ip[:-1]
- except Exception:
- ip = '0.0.0.0'
+ self.arper = arper.Arper()
+ ip = self.get_ip()
self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)
+ def get_status(self) -> Dict[str, Any]:
+ return self.bulb.status()
+
+ @overrides
+ def status(self) -> str:
+ ret = ''
+ for k, v in self.bulb.status().items():
+ ret += f'{k} = {v}\n'
+ return ret
+
+ @overrides
def turn_on(self) -> bool:
self.bulb.turn_on()
return True
+ @overrides
def turn_off(self) -> bool:
self.bulb.turn_off()
return True
- def get_status(self) -> Dict[str, Any]:
- return self.bulb.status()
-
+ @overrides
def is_on(self) -> bool:
s = self.get_status()
return s['dps']['1']
+ @overrides
def is_off(self) -> bool:
return not self.is_on()
+ @overrides
def get_dimmer_level(self) -> Optional[int]:
s = self.get_status()
return s['dps']['3']
+ @overrides
def set_dimmer_level(self, level: int) -> bool:
+ logger.debug('Setting brightness to %d', level)
self.bulb.set_brightness(level)
return True
+ @overrides
def make_color(self, color: str) -> bool:
- self.bulb.set_colour(255,0,0)
- return True
+ rgb = BaseLight.parse_color_string(color)
+ logger.debug('Light color: %s -> %s', color, rgb)
+ if rgb is not None:
+ self.bulb.set_colour(rgb[0], rgb[1], rgb[2])
+ return True
+ return False
+
+class TPLinkLight(BaseLight):
+ """A TPLink smart light."""
-class TPLinkLight(Light):
def __init__(self, name: str, mac: str, keywords: str = "") -> None:
super().__init__(name, mac, keywords)
self.children: List[str] = []
self.info: Optional[Dict] = None
self.info_ts: Optional[datetime.datetime] = None
- if "children" in self.keywords:
- self.info = self.get_info()
- if self.info is not None:
- for child in self.info["children"]:
- self.children.append(child["id"])
+ if self.keywords is not None:
+ if "children" in self.keywords:
+ self.info = self.get_info()
+ if self.info is not None:
+ for child in self.info["children"]:
+ self.children.append(child["id"])
@memoized
def get_tplink_name(self) -> Optional[str]:
def get_cmdline(self, child: str = None) -> str:
cmd = (
- f"{config.config['light_utils_tplink_location']} -m {self.mac} "
+ f"{config.config['smart_lights_tplink_location']} -m {self.mac} "
f"--no_logging_console "
)
if child is not None:
def get_children(self) -> List[str]:
return self.children
- def command(
- self, cmd: str, child: str = None, extra_args: str = None
- ) -> bool:
+ def command(self, cmd: str, child: str = None, extra_args: str = None) -> bool:
cmd = self.get_cmdline(child) + f"-c {cmd}"
if extra_args is not None:
cmd += f" {extra_args}"
- logger.debug(f'About to execute {cmd}')
- return tplink_light_command(cmd)
+ logger.debug('About to execute: %s', cmd)
+ return tplink.tplink_command_wrapper(cmd)
- def turn_on(self, child: str = None) -> bool:
+ @overrides
+ def turn_on(self) -> bool:
+ return self.command("on", None)
+
+ @overrides
+ def turn_off(self) -> bool:
+ return self.command("off", None)
+
+ def turn_on_child(self, child: str = None) -> bool:
return self.command("on", child)
- def turn_off(self, child: str = None) -> bool:
+ def turn_off_child(self, child: str = None) -> bool:
return self.command("off", child)
+ @overrides
def is_on(self) -> bool:
- return self.get_on_duration_seconds() > 0
+ self.info = self.get_info()
+ if self.info is None:
+ raise Exception('Unable to get info?')
+ return self.info.get("relay_state", 0) == 1
+ @overrides
def is_off(self) -> bool:
return not self.is_on()
+ @overrides
def make_color(self, color: str) -> bool:
raise NotImplementedError
- @timeout(
- 10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
- )
def get_info(self) -> Optional[Dict]:
- cmd = self.get_cmdline() + "-c info"
- out = subprocess.getoutput(cmd)
- out = re.sub("Sent:.*\n", "", out)
- out = re.sub("Received: *", "", out)
- try:
- self.info = json.loads(out)["system"]["get_sysinfo"]
- self.info_ts = datetime.datetime.now()
+ ip = self.get_ip()
+ if ip is not None:
+ self.info = tplink.tplink_get_info(ip)
+ if self.info is not None:
+ self.info_ts = datetime.datetime.now()
+ else:
+ self.info_ts = None
return self.info
- except Exception as e:
- logger.exception(e)
- print(out, file=sys.stderr)
- self.info = None
- self.info_ts = None
- return None
+ return None
+
+ @overrides
+ def status(self) -> str:
+ ret = ''
+ info = self.get_info()
+ if info is not None:
+ for k, v in info:
+ ret += f'{k} = {v}\n'
+ return ret
def get_on_duration_seconds(self, child: str = None) -> int:
self.info = self.get_info()
return int(chi.get("on_time", "0"))
return 0
- def get_on_limit_seconds(self) -> Optional[int]:
- for kw in self.kws:
- m = re.search(r"timeout:(\d+)", kw)
- if m is not None:
- return int(m.group(1)) * 60
- return None
-
+ @overrides
def get_dimmer_level(self) -> Optional[int]:
if not self.has_keyword("dimmer"):
return False
return None
return int(self.info.get("brightness", "0"))
+ @overrides
def set_dimmer_level(self, level: int) -> bool:
if not self.has_keyword("dimmer"):
return False
cmd = (
self.get_cmdline()
- + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
- )
- return tplink_light_command(cmd)
-
-
-class GoogleLightGroup(GoogleLight):
- def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
- if len(members) < 1:
- raise Exception("There must be at least one light in the group.")
- self.members = members
- mac = GoogleLightGroup.make_up_mac(members)
- super().__init__(name, mac, keywords)
-
- @staticmethod
- def make_up_mac(members: List[GoogleLight]):
- mac = members[0].get_mac()
- b = mac.split(':')
- b[5] = int(b[5], 16) + 1
- if b[5] > 255:
- b[5] = 0
- b[5] = str(b[5])
- return ":".join(b)
-
- def is_on(self) -> bool:
- r = ask_google(f"are {self.goog_name()} on?")
- if not r.success:
- return False
- return 'is on' in r.audio_transcription
-
- def get_dimmer_level(self) -> Optional[int]:
- if not self.has_keyword("dimmer"):
- return False
- r = ask_google(f'how bright are {self.goog_name()}?')
- if not r.success:
- return None
-
- # four lights are set to 100% brightness
- txt = r.audio_transcription
- m = re.search(r"(\d+)% bright", txt)
- if m is not None:
- return int(m.group(1))
- if "is off" in txt:
- return 0
- return None
-
- def set_dimmer_level(self, level: int) -> bool:
- if not self.has_keyword("dimmer"):
- return False
- if 0 <= level <= 100:
- was_on = self.is_on()
- r = ask_google(f"set {self.goog_name()} to {level} percent")
- if not r.success:
- return False
- if not was_on:
- self.turn_off()
- return True
- return False
-
- def make_color(self, color: str) -> bool:
- return GoogleLight.parse_google_response(
- ask_google(f"make {self.goog_name()} {color}")
- )
-
-
-class LightingConfig(object):
- """Representation of the smart light device config."""
-
- def __init__(
- self,
- config_file: str = None,
- ) -> None:
- if config_file is None:
- config_file = config.config[
- 'light_utils_network_mac_addresses_location'
- ]
- self.macs_by_name = {}
- self._keywords_by_name = {}
- self.keywords_by_mac = {}
- self.names_by_mac = {}
- self.corpus = logical_search.Corpus()
- with open(config_file, "r") as f:
- contents = f.readlines()
-
- diningroom_lights = []
- bookcase_lights = []
- for line in contents:
- line = line.rstrip("\n")
- line = re.sub(r"#.*$", r"", line)
- line = line.strip()
- if line == "":
- continue
- (mac, name, keywords) = line.split(",")
- mac = mac.strip()
- name = name.strip()
- keywords = keywords.strip()
- if "perm" not in keywords:
- continue
- self.macs_by_name[name] = mac
- self._keywords_by_name[name] = keywords
- self.keywords_by_mac[mac] = keywords
- self.names_by_mac[mac] = name
-
-# if "bookcase_light_" in name:
-# bookcase_lights.append(mac)
-# elif "diningroom_light_" in name:
-# diningroom_lights.append(mac)
-# else:
- self.index_light(name, keywords, mac)
-
- # name = 'bookcase_lights'
- # group = []
- # keywords = 'perm wifi light smart goog dimmer'
- # for b in bookcase_lights:
- # group.append(self.get_light_by_mac(b))
- # self.bookcase_group = GoogleLightGroup(
- # name,
- # group,
- # keywords,
- # )
- # mac = self.bookcase_group.get_mac()
- # self.macs_by_name[name] = mac
- # self._keywords_by_name[name] = keywords
- # self.keywords_by_mac[mac] = keywords
- # self.names_by_mac[mac] = name
- # self.index_light(name, keywords, mac)
-
- # name = 'dining_room_lights'
- # group = []
- # for b in diningroom_lights:
- # group.append(self.get_light_by_mac(b))
- # self.diningroom_group = GoogleLightGroup(
- # name,
- # group,
- # keywords,
- # )
- # mac = self.diningroom_group.get_mac()
- # self.macs_by_name[name] = mac
- # self._keywords_by_name[name] = keywords
- # self.keywords_by_mac[mac] = keywords
- # self.names_by_mac[mac] = name
- # self.index_light(name, keywords, mac)
-
- def index_light(self, name: str, keywords: str, mac: str) -> None:
- properties = [("name", name)]
- tags = set()
- for kw in keywords.split():
- if ":" in kw:
- key, value = kw.split(":")
- properties.append((key, value))
- else:
- tags.add(kw)
- light = logical_search.Document(
- docid=mac,
- tags=tags,
- properties=properties,
- reference=None,
+ + '-j \'{"smartlife.iot.dimmer":{"set_brightness":{"brightness":%d}}}\'' % level
)
- self.corpus.add_doc(light)
-
- def __repr__(self) -> str:
- s = "Known devices:\n"
- for name, keywords in self._keywords_by_name.items():
- mac = self.macs_by_name[name]
- s += f" {name} ({mac}) => {keywords}\n"
- return s
-
- def get_keywords_by_name(self, name: str) -> Optional[str]:
- return self._keywords_by_name.get(name, None)
-
- def get_macs_by_name(self, name: str) -> Set[str]:
- retval = set()
- for (mac, lname) in self.names_by_mac.items():
- if name in lname:
- retval.add(mac)
- return retval
-
- def get_macs_by_keyword(self, keyword: str) -> Set[str]:
- retval = set()
- for (mac, keywords) in self.keywords_by_mac.items():
- if keyword in keywords:
- retval.add(mac)
- return retval
-
- def get_light_by_name(self, name: str) -> Optional[Light]:
- if name in self.macs_by_name:
- return self.get_light_by_mac(self.macs_by_name[name])
- return None
-
- def get_all_lights(self) -> List[Light]:
- retval = []
- for (mac, kws) in self.keywords_by_mac.items():
- if mac is not None:
- light = self.get_light_by_mac(mac)
- if light is not None:
- retval.append(light)
- return retval
-
- def get_light_by_mac(self, mac: str) -> Optional[Light]:
- if mac in self.keywords_by_mac:
- name = self.names_by_mac[mac]
- kws = self.keywords_by_mac[mac]
- if name == 'bookcase_lights':
- return self.bookcase_group
- elif name == 'dining_room_lights':
- return self.diningroom_group
- elif 'tplink' in kws.lower():
- return TPLinkLight(name, mac, kws)
- elif 'tuya' in kws.lower():
- return TuyaLight(name, mac, kws)
- else:
- return GoogleLight(name, mac, kws)
- return None
-
- def query(self, query: str) -> List[Light]:
- """Evaluates a lighting query expression formed of keywords to search
- for, logical operators (and, or, not), and parenthesis.
- Returns a list of matching lights.
- """
- retval = []
- results = self.corpus.query(query)
- if results is not None:
- for mac in results:
- if mac is not None:
- light = self.get_light_by_mac(mac)
- if light is not None:
- retval.append(light)
- return retval
+ return tplink.tplink_command_wrapper(cmd)
+
+
+# class GoogleLightGroup(GoogleLight):
+# def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
+# if len(members) < 1:
+# raise Exception("There must be at least one light in the group.")
+# self.members = members
+# mac = GoogleLightGroup.make_up_mac(members)
+# super().__init__(name, mac, keywords)
+
+# @staticmethod
+# def make_up_mac(members: List[GoogleLight]):
+# mac = members[0].get_mac()
+# b = mac.split(':')
+# b[5] = int(b[5], 16) + 1
+# if b[5] > 255:
+# b[5] = 0
+# b[5] = str(b[5])
+# return ":".join(b)
+
+# def is_on(self) -> bool:
+# r = ask_google(f"are {self.goog_name()} on?")
+# if not r.success:
+# return False
+# return 'is on' in r.audio_transcription
+
+# def get_dimmer_level(self) -> Optional[int]:
+# if not self.has_keyword("dimmer"):
+# return False
+# r = ask_google(f'how bright are {self.goog_name()}?')
+# if not r.success:
+# return None
+
+# # four lights are set to 100% brightness
+# txt = r.audio_transcription
+# m = re.search(r"(\d+)% bright", txt)
+# if m is not None:
+# return int(m.group(1))
+# if "is off" in txt:
+# return 0
+# return None
+
+# def set_dimmer_level(self, level: int) -> bool:
+# if not self.has_keyword("dimmer"):
+# return False
+# if 0 <= level <= 100:
+# was_on = self.is_on()
+# r = ask_google(f"set {self.goog_name()} to {level} percent")
+# if not r.success:
+# return False
+# if not was_on:
+# self.turn_off()
+# return True
+# return False
+
+# def make_color(self, color: str) -> bool:
+# return GoogleLight.parse_google_response(
+# ask_google(f"make {self.goog_name()} {color}")
+# )