3 """Utilities for dealing with the smart lights."""
5 from abc import ABC, abstractmethod
13 from typing import Dict, List, Optional, Set
19 import google_assistant as goog
20 from decorator_utils import timeout, memoized
22 logger = logging.getLogger(__name__)
24 parser = config.add_commandline_args(
25 f"Light Utils ({__file__})",
26 "Args related to light utilities."
29 '--light_utils_tplink_location',
30 default='/home/scott/bin/tplink.py',
32 help='The location of the tplink.py helper',
33 type=argparse_utils.valid_filename,
36 '--light_utils_network_mac_addresses_location',
37 default='/home/scott/bin/network_mac_addresses.txt',
39 help='The location of network_mac_addresses.txt',
40 type=argparse_utils.valid_filename,
45 5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
47 def tplink_light_command(command: str) -> bool:
48 result = os.system(command)
49 signal = result & 0xFF
51 logging_utils.hlog("%s died with signal %d" % (command, signal))
54 exit_value = result >> 8
56 logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
62 def __init__(self, name: str, mac: str, keywords: str = "") -> None:
63 self.name = name.strip()
64 self.mac = mac.strip()
65 self.keywords = keywords.strip()
66 self.kws = keywords.split()
68 def get_name(self) -> str:
71 def get_mac(self) -> str:
75 def turn_on(self) -> bool:
79 def turn_off(self) -> bool:
83 def set_dimmer_level(self, level: int) -> bool:
87 def make_color(self, color: str) -> bool:
90 def get_keywords(self) -> List[str]:
93 def has_keyword(self, keyword: str) -> bool:
100 class GoogleLight(Light):
101 def __init__(self, name: str, mac: str, keywords: str = "") -> None:
102 super().__init__(name, mac, keywords)
104 def goog_name(self) -> str:
105 name = self.get_name()
106 return name.replace("_", " ")
109 def parse_google_response(response: goog.GoogleResponse) -> bool:
110 return response.success
112 def turn_on(self) -> bool:
113 return GoogleLight.parse_google_response(
114 goog.ask_google(f"turn {self.goog_name()} on")
117 def turn_off(self) -> bool:
118 return GoogleLight.parse_google_response(
119 goog.ask_google(f"turn {self.goog_name()} off")
122 def set_dimmer_level(self, level: int) -> bool:
123 if 0 <= level <= 100:
124 return GoogleLight.parse_google_response(
125 goog.ask_google(f"set {self.goog_name()} to {level} percent")
129 def make_color(self, color: str) -> bool:
130 return GoogleLight.parse_google_response(
131 goog.ask_google(f"make {self.goog_name()} {color}")
135 class TPLinkLight(Light):
136 def __init__(self, name: str, mac: str, keywords: str = "") -> None:
137 super().__init__(name, mac, keywords)
138 self.children: List[str] = []
139 self.info: Optional[Dict] = None
140 self.info_ts: Optional[datetime.datetime] = None
141 if "children" in self.keywords:
142 self.info = self.get_info()
143 if self.info is not None:
144 for child in self.info["children"]:
145 self.children.append(child["id"])
148 def get_tplink_name(self) -> Optional[str]:
149 self.info = self.get_info()
150 if self.info is not None:
151 return self.info["alias"]
154 def get_cmdline(self, child: str = None) -> str:
156 f"{config.config['light_utils_tplink_location']} -m {self.mac} "
157 f"--no_logging_console "
159 if child is not None:
160 cmd += f"-x {child} "
163 def get_children(self) -> List[str]:
167 self, cmd: str, child: str = None, extra_args: str = None
169 cmd = self.get_cmdline(child) + f"-c {cmd}"
170 if extra_args is not None:
171 cmd += f" {extra_args}"
172 return tplink_light_command(cmd)
174 def turn_on(self, child: str = None) -> bool:
175 return self.command("on", child)
177 def turn_off(self, child: str = None) -> bool:
178 return self.command("off", child)
180 def make_color(self, color: str) -> bool:
181 raise NotImplementedError
184 10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
186 def get_info(self) -> Optional[Dict]:
187 cmd = self.get_cmdline() + "-c info"
188 out = subprocess.getoutput(cmd)
189 out = re.sub("Sent:.*\n", "", out)
190 out = re.sub("Received: *", "", out)
192 self.info = json.loads(out)["system"]["get_sysinfo"]
193 self.info_ts = datetime.datetime.now()
195 except Exception as e:
197 print(out, file=sys.stderr)
202 def get_on_duration_seconds(self, child: str = None) -> int:
203 self.info = self.get_info()
205 if self.info is None:
207 return int(self.info.get("on_time", "0"))
209 if self.info is None:
211 for chi in self.info.get("children", {}):
212 if chi["id"] == child:
213 return int(chi.get("on_time", "0"))
216 def get_on_limit_seconds(self) -> Optional[int]:
218 m = re.search(r"timeout:(\d+)", kw)
220 return int(m.group(1)) * 60
223 def set_dimmer_level(self, level: int) -> bool:
224 if not self.has_keyword("dimmer"):
228 + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
230 return tplink_light_command(cmd)
233 class LightingConfig(object):
234 """Representation of the smart light device config."""
238 config_file: str = None,
240 if config_file is None:
241 config_file = config.config[
242 'light_utils_network_mac_addresses_location'
244 self.macs_by_name = {}
245 self._keywords_by_name = {}
246 self.keywords_by_mac = {}
247 self.names_by_mac = {}
248 self.corpus = logical_search.Corpus()
249 with open(config_file, "r") as f:
250 contents = f.readlines()
251 for line in contents:
252 line = line.rstrip("\n")
253 line = re.sub(r"#.*$", r"", line)
257 (mac, name, keywords) = line.split(",")
260 keywords = keywords.strip()
261 if "perm" not in keywords:
263 properties = [("name", name)]
265 for kw in keywords.split():
267 key, value = kw.split(":")
268 properties.append((key, value))
271 properties.append(("name", name))
272 self.macs_by_name[name] = mac
273 self._keywords_by_name[name] = keywords
274 self.keywords_by_mac[mac] = keywords
275 self.names_by_mac[mac] = name
277 logical_search.Document(
280 properties=properties,
285 def __repr__(self) -> str:
286 s = "Known devices:\n"
287 for name, keywords in self._keywords_by_name.items():
288 mac = self.macs_by_name[name]
289 s += f" {name} ({mac}) => {keywords}\n"
292 def get_keywords_by_name(self, name: str) -> Optional[str]:
293 return self._keywords_by_name.get(name, None)
295 def get_macs_by_name(self, name: str) -> Set[str]:
297 for (mac, lname) in self.names_by_mac.items():
302 def get_macs_by_keyword(self, keyword: str) -> Set[str]:
304 for (mac, keywords) in self.keywords_by_mac.items():
305 if keyword in keywords:
309 def get_light_by_name(self, name: str) -> Optional[Light]:
310 if name in self.macs_by_name:
311 return self.get_light_by_mac(self.macs_by_name[name])
314 def get_all_lights(self) -> List[Light]:
316 for (mac, kws) in self.keywords_by_mac.items():
318 light = self.get_light_by_mac(mac)
319 if light is not None:
323 def get_light_by_mac(self, mac: str) -> Optional[Light]:
324 if mac in self.keywords_by_mac:
325 name = self.names_by_mac[mac]
326 kws = self.keywords_by_mac[mac]
327 if "tplink" in kws.lower():
328 return TPLinkLight(name, mac, kws)
330 return GoogleLight(name, mac, kws)
333 def query(self, query: str) -> List[Light]:
334 """Evaluates a lighting query expression formed of keywords to search
335 for, logical operators (and, or, not), and parenthesis.
336 Returns a list of matching lights.
339 results = self.corpus.query(query)
340 if results is not None:
343 light = self.get_light_by_mac(mac)
344 if light is not None: