"""Utilities for dealing with the smart lights."""
-from abc import ABC, abstractmethod
import datetime
import json
import logging
import re
import subprocess
import sys
-from typing import Any, Dict, List, Optional, Set
+from abc import abstractmethod
+from typing import Any, Dict, List, Optional, Tuple
import tinytuya as tt
+from overrides import overrides
+import ansi
import argparse_utils
+import arper
import config
import logging_utils
import smart_home.device as dev
-from google_assistant import ask_google, GoogleResponse
-from decorator_utils import timeout, memoized
+from decorator_utils import memoized, timeout
+from google_assistant import GoogleResponse, ask_google
logger = logging.getLogger(__name__)
-parser = config.add_commandline_args(
+args = config.add_commandline_args(
f"Smart Lights ({__file__})",
"Args related to smart lights.",
)
-parser.add_argument(
+args.add_argument(
'--smart_lights_tplink_location',
default='/home/scott/bin/tplink.py',
metavar='FILENAME',
)
-@timeout(
- 5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
-)
+@timeout(5.0, use_signals=False, error_message="Timed out waiting for tplink.py")
def tplink_light_command(command: str) -> bool:
result = os.system(command)
signal = result & 0xFF
if signal != 0:
- logger.warning(f'{command} died with signal {signal}')
- logging_utils.hlog("%s died with signal %d" % (command, signal))
+ msg = f'{command} died with signal {signal}'
+ logger.warning(msg)
+ logging_utils.hlog(msg)
return False
else:
exit_value = result >> 8
if exit_value != 0:
- logger.warning(f'{command} failed, exited {exit_value}')
- logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
+ msg = f'{command} failed, exited {exit_value}'
+ logger.warning(msg)
+ logging_utils.hlog(msg)
return False
logger.debug(f'{command} succeeded.')
return True
def __init__(self, name: str, mac: str, keywords: str = "") -> None:
super().__init__(name.strip(), mac.strip(), keywords)
+ @staticmethod
+ def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]:
+ m = re.match(
+ 'r#?([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])',
+ color,
+ )
+ if m is not None and len(m.groups()) == 3:
+ red = int(m.group(0), 16)
+ green = int(m.group(1), 16)
+ blue = int(m.group(2), 16)
+ return (red, green, blue)
+ color = color.lower()
+ return ansi.COLOR_NAMES_TO_RGB.get(color, None)
+
+ @abstractmethod
+ def status(self) -> str:
+ pass
+
@abstractmethod
def turn_on(self) -> bool:
pass
def parse_google_response(response: GoogleResponse) -> bool:
return response.success
+ @overrides
def turn_on(self) -> bool:
- return GoogleLight.parse_google_response(
- ask_google(f"turn {self.goog_name()} on")
- )
+ return GoogleLight.parse_google_response(ask_google(f"turn {self.goog_name()} on"))
+ @overrides
def turn_off(self) -> bool:
- return GoogleLight.parse_google_response(
- ask_google(f"turn {self.goog_name()} off")
- )
+ return GoogleLight.parse_google_response(ask_google(f"turn {self.goog_name()} off"))
+
+ @overrides
+ def status(self) -> str:
+ if self.is_on():
+ return 'ON'
+ return 'off'
+ @overrides
def is_on(self) -> bool:
r = ask_google(f"is {self.goog_name()} on?")
if not r.success:
return False
- return 'is on' in r.audio_transcription
+ if r.audio_transcription is not None:
+ return 'is on' in r.audio_transcription
+ raise Exception("Can't reach Google?!")
+ @overrides
def is_off(self) -> bool:
return not self.is_on()
+ @overrides
def get_dimmer_level(self) -> Optional[int]:
if not self.has_keyword("dimmer"):
return False
# the bookcase one is set to 40% bright
txt = r.audio_transcription
- m = re.search(r"(\d+)% bright", txt)
- if m is not None:
- return int(m.group(1))
- if "is off" in txt:
- return 0
+ if txt is not None:
+ m = re.search(r"(\d+)% bright", txt)
+ if m is not None:
+ return int(m.group(1))
+ if "is off" in txt:
+ return 0
return None
+ @overrides
def set_dimmer_level(self, level: int) -> bool:
if not self.has_keyword("dimmer"):
return False
return True
return False
+ @overrides
def make_color(self, color: str) -> bool:
- return GoogleLight.parse_google_response(
- ask_google(f"make {self.goog_name()} {color}")
- )
+ return GoogleLight.parse_google_response(ask_google(f"make {self.goog_name()} {color}"))
class TuyaLight(BaseLight):
}
def __init__(self, name: str, mac: str, keywords: str = "") -> None:
- from subprocess import Popen, PIPE
super().__init__(name, mac, keywords)
mac = mac.upper()
if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
self.devid = TuyaLight.ids_by_mac[mac]
self.key = TuyaLight.keys_by_mac[mac]
- try:
- pid = Popen(['maclookup', mac], stdout=PIPE)
- ip = pid.communicate()[0]
- ip = ip[:-1]
- except Exception:
- ip = '0.0.0.0'
+ self.arper = arper.Arper()
+ ip = self.get_ip()
self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)
+ def get_status(self) -> Dict[str, Any]:
+ return self.bulb.status()
+
+ @overrides
+ def status(self) -> str:
+ ret = ''
+ for k, v in self.bulb.status().items():
+ ret += f'{k} = {v}\n'
+ return ret
+
+ @overrides
def turn_on(self) -> bool:
self.bulb.turn_on()
return True
+ @overrides
def turn_off(self) -> bool:
self.bulb.turn_off()
return True
- def get_status(self) -> Dict[str, Any]:
- return self.bulb.status()
-
+ @overrides
def is_on(self) -> bool:
s = self.get_status()
return s['dps']['1']
+ @overrides
def is_off(self) -> bool:
return not self.is_on()
+ @overrides
def get_dimmer_level(self) -> Optional[int]:
s = self.get_status()
return s['dps']['3']
+ @overrides
def set_dimmer_level(self, level: int) -> bool:
+ logger.debug(f'Setting brightness to {level}')
self.bulb.set_brightness(level)
return True
+ @overrides
def make_color(self, color: str) -> bool:
- self.bulb.set_colour(255,0,0)
- return True
+ rgb = BaseLight.parse_color_string(color)
+ logger.debug(f'Light color: {color} -> {rgb}')
+ if rgb is not None:
+ self.bulb.set_colour(rgb[0], rgb[1], rgb[2])
+ return True
+ return False
class TPLinkLight(BaseLight):
def get_children(self) -> List[str]:
return self.children
- def command(
- self, cmd: str, child: str = None, extra_args: str = None
- ) -> bool:
+ def command(self, cmd: str, child: str = None, extra_args: str = None) -> bool:
cmd = self.get_cmdline(child) + f"-c {cmd}"
if extra_args is not None:
cmd += f" {extra_args}"
logger.debug(f'About to execute {cmd}')
return tplink_light_command(cmd)
+ @overrides
def turn_on(self, child: str = None) -> bool:
return self.command("on", child)
+ @overrides
def turn_off(self, child: str = None) -> bool:
return self.command("off", child)
+ @overrides
def is_on(self) -> bool:
- return self.get_on_duration_seconds() > 0
+ self.info = self.get_info()
+ if self.info is None:
+ raise Exception('Unable to get info?')
+ return self.info.get("relay_state", 0) == 1
+ @overrides
def is_off(self) -> bool:
return not self.is_on()
+ @overrides
def make_color(self, color: str) -> bool:
raise NotImplementedError
- @timeout(
- 10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
- )
+ @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
def get_info(self) -> Optional[Dict]:
cmd = self.get_cmdline() + "-c info"
+ logger.debug(f'Getting status of {self.mac} via "{cmd}"...')
out = subprocess.getoutput(cmd)
+ logger.debug(f'RAW OUT> {out}')
out = re.sub("Sent:.*\n", "", out)
out = re.sub("Received: *", "", out)
try:
self.info = json.loads(out)["system"]["get_sysinfo"]
+ logger.debug(json.dumps(self.info, indent=4, sort_keys=True))
self.info_ts = datetime.datetime.now()
return self.info
except Exception as e:
self.info_ts = None
return None
+ @overrides
+ def status(self) -> str:
+ ret = ''
+ for k, v in self.get_info().items():
+ ret += f'{k} = {v}\n'
+ return ret
+
def get_on_duration_seconds(self, child: str = None) -> int:
self.info = self.get_info()
if child is None:
return int(chi.get("on_time", "0"))
return 0
- def get_on_limit_seconds(self) -> Optional[int]:
- for kw in self.kws:
- m = re.search(r"timeout:(\d+)", kw)
- if m is not None:
- return int(m.group(1)) * 60
- return None
-
+ @overrides
def get_dimmer_level(self) -> Optional[int]:
if not self.has_keyword("dimmer"):
return False
return None
return int(self.info.get("brightness", "0"))
+ @overrides
def set_dimmer_level(self, level: int) -> bool:
if not self.has_keyword("dimmer"):
return False