#!/usr/bin/env python3
+# © Copyright 2021-2022, Scott Gasch
+
"""Utilities for dealing with the smart lights."""
-from abc import abstractmethod
import datetime
-import json
import logging
-import os
import re
-import subprocess
-import sys
+from abc import abstractmethod
from typing import Any, Dict, List, Optional, Tuple
-from overrides import overrides
import tinytuya as tt
+from overrides import overrides
import ansi
import argparse_utils
import arper
import config
-import logging_utils
import smart_home.device as dev
-from google_assistant import ask_google, GoogleResponse
-from decorator_utils import timeout, memoized
+import smart_home.tplink_utils as tplink
+from decorator_utils import memoized
+from google_assistant import GoogleResponse, ask_google
logger = logging.getLogger(__name__)
)
-@timeout(
- 5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
-)
-def tplink_light_command(command: str) -> bool:
- result = os.system(command)
- signal = result & 0xFF
- if signal != 0:
- logger.warning(f'{command} died with signal {signal}')
- logging_utils.hlog("%s died with signal %d" % (command, signal))
- return False
- else:
- exit_value = result >> 8
- if exit_value != 0:
- logger.warning(f'{command} failed, exited {exit_value}')
- logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
- return False
- logger.debug(f'{command} succeeded.')
- return True
-
-
class BaseLight(dev.Device):
+ """A base class representing a smart light."""
+
def __init__(self, name: str, mac: str, keywords: str = "") -> None:
super().__init__(name.strip(), mac.strip(), keywords)
def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]:
m = re.match(
'r#?([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])',
- color
+ color,
)
- if m is not None and len(m.group) == 3:
+ if m is not None and len(m.groups()) == 3:
red = int(m.group(0), 16)
green = int(m.group(1), 16)
blue = int(m.group(2), 16)
color = color.lower()
return ansi.COLOR_NAMES_TO_RGB.get(color, None)
+ @abstractmethod
+ def status(self) -> str:
+ pass
+
@abstractmethod
def turn_on(self) -> bool:
pass
class GoogleLight(BaseLight):
- def __init__(self, name: str, mac: str, keywords: str = "") -> None:
- super().__init__(name, mac, keywords)
+ """A smart light controlled by talking to Google."""
def goog_name(self) -> str:
name = self.get_name()
@overrides
def turn_on(self) -> bool:
- return GoogleLight.parse_google_response(
- ask_google(f"turn {self.goog_name()} on")
- )
+ return GoogleLight.parse_google_response(ask_google(f"turn {self.goog_name()} on"))
@overrides
def turn_off(self) -> bool:
- return GoogleLight.parse_google_response(
- ask_google(f"turn {self.goog_name()} off")
- )
+ return GoogleLight.parse_google_response(ask_google(f"turn {self.goog_name()} off"))
+
+ @overrides
+ def status(self) -> str:
+ if self.is_on():
+ return 'ON'
+ return 'off'
@overrides
def is_on(self) -> bool:
r = ask_google(f"is {self.goog_name()} on?")
if not r.success:
return False
- return 'is on' in r.audio_transcription
+ if r.audio_transcription is not None:
+ return 'is on' in r.audio_transcription
+ raise Exception("Can't reach Google?!")
@overrides
def is_off(self) -> bool:
# the bookcase one is set to 40% bright
txt = r.audio_transcription
- m = re.search(r"(\d+)% bright", txt)
- if m is not None:
- return int(m.group(1))
- if "is off" in txt:
- return 0
+ if txt is not None:
+ m = re.search(r"(\d+)% bright", txt)
+ if m is not None:
+ return int(m.group(1))
+ if "is off" in txt:
+ return 0
return None
@overrides
@overrides
def make_color(self, color: str) -> bool:
- return GoogleLight.parse_google_response(
- ask_google(f"make {self.goog_name()} {color}")
- )
+ return GoogleLight.parse_google_response(ask_google(f"make {self.goog_name()} {color}"))
class TuyaLight(BaseLight):
+ """A Tuya smart light."""
+
ids_by_mac = {
'68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
'68:C6:3A:DE:27:1A': '8844664268c63ade271a',
def get_status(self) -> Dict[str, Any]:
return self.bulb.status()
+ @overrides
+ def status(self) -> str:
+ ret = ''
+ for k, v in self.bulb.status().items():
+ ret += f'{k} = {v}\n'
+ return ret
+
@overrides
def turn_on(self) -> bool:
self.bulb.turn_on()
@overrides
def set_dimmer_level(self, level: int) -> bool:
+ logger.debug('Setting brightness to %d', level)
self.bulb.set_brightness(level)
return True
@overrides
def make_color(self, color: str) -> bool:
rgb = BaseLight.parse_color_string(color)
+ logger.debug('Light color: %s -> %s', color, rgb)
if rgb is not None:
self.bulb.set_colour(rgb[0], rgb[1], rgb[2])
return True
class TPLinkLight(BaseLight):
+ """A TPLink smart light."""
+
def __init__(self, name: str, mac: str, keywords: str = "") -> None:
super().__init__(name, mac, keywords)
self.children: List[str] = []
self.info: Optional[Dict] = None
self.info_ts: Optional[datetime.datetime] = None
- if "children" in self.keywords:
- self.info = self.get_info()
- if self.info is not None:
- for child in self.info["children"]:
- self.children.append(child["id"])
+ if self.keywords is not None:
+ if "children" in self.keywords:
+ self.info = self.get_info()
+ if self.info is not None:
+ for child in self.info["children"]:
+ self.children.append(child["id"])
@memoized
def get_tplink_name(self) -> Optional[str]:
def get_children(self) -> List[str]:
return self.children
- def command(
- self, cmd: str, child: str = None, extra_args: str = None
- ) -> bool:
+ def command(self, cmd: str, child: str = None, extra_args: str = None) -> bool:
cmd = self.get_cmdline(child) + f"-c {cmd}"
if extra_args is not None:
cmd += f" {extra_args}"
- logger.debug(f'About to execute {cmd}')
- return tplink_light_command(cmd)
+ logger.debug('About to execute: %s', cmd)
+ return tplink.tplink_command_wrapper(cmd)
@overrides
- def turn_on(self, child: str = None) -> bool:
- return self.command("on", child)
+ def turn_on(self) -> bool:
+ return self.command("on", None)
@overrides
- def turn_off(self, child: str = None) -> bool:
+ def turn_off(self) -> bool:
+ return self.command("off", None)
+
+ def turn_on_child(self, child: str = None) -> bool:
+ return self.command("on", child)
+
+ def turn_off_child(self, child: str = None) -> bool:
return self.command("off", child)
@overrides
def is_on(self) -> bool:
- return self.get_on_duration_seconds() > 0
+ self.info = self.get_info()
+ if self.info is None:
+ raise Exception('Unable to get info?')
+ return self.info.get("relay_state", 0) == 1
@overrides
def is_off(self) -> bool:
def make_color(self, color: str) -> bool:
raise NotImplementedError
- @timeout(
- 10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
- )
def get_info(self) -> Optional[Dict]:
- cmd = self.get_cmdline() + "-c info"
- out = subprocess.getoutput(cmd)
- out = re.sub("Sent:.*\n", "", out)
- out = re.sub("Received: *", "", out)
- try:
- self.info = json.loads(out)["system"]["get_sysinfo"]
- self.info_ts = datetime.datetime.now()
+ ip = self.get_ip()
+ if ip is not None:
+ self.info = tplink.tplink_get_info(ip)
+ if self.info is not None:
+ self.info_ts = datetime.datetime.now()
+ else:
+ self.info_ts = None
return self.info
- except Exception as e:
- logger.exception(e)
- print(out, file=sys.stderr)
- self.info = None
- self.info_ts = None
- return None
+ return None
+
+ @overrides
+ def status(self) -> str:
+ ret = ''
+ info = self.get_info()
+ if info is not None:
+ for k, v in info:
+ ret += f'{k} = {v}\n'
+ return ret
def get_on_duration_seconds(self, child: str = None) -> int:
self.info = self.get_info()
return False
cmd = (
self.get_cmdline()
- + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
+ + '-j \'{"smartlife.iot.dimmer":{"set_brightness":{"brightness":%d}}}\'' % level
)
- return tplink_light_command(cmd)
+ return tplink.tplink_command_wrapper(cmd)
# class GoogleLightGroup(GoogleLight):