import re
import subprocess
import sys
-from typing import Dict, List, Optional, Set
+from typing import Any, Dict, List, Optional, Set
+
+import tinytuya as tt
import argparse_utils
import config
-import logical_search
import logging_utils
-import google_assistant as goog
+import logical_search
+from google_assistant import ask_google, GoogleResponse
from decorator_utils import timeout, memoized
logger = logging.getLogger(__name__)
result = os.system(command)
signal = result & 0xFF
if signal != 0:
+ logger.warning(f'{command} died with signal {signal}')
logging_utils.hlog("%s died with signal %d" % (command, signal))
return False
else:
exit_value = result >> 8
if exit_value != 0:
+ logger.warning(f'{command} failed, exited {exit_value}')
logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
return False
+ logger.debug(f'{command} succeeded.')
return True
def turn_off(self) -> bool:
pass
+ @abstractmethod
+ def is_on(self) -> bool:
+ pass
+
+ @abstractmethod
+ def is_off(self) -> bool:
+ pass
+
+ @abstractmethod
+ def get_dimmer_level(self) -> Optional[int]:
+ pass
+
@abstractmethod
def set_dimmer_level(self, level: int) -> bool:
pass
return name.replace("_", " ")
@staticmethod
- def parse_google_response(response: goog.GoogleResponse) -> bool:
+ def parse_google_response(response: GoogleResponse) -> bool:
return response.success
def turn_on(self) -> bool:
return GoogleLight.parse_google_response(
- goog.ask_google(f"turn {self.goog_name()} on")
+ ask_google(f"turn {self.goog_name()} on")
)
def turn_off(self) -> bool:
return GoogleLight.parse_google_response(
- goog.ask_google(f"turn {self.goog_name()} off")
+ ask_google(f"turn {self.goog_name()} off")
)
+ def is_on(self) -> bool:
+ r = ask_google(f"is {self.goog_name()} on?")
+ if not r.success:
+ return False
+ return 'is on' in r.audio_transcription
+
+ def is_off(self) -> bool:
+ return not self.is_on()
+
+ def get_dimmer_level(self) -> Optional[int]:
+ if not self.has_keyword("dimmer"):
+ return False
+ r = ask_google(f'how bright is {self.goog_name()}?')
+ if not r.success:
+ return None
+
+ # the bookcase one is set to 40% bright
+ txt = r.audio_transcription
+ m = re.search(r"(\d+)% bright", txt)
+ if m is not None:
+ return int(m.group(1))
+ if "is off" in txt:
+ return 0
+ return None
+
def set_dimmer_level(self, level: int) -> bool:
+ if not self.has_keyword("dimmer"):
+ return False
if 0 <= level <= 100:
- return GoogleLight.parse_google_response(
- goog.ask_google(f"set {self.goog_name()} to {level} percent")
- )
+ was_on = self.is_on()
+ r = ask_google(f"set {self.goog_name()} to {level} percent")
+ if not r.success:
+ return False
+ if not was_on:
+ self.turn_off()
+ return True
return False
def make_color(self, color: str) -> bool:
return GoogleLight.parse_google_response(
- goog.ask_google(f"make {self.goog_name()} {color}")
+ ask_google(f"make {self.goog_name()} {color}")
)
+class TuyaLight(Light):
+ ids_by_mac = {
+ '68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
+ '68:C6:3A:DE:27:1A': '8844664268c63ade271a',
+ '68:C6:3A:DE:1D:95': '8844664268c63ade1d95',
+ '68:C6:3A:DE:19:B3': '8844664268c63ade19b3',
+ '80:7D:3A:77:3B:F5': '07445340807d3a773bf5',
+ '80:7D:3A:58:37:02': '07445340807d3a583702',
+ }
+ keys_by_mac = {
+ '68:C6:3A:DE:1A:94': '237f19b1b3d49c36',
+ '68:C6:3A:DE:27:1A': '237f19b1b3d49c36',
+ '68:C6:3A:DE:1D:95': '04b90fc5cd7625d8',
+ '68:C6:3A:DE:19:B3': '2d601f2892f1aefd',
+ '80:7D:3A:77:3B:F5': '27ab921fe4633519',
+ '80:7D:3A:58:37:02': '8559b5416bfa0c05',
+ }
+
+ def __init__(self, name: str, mac: str, keywords: str = "") -> None:
+ from subprocess import Popen, PIPE
+ super().__init__(name, mac, keywords)
+ mac = mac.upper()
+ if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
+ raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
+ self.devid = TuyaLight.ids_by_mac[mac]
+ self.key = TuyaLight.keys_by_mac[mac]
+ try:
+ pid = Popen(['maclookup', mac], stdout=PIPE)
+ ip = pid.communicate()[0]
+ ip = ip[:-1]
+ except Exception:
+ ip = '0.0.0.0'
+ self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)
+
+ def turn_on(self) -> bool:
+ self.bulb.turn_on()
+ return True
+
+ def turn_off(self) -> bool:
+ self.bulb.turn_off()
+ return True
+
+ def get_status(self) -> Dict[str, Any]:
+ return self.bulb.status()
+
+ def is_on(self) -> bool:
+ s = self.get_status()
+ return s['dps']['1']
+
+ def is_off(self) -> bool:
+ return not self.is_on()
+
+ def get_dimmer_level(self) -> Optional[int]:
+ s = self.get_status()
+ return s['dps']['3']
+
+ def set_dimmer_level(self, level: int) -> bool:
+ self.bulb.set_brightness(level)
+ return True
+
+ def make_color(self, color: str) -> bool:
+ self.bulb.set_colour(255,0,0)
+ return True
+
+
class TPLinkLight(Light):
def __init__(self, name: str, mac: str, keywords: str = "") -> None:
super().__init__(name, mac, keywords)
cmd = self.get_cmdline(child) + f"-c {cmd}"
if extra_args is not None:
cmd += f" {extra_args}"
+ logger.debug(f'About to execute {cmd}')
return tplink_light_command(cmd)
def turn_on(self, child: str = None) -> bool:
def turn_off(self, child: str = None) -> bool:
return self.command("off", child)
+ def is_on(self) -> bool:
+ return self.get_on_duration_seconds() > 0
+
+ def is_off(self) -> bool:
+ return not self.is_on()
+
def make_color(self, color: str) -> bool:
raise NotImplementedError
return int(m.group(1)) * 60
return None
+ def get_dimmer_level(self) -> Optional[int]:
+ if not self.has_keyword("dimmer"):
+ return False
+ self.info = self.get_info()
+ if self.info is None:
+ return None
+ return int(self.info.get("brightness", "0"))
+
def set_dimmer_level(self, level: int) -> bool:
if not self.has_keyword("dimmer"):
return False
return tplink_light_command(cmd)
+class GoogleLightGroup(GoogleLight):
+ def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
+ if len(members) < 1:
+ raise Exception("There must be at least one light in the group.")
+ self.members = members
+ mac = GoogleLightGroup.make_up_mac(members)
+ super().__init__(name, mac, keywords)
+
+ @staticmethod
+ def make_up_mac(members: List[GoogleLight]):
+ mac = members[0].get_mac()
+ b = mac.split(':')
+ b[5] = int(b[5], 16) + 1
+ if b[5] > 255:
+ b[5] = 0
+ b[5] = str(b[5])
+ return ":".join(b)
+
+ def is_on(self) -> bool:
+ r = ask_google(f"are {self.goog_name()} on?")
+ if not r.success:
+ return False
+ return 'is on' in r.audio_transcription
+
+ def get_dimmer_level(self) -> Optional[int]:
+ if not self.has_keyword("dimmer"):
+ return False
+ r = ask_google(f'how bright are {self.goog_name()}?')
+ if not r.success:
+ return None
+
+ # four lights are set to 100% brightness
+ txt = r.audio_transcription
+ m = re.search(r"(\d+)% bright", txt)
+ if m is not None:
+ return int(m.group(1))
+ if "is off" in txt:
+ return 0
+ return None
+
+ def set_dimmer_level(self, level: int) -> bool:
+ if not self.has_keyword("dimmer"):
+ return False
+ if 0 <= level <= 100:
+ was_on = self.is_on()
+ r = ask_google(f"set {self.goog_name()} to {level} percent")
+ if not r.success:
+ return False
+ if not was_on:
+ self.turn_off()
+ return True
+ return False
+
+ def make_color(self, color: str) -> bool:
+ return GoogleLight.parse_google_response(
+ ask_google(f"make {self.goog_name()} {color}")
+ )
+
+
class LightingConfig(object):
"""Representation of the smart light device config."""
self.corpus = logical_search.Corpus()
with open(config_file, "r") as f:
contents = f.readlines()
+
+ diningroom_lights = []
+ bookcase_lights = []
for line in contents:
line = line.rstrip("\n")
line = re.sub(r"#.*$", r"", line)
keywords = keywords.strip()
if "perm" not in keywords:
continue
- properties = [("name", name)]
- tags = set()
- for kw in keywords.split():
- if ":" in kw:
- key, value = kw.split(":")
- properties.append((key, value))
- else:
- tags.add(kw)
- properties.append(("name", name))
self.macs_by_name[name] = mac
self._keywords_by_name[name] = keywords
self.keywords_by_mac[mac] = keywords
self.names_by_mac[mac] = name
- self.corpus.add_doc(
- logical_search.Document(
- docid=mac,
- tags=tags,
- properties=properties,
- reference=None,
- )
- )
+
+# if "bookcase_light_" in name:
+# bookcase_lights.append(mac)
+# elif "diningroom_light_" in name:
+# diningroom_lights.append(mac)
+# else:
+ self.index_light(name, keywords, mac)
+
+ # name = 'bookcase_lights'
+ # group = []
+ # keywords = 'perm wifi light smart goog dimmer'
+ # for b in bookcase_lights:
+ # group.append(self.get_light_by_mac(b))
+ # self.bookcase_group = GoogleLightGroup(
+ # name,
+ # group,
+ # keywords,
+ # )
+ # mac = self.bookcase_group.get_mac()
+ # self.macs_by_name[name] = mac
+ # self._keywords_by_name[name] = keywords
+ # self.keywords_by_mac[mac] = keywords
+ # self.names_by_mac[mac] = name
+ # self.index_light(name, keywords, mac)
+
+ # name = 'dining_room_lights'
+ # group = []
+ # for b in diningroom_lights:
+ # group.append(self.get_light_by_mac(b))
+ # self.diningroom_group = GoogleLightGroup(
+ # name,
+ # group,
+ # keywords,
+ # )
+ # mac = self.diningroom_group.get_mac()
+ # self.macs_by_name[name] = mac
+ # self._keywords_by_name[name] = keywords
+ # self.keywords_by_mac[mac] = keywords
+ # self.names_by_mac[mac] = name
+ # self.index_light(name, keywords, mac)
+
+ def index_light(self, name: str, keywords: str, mac: str) -> None:
+ properties = [("name", name)]
+ tags = set()
+ for kw in keywords.split():
+ if ":" in kw:
+ key, value = kw.split(":")
+ properties.append((key, value))
+ else:
+ tags.add(kw)
+ light = logical_search.Document(
+ docid=mac,
+ tags=tags,
+ properties=properties,
+ reference=None,
+ )
+ self.corpus.add_doc(light)
def __repr__(self) -> str:
s = "Known devices:\n"
if mac in self.keywords_by_mac:
name = self.names_by_mac[mac]
kws = self.keywords_by_mac[mac]
- if "tplink" in kws.lower():
+ if name == 'bookcase_lights':
+ return self.bookcase_group
+ elif name == 'dining_room_lights':
+ return self.diningroom_group
+ elif 'tplink' in kws.lower():
return TPLinkLight(name, mac, kws)
+ elif 'tuya' in kws.lower():
+ return TuyaLight(name, mac, kws)
else:
return GoogleLight(name, mac, kws)
return None