Adds logging in light stuff, minor changes in config/string.
[python_utils.git] / light_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart lights."""
4
5 from abc import ABC, abstractmethod
6 import datetime
7 import json
8 import logging
9 import os
10 import re
11 import subprocess
12 import sys
13 from typing import Dict, List, Optional, Set
14
15 import argparse_utils
16 import config
17 import logging_utils
18 import logical_search
19 from google_assistant import ask_google, GoogleResponse
20 from decorator_utils import timeout, memoized
21
22 logger = logging.getLogger(__name__)
23
24 parser = config.add_commandline_args(
25     f"Light Utils ({__file__})",
26     "Args related to light utilities."
27 )
28 parser.add_argument(
29     '--light_utils_tplink_location',
30     default='/home/scott/bin/tplink.py',
31     metavar='FILENAME',
32     help='The location of the tplink.py helper',
33     type=argparse_utils.valid_filename,
34 )
35 parser.add_argument(
36     '--light_utils_network_mac_addresses_location',
37     default='/home/scott/bin/network_mac_addresses.txt',
38     metavar='FILENAME',
39     help='The location of network_mac_addresses.txt',
40     type=argparse_utils.valid_filename,
41 )
42
43
44 @timeout(
45     5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
46 )
47 def tplink_light_command(command: str) -> bool:
48     result = os.system(command)
49     signal = result & 0xFF
50     if signal != 0:
51         logger.warning(f'{command} died with signal {signal}')
52         logging_utils.hlog("%s died with signal %d" % (command, signal))
53         return False
54     else:
55         exit_value = result >> 8
56         if exit_value != 0:
57             logger.warning(f'{command} failed, exited {exit_value}')
58             logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
59             return False
60     logger.debug(f'{command} succeeded.')
61     return True
62
63
64 class Light(ABC):
65     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
66         self.name = name.strip()
67         self.mac = mac.strip()
68         self.keywords = keywords.strip()
69         self.kws = keywords.split()
70
71     def get_name(self) -> str:
72         return self.name
73
74     def get_mac(self) -> str:
75         return self.mac
76
77     @abstractmethod
78     def turn_on(self) -> bool:
79         pass
80
81     @abstractmethod
82     def turn_off(self) -> bool:
83         pass
84
85     @abstractmethod
86     def is_on(self) -> bool:
87         pass
88
89     @abstractmethod
90     def is_off(self) -> bool:
91         pass
92
93     @abstractmethod
94     def get_dimmer_level(self) -> Optional[int]:
95         pass
96
97     @abstractmethod
98     def set_dimmer_level(self, level: int) -> bool:
99         pass
100
101     @abstractmethod
102     def make_color(self, color: str) -> bool:
103         pass
104
105     def get_keywords(self) -> List[str]:
106         return self.kws
107
108     def has_keyword(self, keyword: str) -> bool:
109         for kw in self.kws:
110             if kw == keyword:
111                 return True
112         return False
113
114
115 class GoogleLight(Light):
116     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
117         super().__init__(name, mac, keywords)
118
119     def goog_name(self) -> str:
120         name = self.get_name()
121         return name.replace("_", " ")
122
123     @staticmethod
124     def parse_google_response(response: GoogleResponse) -> bool:
125         return response.success
126
127     def turn_on(self) -> bool:
128         return GoogleLight.parse_google_response(
129             ask_google(f"turn {self.goog_name()} on")
130         )
131
132     def turn_off(self) -> bool:
133         return GoogleLight.parse_google_response(
134             ask_google(f"turn {self.goog_name()} off")
135         )
136
137     def is_on(self) -> bool:
138         r = ask_google(f"is {self.goog_name()} on?")
139         if not r.success:
140             return False
141         return 'is on' in r.audio_transcription
142
143     def is_off(self) -> bool:
144         return not self.is_on()
145
146     def get_dimmer_level(self) -> Optional[int]:
147         if not self.has_keyword("dimmer"):
148             return False
149         r = ask_google(f'how bright is {self.goog_name()}?')
150         if not r.success:
151             return None
152
153         # the bookcase one is set to 40% bright
154         txt = r.audio_transcription
155         m = re.search(r"(\d+)% bright", txt)
156         if m is not None:
157             return int(m.group(1))
158         if "is off" in txt:
159             return 0
160         return None
161
162     def set_dimmer_level(self, level: int) -> bool:
163         if not self.has_keyword("dimmer"):
164             return False
165         if 0 <= level <= 100:
166             was_on = self.is_on()
167             r = ask_google(f"set {self.goog_name()} to {level} percent")
168             if not r.success:
169                 return False
170             if not was_on:
171                 self.turn_off()
172             return True
173         return False
174
175     def make_color(self, color: str) -> bool:
176         return GoogleLight.parse_google_response(
177             ask_google(f"make {self.goog_name()} {color}")
178         )
179
180
181 class TPLinkLight(Light):
182     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
183         super().__init__(name, mac, keywords)
184         self.children: List[str] = []
185         self.info: Optional[Dict] = None
186         self.info_ts: Optional[datetime.datetime] = None
187         if "children" in self.keywords:
188             self.info = self.get_info()
189             if self.info is not None:
190                 for child in self.info["children"]:
191                     self.children.append(child["id"])
192
193     @memoized
194     def get_tplink_name(self) -> Optional[str]:
195         self.info = self.get_info()
196         if self.info is not None:
197             return self.info["alias"]
198         return None
199
200     def get_cmdline(self, child: str = None) -> str:
201         cmd = (
202             f"{config.config['light_utils_tplink_location']} -m {self.mac} "
203             f"--no_logging_console "
204         )
205         if child is not None:
206             cmd += f"-x {child} "
207         return cmd
208
209     def get_children(self) -> List[str]:
210         return self.children
211
212     def command(
213         self, cmd: str, child: str = None, extra_args: str = None
214     ) -> bool:
215         cmd = self.get_cmdline(child) + f"-c {cmd}"
216         if extra_args is not None:
217             cmd += f" {extra_args}"
218         logger.debug(f'About to execute {cmd}')
219         return tplink_light_command(cmd)
220
221     def turn_on(self, child: str = None) -> bool:
222         return self.command("on", child)
223
224     def turn_off(self, child: str = None) -> bool:
225         return self.command("off", child)
226
227     def is_on(self) -> bool:
228         return self.get_on_duration_seconds() > 0
229
230     def is_off(self) -> bool:
231         return not self.is_on()
232
233     def make_color(self, color: str) -> bool:
234         raise NotImplementedError
235
236     @timeout(
237         10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
238     )
239     def get_info(self) -> Optional[Dict]:
240         cmd = self.get_cmdline() + "-c info"
241         out = subprocess.getoutput(cmd)
242         out = re.sub("Sent:.*\n", "", out)
243         out = re.sub("Received: *", "", out)
244         try:
245             self.info = json.loads(out)["system"]["get_sysinfo"]
246             self.info_ts = datetime.datetime.now()
247             return self.info
248         except Exception as e:
249             logger.exception(e)
250             print(out, file=sys.stderr)
251             self.info = None
252             self.info_ts = None
253             return None
254
255     def get_on_duration_seconds(self, child: str = None) -> int:
256         self.info = self.get_info()
257         if child is None:
258             if self.info is None:
259                 return 0
260             return int(self.info.get("on_time", "0"))
261         else:
262             if self.info is None:
263                 return 0
264             for chi in self.info.get("children", {}):
265                 if chi["id"] == child:
266                     return int(chi.get("on_time", "0"))
267         return 0
268
269     def get_on_limit_seconds(self) -> Optional[int]:
270         for kw in self.kws:
271             m = re.search(r"timeout:(\d+)", kw)
272             if m is not None:
273                 return int(m.group(1)) * 60
274         return None
275
276     def get_dimmer_level(self) -> Optional[int]:
277         if not self.has_keyword("dimmer"):
278             return False
279         self.info = self.get_info()
280         if self.info is None:
281             return None
282         return int(self.info.get("brightness", "0"))
283
284     def set_dimmer_level(self, level: int) -> bool:
285         if not self.has_keyword("dimmer"):
286             return False
287         cmd = (
288             self.get_cmdline()
289             + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
290         )
291         return tplink_light_command(cmd)
292
293
294 class GoogleLightGroup(GoogleLight):
295     def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
296         if len(members) < 1:
297             raise Exception("There must be at least one light in the group.")
298         self.members = members
299         mac = GoogleLightGroup.make_up_mac(members)
300         super().__init__(name, mac, keywords)
301
302     @staticmethod
303     def make_up_mac(members: List[GoogleLight]):
304         mac = members[0].get_mac()
305         b = mac.split(':')
306         b[5] = int(b[5], 16) + 1
307         if b[5] > 255:
308             b[5] = 0
309         b[5] = str(b[5])
310         return ":".join(b)
311
312     def is_on(self) -> bool:
313         r = ask_google(f"are {self.goog_name()} on?")
314         if not r.success:
315             return False
316         return 'is on' in r.audio_transcription
317
318     def get_dimmer_level(self) -> Optional[int]:
319         if not self.has_keyword("dimmer"):
320             return False
321         r = ask_google(f'how bright are {self.goog_name()}?')
322         if not r.success:
323             return None
324
325         # four lights are set to 100% brightness
326         txt = r.audio_transcription
327         m = re.search(r"(\d+)% bright", txt)
328         if m is not None:
329             return int(m.group(1))
330         if "is off" in txt:
331             return 0
332         return None
333
334     def set_dimmer_level(self, level: int) -> bool:
335         if not self.has_keyword("dimmer"):
336             return False
337         if 0 <= level <= 100:
338             was_on = self.is_on()
339             r = ask_google(f"set {self.goog_name()} to {level} percent")
340             if not r.success:
341                 return False
342             if not was_on:
343                 self.turn_off()
344             return True
345         return False
346
347     def make_color(self, color: str) -> bool:
348         return GoogleLight.parse_google_response(
349             ask_google(f"make {self.goog_name()} {color}")
350         )
351
352
353 def group_google_lights(lights: List[Light]) -> List[Light]:
354     bookcase_group = []
355     diningroom_group = []
356     for light in lights:
357         name = light.get_name()
358         if "bookcase_light_" in name:
359             bookcase_group.append(light)
360         elif "diningroom_light_" in name:
361             diningroom_group.append(light)
362
363     did_bookcase = False
364     did_diningroom = False
365     ret = []
366     for light in lights:
367         name = light.get_name()
368         if "bookcase_light_" in name:
369             if len(bookcase_group) == 4 and not did_bookcase:
370                 ret.append(
371                     GoogleLightGroup(
372                         "bookcase_lights",
373                         bookcase_group,
374                         "perm wifi light smart goog dimmer"
375                     )
376                 )
377                 did_bookcase = True
378         elif "diningroom_light_" in name:
379             if len(diningroom_group) == 2 and not did_diningroom:
380                 ret.append(
381                     GoogleLightGroup(
382                         "dining_room_lights",
383                         diningroom_group,
384                         "intermittent wifi light smart goog dimmer"
385                     )
386                 )
387                 did_diningroom = True
388         else:
389             ret.append(light)
390     return ret
391
392
393 class LightingConfig(object):
394     """Representation of the smart light device config."""
395
396     def __init__(
397             self,
398             config_file: str = None,
399     ) -> None:
400         if config_file is None:
401             config_file = config.config[
402                 'light_utils_network_mac_addresses_location'
403             ]
404         self.macs_by_name = {}
405         self._keywords_by_name = {}
406         self.keywords_by_mac = {}
407         self.names_by_mac = {}
408         self.corpus = logical_search.Corpus()
409         with open(config_file, "r") as f:
410             contents = f.readlines()
411
412         diningroom_lights = []
413         bookcase_lights = []
414         for line in contents:
415             line = line.rstrip("\n")
416             line = re.sub(r"#.*$", r"", line)
417             line = line.strip()
418             if line == "":
419                 continue
420             (mac, name, keywords) = line.split(",")
421             mac = mac.strip()
422             name = name.strip()
423             keywords = keywords.strip()
424             if "perm" not in keywords:
425                 continue
426             self.macs_by_name[name] = mac
427             self._keywords_by_name[name] = keywords
428             self.keywords_by_mac[mac] = keywords
429             self.names_by_mac[mac] = name
430
431             if "bookcase_light_" in name:
432                 bookcase_lights.append(mac)
433             elif "diningroom_light_" in name:
434                 diningroom_lights.append(mac)
435             else:
436                 self.index_light(name, keywords, mac)
437
438         name = 'bookcase_lights'
439         group = []
440         keywords = 'perm wifi light smart goog dimmer'
441         for b in bookcase_lights:
442             group.append(self.get_light_by_mac(b))
443         self.bookcase_group = GoogleLightGroup(
444             name,
445             group,
446             keywords,
447         )
448         mac = self.bookcase_group.get_mac()
449         self.macs_by_name[name] = mac
450         self._keywords_by_name[name] = keywords
451         self.keywords_by_mac[mac] = keywords
452         self.names_by_mac[mac] = name
453         self.index_light(name, keywords, mac)
454
455         name = 'dining_room_lights'
456         group = []
457         for b in diningroom_lights:
458             group.append(self.get_light_by_mac(b))
459         self.diningroom_group = GoogleLightGroup(
460             name,
461             group,
462             keywords,
463         )
464         mac = self.diningroom_group.get_mac()
465         self.macs_by_name[name] = mac
466         self._keywords_by_name[name] = keywords
467         self.keywords_by_mac[mac] = keywords
468         self.names_by_mac[mac] = name
469         self.index_light(name, keywords, mac)
470
471     def index_light(self, name: str, keywords: str, mac: str) -> None:
472         properties = [("name", name)]
473         tags = set()
474         for kw in keywords.split():
475             if ":" in kw:
476                 key, value = kw.split(":")
477                 properties.append((key, value))
478             else:
479                 tags.add(kw)
480         self.corpus.add_doc(
481             logical_search.Document(
482                 docid=mac,
483                 tags=tags,
484                 properties=properties,
485                 reference=None,
486             )
487         )
488
489     def __repr__(self) -> str:
490         s = "Known devices:\n"
491         for name, keywords in self._keywords_by_name.items():
492             mac = self.macs_by_name[name]
493             s += f"  {name} ({mac}) => {keywords}\n"
494         return s
495
496     def get_keywords_by_name(self, name: str) -> Optional[str]:
497         return self._keywords_by_name.get(name, None)
498
499     def get_macs_by_name(self, name: str) -> Set[str]:
500         retval = set()
501         for (mac, lname) in self.names_by_mac.items():
502             if name in lname:
503                 retval.add(mac)
504         return retval
505
506     def get_macs_by_keyword(self, keyword: str) -> Set[str]:
507         retval = set()
508         for (mac, keywords) in self.keywords_by_mac.items():
509             if keyword in keywords:
510                 retval.add(mac)
511         return retval
512
513     def get_light_by_name(self, name: str) -> Optional[Light]:
514         if name in self.macs_by_name:
515             return self.get_light_by_mac(self.macs_by_name[name])
516         return None
517
518     def get_all_lights(self) -> List[Light]:
519         retval = []
520         for (mac, kws) in self.keywords_by_mac.items():
521             if mac is not None:
522                 light = self.get_light_by_mac(mac)
523                 if light is not None:
524                     retval.append(light)
525         return group_google_lights(retval)
526
527     def get_light_by_mac(self, mac: str) -> Optional[Light]:
528         if mac in self.keywords_by_mac:
529             name = self.names_by_mac[mac]
530             kws = self.keywords_by_mac[mac]
531             if name == 'bookcase_lights':
532                 return self.bookcase_group
533             elif name == 'dining_room_lights':
534                 return self.diningroom_group
535             elif "tplink" in kws.lower():
536                 return TPLinkLight(name, mac, kws)
537             else:
538                 return GoogleLight(name, mac, kws)
539         return None
540
541     def query(self, query: str) -> List[Light]:
542         """Evaluates a lighting query expression formed of keywords to search
543         for, logical operators (and, or, not), and parenthesis.
544         Returns a list of matching lights.
545         """
546         retval = []
547         results = self.corpus.query(query)
548         if results is not None:
549             for mac in results:
550                 if mac is not None:
551                     light = self.get_light_by_mac(mac)
552                     if light is not None:
553                         retval.append(light)
554         return group_google_lights(retval)