import argparse_utils
import config
-import logical_search
import logging_utils
-import google_assistant as goog
+import logical_search
+from google_assistant import ask_google, GoogleResponse
from decorator_utils import timeout, memoized
logger = logging.getLogger(__name__)
result = os.system(command)
signal = result & 0xFF
if signal != 0:
+ logger.warning(f'{command} died with signal {signal}')
logging_utils.hlog("%s died with signal %d" % (command, signal))
return False
else:
exit_value = result >> 8
if exit_value != 0:
+ logger.warning(f'{command} failed, exited {exit_value}')
logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
return False
+ logger.debug(f'{command} succeeded.')
return True
def turn_off(self) -> bool:
pass
+ @abstractmethod
+ def is_on(self) -> bool:
+ pass
+
+ @abstractmethod
+ def is_off(self) -> bool:
+ pass
+
+ @abstractmethod
+ def get_dimmer_level(self) -> Optional[int]:
+ pass
+
@abstractmethod
def set_dimmer_level(self, level: int) -> bool:
pass
return name.replace("_", " ")
@staticmethod
- def parse_google_response(response: goog.GoogleResponse) -> bool:
+ def parse_google_response(response: GoogleResponse) -> bool:
return response.success
def turn_on(self) -> bool:
return GoogleLight.parse_google_response(
- goog.ask_google(f"turn {self.goog_name()} on")
+ ask_google(f"turn {self.goog_name()} on")
)
def turn_off(self) -> bool:
return GoogleLight.parse_google_response(
- goog.ask_google(f"turn {self.goog_name()} off")
+ ask_google(f"turn {self.goog_name()} off")
)
+ def is_on(self) -> bool:
+ r = ask_google(f"is {self.goog_name()} on?")
+ if not r.success:
+ return False
+ return 'is on' in r.audio_transcription
+
+ def is_off(self) -> bool:
+ return not self.is_on()
+
+ def get_dimmer_level(self) -> Optional[int]:
+ if not self.has_keyword("dimmer"):
+ return False
+ r = ask_google(f'how bright is {self.goog_name()}?')
+ if not r.success:
+ return None
+
+ # the bookcase one is set to 40% bright
+ txt = r.audio_transcription
+ m = re.search(r"(\d+)% bright", txt)
+ if m is not None:
+ return int(m.group(1))
+ if "is off" in txt:
+ return 0
+ return None
+
def set_dimmer_level(self, level: int) -> bool:
+ if not self.has_keyword("dimmer"):
+ return False
if 0 <= level <= 100:
- return GoogleLight.parse_google_response(
- goog.ask_google(f"set {self.goog_name()} to {level} percent")
- )
+ was_on = self.is_on()
+ r = ask_google(f"set {self.goog_name()} to {level} percent")
+ if not r.success:
+ return False
+ if not was_on:
+ self.turn_off()
+ return True
return False
def make_color(self, color: str) -> bool:
return GoogleLight.parse_google_response(
- goog.ask_google(f"make {self.goog_name()} {color}")
+ ask_google(f"make {self.goog_name()} {color}")
)
cmd = self.get_cmdline(child) + f"-c {cmd}"
if extra_args is not None:
cmd += f" {extra_args}"
+ logger.debug(f'About to execute {cmd}')
return tplink_light_command(cmd)
def turn_on(self, child: str = None) -> bool:
def turn_off(self, child: str = None) -> bool:
return self.command("off", child)
+ def is_on(self) -> bool:
+ return self.get_on_duration_seconds() > 0
+
+ def is_off(self) -> bool:
+ return not self.is_on()
+
def make_color(self, color: str) -> bool:
raise NotImplementedError
return int(m.group(1)) * 60
return None
+ def get_dimmer_level(self) -> Optional[int]:
+ if not self.has_keyword("dimmer"):
+ return False
+ self.info = self.get_info()
+ if self.info is None:
+ return None
+ return int(self.info.get("brightness", "0"))
+
def set_dimmer_level(self, level: int) -> bool:
if not self.has_keyword("dimmer"):
return False
return tplink_light_command(cmd)
+class GoogleLightGroup(GoogleLight):
+ def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
+ if len(members) < 1:
+ raise Exception("There must be at least one light in the group.")
+ self.members = members
+ mac = GoogleLightGroup.make_up_mac(members)
+ super().__init__(name, mac, keywords)
+
+ @staticmethod
+ def make_up_mac(members: List[GoogleLight]):
+ mac = members[0].get_mac()
+ b = mac.split(':')
+ b[5] = int(b[5], 16) + 1
+ if b[5] > 255:
+ b[5] = 0
+ b[5] = str(b[5])
+ return ":".join(b)
+
+ def is_on(self) -> bool:
+ r = ask_google(f"are {self.goog_name()} on?")
+ if not r.success:
+ return False
+ return 'is on' in r.audio_transcription
+
+ def get_dimmer_level(self) -> Optional[int]:
+ if not self.has_keyword("dimmer"):
+ return False
+ r = ask_google(f'how bright are {self.goog_name()}?')
+ if not r.success:
+ return None
+
+ # four lights are set to 100% brightness
+ txt = r.audio_transcription
+ m = re.search(r"(\d+)% bright", txt)
+ if m is not None:
+ return int(m.group(1))
+ if "is off" in txt:
+ return 0
+ return None
+
+ def set_dimmer_level(self, level: int) -> bool:
+ if not self.has_keyword("dimmer"):
+ return False
+ if 0 <= level <= 100:
+ was_on = self.is_on()
+ r = ask_google(f"set {self.goog_name()} to {level} percent")
+ if not r.success:
+ return False
+ if not was_on:
+ self.turn_off()
+ return True
+ return False
+
+ def make_color(self, color: str) -> bool:
+ return GoogleLight.parse_google_response(
+ ask_google(f"make {self.goog_name()} {color}")
+ )
+
+
+def group_google_lights(lights: List[Light]) -> List[Light]:
+ bookcase_group = []
+ diningroom_group = []
+ for light in lights:
+ name = light.get_name()
+ if "bookcase_light_" in name:
+ bookcase_group.append(light)
+ elif "diningroom_light_" in name:
+ diningroom_group.append(light)
+
+ did_bookcase = False
+ did_diningroom = False
+ ret = []
+ for light in lights:
+ name = light.get_name()
+ if "bookcase_light_" in name:
+ if len(bookcase_group) == 4 and not did_bookcase:
+ ret.append(
+ GoogleLightGroup(
+ "bookcase_lights",
+ bookcase_group,
+ "perm wifi light smart goog dimmer"
+ )
+ )
+ did_bookcase = True
+ elif "diningroom_light_" in name:
+ if len(diningroom_group) == 2 and not did_diningroom:
+ ret.append(
+ GoogleLightGroup(
+ "dining_room_lights",
+ diningroom_group,
+ "intermittent wifi light smart goog dimmer"
+ )
+ )
+ did_diningroom = True
+ else:
+ ret.append(light)
+ return ret
+
+
class LightingConfig(object):
"""Representation of the smart light device config."""
self.corpus = logical_search.Corpus()
with open(config_file, "r") as f:
contents = f.readlines()
+
+ diningroom_lights = []
+ bookcase_lights = []
for line in contents:
line = line.rstrip("\n")
line = re.sub(r"#.*$", r"", line)
keywords = keywords.strip()
if "perm" not in keywords:
continue
- properties = [("name", name)]
- tags = set()
- for kw in keywords.split():
- if ":" in kw:
- key, value = kw.split(":")
- properties.append((key, value))
- else:
- tags.add(kw)
- properties.append(("name", name))
self.macs_by_name[name] = mac
self._keywords_by_name[name] = keywords
self.keywords_by_mac[mac] = keywords
self.names_by_mac[mac] = name
- self.corpus.add_doc(
- logical_search.Document(
- docid=mac,
- tags=tags,
- properties=properties,
- reference=None,
- )
+
+ if "bookcase_light_" in name:
+ bookcase_lights.append(mac)
+ elif "diningroom_light_" in name:
+ diningroom_lights.append(mac)
+ else:
+ self.index_light(name, keywords, mac)
+
+ name = 'bookcase_lights'
+ group = []
+ keywords = 'perm wifi light smart goog dimmer'
+ for b in bookcase_lights:
+ group.append(self.get_light_by_mac(b))
+ self.bookcase_group = GoogleLightGroup(
+ name,
+ group,
+ keywords,
+ )
+ mac = self.bookcase_group.get_mac()
+ self.macs_by_name[name] = mac
+ self._keywords_by_name[name] = keywords
+ self.keywords_by_mac[mac] = keywords
+ self.names_by_mac[mac] = name
+ self.index_light(name, keywords, mac)
+
+ name = 'dining_room_lights'
+ group = []
+ for b in diningroom_lights:
+ group.append(self.get_light_by_mac(b))
+ self.diningroom_group = GoogleLightGroup(
+ name,
+ group,
+ keywords,
+ )
+ mac = self.diningroom_group.get_mac()
+ self.macs_by_name[name] = mac
+ self._keywords_by_name[name] = keywords
+ self.keywords_by_mac[mac] = keywords
+ self.names_by_mac[mac] = name
+ self.index_light(name, keywords, mac)
+
+ def index_light(self, name: str, keywords: str, mac: str) -> None:
+ properties = [("name", name)]
+ tags = set()
+ for kw in keywords.split():
+ if ":" in kw:
+ key, value = kw.split(":")
+ properties.append((key, value))
+ else:
+ tags.add(kw)
+ self.corpus.add_doc(
+ logical_search.Document(
+ docid=mac,
+ tags=tags,
+ properties=properties,
+ reference=None,
)
+ )
def __repr__(self) -> str:
s = "Known devices:\n"
light = self.get_light_by_mac(mac)
if light is not None:
retval.append(light)
- return retval
+ return group_google_lights(retval)
def get_light_by_mac(self, mac: str) -> Optional[Light]:
if mac in self.keywords_by_mac:
name = self.names_by_mac[mac]
kws = self.keywords_by_mac[mac]
- if "tplink" in kws.lower():
+ if name == 'bookcase_lights':
+ return self.bookcase_group
+ elif name == 'dining_room_lights':
+ return self.diningroom_group
+ elif "tplink" in kws.lower():
return TPLinkLight(name, mac, kws)
else:
return GoogleLight(name, mac, kws)
light = self.get_light_by_mac(mac)
if light is not None:
retval.append(light)
- return retval
+ return group_google_lights(retval)