3 """Utilities for dealing with the smart lights."""
5 from abc import ABC, abstractmethod
13 from typing import Dict, List, Optional, Set
18 from google_assistant import ask_google, GoogleResponse
19 from decorator_utils import timeout, memoized
21 logger = logging.getLogger(__name__)
23 parser = config.add_commandline_args(
24 f"Light Utils ({__file__})",
25 "Args related to light utilities."
28 '--light_utils_tplink_location',
29 default='/home/scott/bin/tplink.py',
31 help='The location of the tplink.py helper',
32 type=argparse_utils.valid_filename,
35 '--light_utils_network_mac_addresses_location',
36 default='/home/scott/bin/network_mac_addresses.txt',
38 help='The location of network_mac_addresses.txt',
39 type=argparse_utils.valid_filename,
44 5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
46 def tplink_light_command(command: str) -> bool:
47 result = os.system(command)
48 signal = result & 0xFF
50 logging_utils.hlog("%s died with signal %d" % (command, signal))
53 exit_value = result >> 8
55 logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
def __init__(self, name: str, mac: str, keywords: str = "") -> None:
    """Record the identifying details of a light.

    Args:
        name: the light's name; surrounding whitespace is removed.
        mac: the light's MAC address; surrounding whitespace is removed.
        keywords: whitespace-separated keyword string describing the light.
    """
    self.name, self.mac = name.strip(), mac.strip()
    self.keywords = keywords.strip()
    # Individual keyword tokens; split() already ignores edge whitespace.
    self.kws = keywords.split()
67 def get_name(self) -> str:
70 def get_mac(self) -> str:
74 def turn_on(self) -> bool:
78 def turn_off(self) -> bool:
82 def is_on(self) -> bool:
86 def is_off(self) -> bool:
90 def get_dimmer_level(self) -> Optional[int]:
94 def set_dimmer_level(self, level: int) -> bool:
98 def make_color(self, color: str) -> bool:
101 def get_keywords(self) -> List[str]:
104 def has_keyword(self, keyword: str) -> bool:
111 class GoogleLight(Light):
def __init__(self, name: str, mac: str, keywords: str = "") -> None:
    """Create a light controlled via Google; all state lives in the base class."""
    super().__init__(name=name, mac=mac, keywords=keywords)
def goog_name(self) -> str:
    """Return this light's name with every underscore turned into a space."""
    return " ".join(self.get_name().split("_"))
def parse_google_response(response: GoogleResponse) -> bool:
    """Reduce a Google response to its success flag."""
    succeeded = response.success
    return succeeded
def turn_on(self) -> bool:
    """Ask Google to turn this light on.

    Returns:
        Whatever parse_google_response reports for the reply.
    """
    response = ask_google(f"turn {self.goog_name()} on")
    return GoogleLight.parse_google_response(response)
def turn_off(self) -> bool:
    """Ask Google to turn this light off.

    Returns:
        Whatever parse_google_response reports for the reply.
    """
    response = ask_google(f"turn {self.goog_name()} off")
    return GoogleLight.parse_google_response(response)
133 def is_on(self) -> bool:
134 r = ask_google(f"is {self.goog_name()} on?")
137 return 'is on' in r.audio_transcription
def is_off(self) -> bool:
    """Return the logical negation of is_on()."""
    lit = self.is_on()
    return not lit
142 def get_dimmer_level(self) -> Optional[int]:
143 if not self.has_keyword("dimmer"):
145 r = ask_google(f'how bright is {self.goog_name()}?')
149 # the bookcase one is set to 40% bright
150 txt = r.audio_transcription
151 m = re.search(r"(\d+)% bright", txt)
153 return int(m.group(1))
158 def set_dimmer_level(self, level: int) -> bool:
159 if not self.has_keyword("dimmer"):
161 if 0 <= level <= 100:
162 was_on = self.is_on()
163 r = ask_google(f"set {self.goog_name()} to {level} percent")
def make_color(self, color: str) -> bool:
    """Ask Google to change this light to the given color.

    Args:
        color: color text inserted verbatim into the spoken request.

    Returns:
        Whatever parse_google_response reports for the reply.
    """
    response = ask_google(f"make {self.goog_name()} {color}")
    return GoogleLight.parse_google_response(response)
177 class TPLinkLight(Light):
def __init__(self, name: str, mac: str, keywords: str = "") -> None:
    """Create a TP-Link light and, if it has children, discover their ids.

    Args:
        name: the light's name.
        mac: the light's MAC address.
        keywords: keyword string; when it contains "children" the device
            is queried via get_info() and each child's "id" is recorded.
    """
    super().__init__(name, mac, keywords)
    self.children: List[str] = []
    self.info: Optional[Dict] = None
    self.info_ts: Optional[datetime.datetime] = None
    if "children" in self.keywords:
        self.info = self.get_info()
        if self.info is not None:
            # Tolerate sysinfo without a "children" entry instead of
            # raising KeyError out of the constructor.
            self.children = [
                child["id"] for child in self.info.get("children", [])
            ]
def get_tplink_name(self) -> Optional[str]:
    """Refresh the device info and return its "alias" entry.

    Returns:
        The alias from the freshly fetched info, or None when get_info()
        yields nothing.
    """
    self.info = self.get_info()
    if self.info is None:
        return None
    return self.info["alias"]
196 def get_cmdline(self, child: str = None) -> str:
198 f"{config.config['light_utils_tplink_location']} -m {self.mac} "
199 f"--no_logging_console "
201 if child is not None:
202 cmd += f"-x {child} "
205 def get_children(self) -> List[str]:
209 self, cmd: str, child: str = None, extra_args: str = None
211 cmd = self.get_cmdline(child) + f"-c {cmd}"
212 if extra_args is not None:
213 cmd += f" {extra_args}"
214 return tplink_light_command(cmd)
def turn_on(self, child: str = None) -> bool:
    """Send the "on" command, forwarding the optional child id to command()."""
    outcome = self.command("on", child=child)
    return outcome
def turn_off(self, child: str = None) -> bool:
    """Send the "off" command, forwarding the optional child id to command()."""
    outcome = self.command("off", child=child)
    return outcome
def is_on(self) -> bool:
    """Report the light as on when its on-duration is greater than zero."""
    seconds_on = self.get_on_duration_seconds()
    return seconds_on > 0
def is_off(self) -> bool:
    """Return the logical negation of is_on()."""
    currently_on = self.is_on()
    return not currently_on
def make_color(self, color: str) -> bool:
    """Color changes are not implemented for this light type.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError()
232 10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
234 def get_info(self) -> Optional[Dict]:
235 cmd = self.get_cmdline() + "-c info"
236 out = subprocess.getoutput(cmd)
237 out = re.sub("Sent:.*\n", "", out)
238 out = re.sub("Received: *", "", out)
240 self.info = json.loads(out)["system"]["get_sysinfo"]
241 self.info_ts = datetime.datetime.now()
243 except Exception as e:
245 print(out, file=sys.stderr)
250 def get_on_duration_seconds(self, child: str = None) -> int:
251 self.info = self.get_info()
253 if self.info is None:
255 return int(self.info.get("on_time", "0"))
257 if self.info is None:
259 for chi in self.info.get("children", {}):
260 if chi["id"] == child:
261 return int(chi.get("on_time", "0"))
264 def get_on_limit_seconds(self) -> Optional[int]:
266 m = re.search(r"timeout:(\d+)", kw)
268 return int(m.group(1)) * 60
def get_dimmer_level(self) -> Optional[int]:
    """Return the device's reported brightness, if it is a dimmer.

    Returns:
        The "brightness" entry of freshly fetched device info as an int
        (0 when the entry is absent), or None when the light lacks the
        "dimmer" keyword or no info could be fetched.
    """
    if self.has_keyword("dimmer"):
        self.info = self.get_info()
        if self.info is not None:
            return int(self.info.get("brightness", "0"))
    return None
279 def set_dimmer_level(self, level: int) -> bool:
280 if not self.has_keyword("dimmer"):
284 + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
286 return tplink_light_command(cmd)
289 class LightingConfig(object):
290 """Representation of the smart light device config."""
294 config_file: str = None,
296 import logical_search
297 if config_file is None:
298 config_file = config.config[
299 'light_utils_network_mac_addresses_location'
301 self.macs_by_name = {}
302 self._keywords_by_name = {}
303 self.keywords_by_mac = {}
304 self.names_by_mac = {}
305 self.corpus = logical_search.Corpus()
306 with open(config_file, "r") as f:
307 contents = f.readlines()
308 for line in contents:
309 line = line.rstrip("\n")
310 line = re.sub(r"#.*$", r"", line)
314 (mac, name, keywords) = line.split(",")
317 keywords = keywords.strip()
318 if "perm" not in keywords:
320 properties = [("name", name)]
322 for kw in keywords.split():
324 key, value = kw.split(":")
325 properties.append((key, value))
328 properties.append(("name", name))
329 self.macs_by_name[name] = mac
330 self._keywords_by_name[name] = keywords
331 self.keywords_by_mac[mac] = keywords
332 self.names_by_mac[mac] = name
334 logical_search.Document(
337 properties=properties,
def __repr__(self) -> str:
    """Return a multi-line listing of every known device.

    The first line is a header; each following line shows one device's
    name, its MAC address and its keyword string.
    """
    lines = ["Known devices:\n"]
    for name, keywords in self._keywords_by_name.items():
        mac = self.macs_by_name[name]
        lines.append(f" {name} ({mac}) => {keywords}\n")
    # Join once rather than growing a string piecewise, and hand the
    # result back to the caller.
    return "".join(lines)
def get_keywords_by_name(self, name: str) -> Optional[str]:
    """Look up the keyword string recorded for a device name.

    Returns:
        The keyword string, or None when the name is unknown.
    """
    found = self._keywords_by_name.get(name)
    return found
352 def get_macs_by_name(self, name: str) -> Set[str]:
354 for (mac, lname) in self.names_by_mac.items():
359 def get_macs_by_keyword(self, keyword: str) -> Set[str]:
361 for (mac, keywords) in self.keywords_by_mac.items():
362 if keyword in keywords:
def get_light_by_name(self, name: str) -> Optional[Light]:
    """Return the light registered under a device name.

    The name is mapped to its MAC address and the lookup is delegated to
    get_light_by_mac(); an unknown name yields None.
    """
    try:
        mac = self.macs_by_name[name]
    except KeyError:
        return None
    return self.get_light_by_mac(mac)
371 def get_all_lights(self) -> List[Light]:
373 for (mac, kws) in self.keywords_by_mac.items():
375 light = self.get_light_by_mac(mac)
376 if light is not None:
def get_light_by_mac(self, mac: str) -> Optional[Light]:
    """Build the light object for a MAC address.

    Returns:
        A TPLinkLight when the device's keyword string contains "tplink"
        (compared case-insensitively), otherwise a GoogleLight; None when
        the MAC address is unknown.
    """
    if mac not in self.keywords_by_mac:
        return None
    name = self.names_by_mac[mac]
    kws = self.keywords_by_mac[mac]
    light_cls = TPLinkLight if "tplink" in kws.lower() else GoogleLight
    return light_cls(name, mac, kws)
390 def query(self, query: str) -> List[Light]:
391 """Evaluates a lighting query expression formed of keywords to search
392 for, logical operators (and, or, not), and parenthesis.
393 Returns a list of matching lights.
396 results = self.corpus.query(query)
397 if results is not None:
400 light = self.get_light_by_mac(mac)
401 if light is not None: