Starting to move towards python3.9
[python_utils.git] / light_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart lights."""
4
5 from abc import ABC, abstractmethod
6 import datetime
7 import json
8 import logging
9 import os
10 import re
11 import subprocess
12 import sys
13 from typing import Dict, List, Optional, Set
14
15 import argparse_utils
16 import config
17 import logging_utils
18 import logical_search
19 from google_assistant import ask_google, GoogleResponse
20 from decorator_utils import timeout, memoized
21
22 logger = logging.getLogger(__name__)
23
24 parser = config.add_commandline_args(
25     f"Light Utils ({__file__})",
26     "Args related to light utilities."
27 )
28 parser.add_argument(
29     '--light_utils_tplink_location',
30     default='/home/scott/bin/tplink.py',
31     metavar='FILENAME',
32     help='The location of the tplink.py helper',
33     type=argparse_utils.valid_filename,
34 )
35 parser.add_argument(
36     '--light_utils_network_mac_addresses_location',
37     default='/home/scott/bin/network_mac_addresses.txt',
38     metavar='FILENAME',
39     help='The location of network_mac_addresses.txt',
40     type=argparse_utils.valid_filename,
41 )
42
43
44 @timeout(
45     5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
46 )
47 def tplink_light_command(command: str) -> bool:
48     result = os.system(command)
49     signal = result & 0xFF
50     if signal != 0:
51         logging_utils.hlog("%s died with signal %d" % (command, signal))
52         return False
53     else:
54         exit_value = result >> 8
55         if exit_value != 0:
56             logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
57             return False
58     return True
59
60
61 class Light(ABC):
62     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
63         self.name = name.strip()
64         self.mac = mac.strip()
65         self.keywords = keywords.strip()
66         self.kws = keywords.split()
67
68     def get_name(self) -> str:
69         return self.name
70
71     def get_mac(self) -> str:
72         return self.mac
73
74     @abstractmethod
75     def turn_on(self) -> bool:
76         pass
77
78     @abstractmethod
79     def turn_off(self) -> bool:
80         pass
81
82     @abstractmethod
83     def is_on(self) -> bool:
84         pass
85
86     @abstractmethod
87     def is_off(self) -> bool:
88         pass
89
90     @abstractmethod
91     def get_dimmer_level(self) -> Optional[int]:
92         pass
93
94     @abstractmethod
95     def set_dimmer_level(self, level: int) -> bool:
96         pass
97
98     @abstractmethod
99     def make_color(self, color: str) -> bool:
100         pass
101
102     def get_keywords(self) -> List[str]:
103         return self.kws
104
105     def has_keyword(self, keyword: str) -> bool:
106         for kw in self.kws:
107             if kw == keyword:
108                 return True
109         return False
110
111
112 class GoogleLight(Light):
113     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
114         super().__init__(name, mac, keywords)
115
116     def goog_name(self) -> str:
117         name = self.get_name()
118         return name.replace("_", " ")
119
120     @staticmethod
121     def parse_google_response(response: GoogleResponse) -> bool:
122         return response.success
123
124     def turn_on(self) -> bool:
125         return GoogleLight.parse_google_response(
126             ask_google(f"turn {self.goog_name()} on")
127         )
128
129     def turn_off(self) -> bool:
130         return GoogleLight.parse_google_response(
131             ask_google(f"turn {self.goog_name()} off")
132         )
133
134     def is_on(self) -> bool:
135         r = ask_google(f"is {self.goog_name()} on?")
136         if not r.success:
137             return False
138         return 'is on' in r.audio_transcription
139
140     def is_off(self) -> bool:
141         return not self.is_on()
142
143     def get_dimmer_level(self) -> Optional[int]:
144         if not self.has_keyword("dimmer"):
145             return False
146         r = ask_google(f'how bright is {self.goog_name()}?')
147         if not r.success:
148             return None
149
150         # the bookcase one is set to 40% bright
151         txt = r.audio_transcription
152         m = re.search(r"(\d+)% bright", txt)
153         if m is not None:
154             return int(m.group(1))
155         if "is off" in txt:
156             return 0
157         return None
158
159     def set_dimmer_level(self, level: int) -> bool:
160         if not self.has_keyword("dimmer"):
161             return False
162         if 0 <= level <= 100:
163             was_on = self.is_on()
164             r = ask_google(f"set {self.goog_name()} to {level} percent")
165             if not r.success:
166                 return False
167             if not was_on:
168                 self.turn_off()
169             return True
170         return False
171
172     def make_color(self, color: str) -> bool:
173         return GoogleLight.parse_google_response(
174             ask_google(f"make {self.goog_name()} {color}")
175         )
176
177
178 class TPLinkLight(Light):
179     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
180         super().__init__(name, mac, keywords)
181         self.children: List[str] = []
182         self.info: Optional[Dict] = None
183         self.info_ts: Optional[datetime.datetime] = None
184         if "children" in self.keywords:
185             self.info = self.get_info()
186             if self.info is not None:
187                 for child in self.info["children"]:
188                     self.children.append(child["id"])
189
190     @memoized
191     def get_tplink_name(self) -> Optional[str]:
192         self.info = self.get_info()
193         if self.info is not None:
194             return self.info["alias"]
195         return None
196
197     def get_cmdline(self, child: str = None) -> str:
198         cmd = (
199             f"{config.config['light_utils_tplink_location']} -m {self.mac} "
200             f"--no_logging_console "
201         )
202         if child is not None:
203             cmd += f"-x {child} "
204         return cmd
205
206     def get_children(self) -> List[str]:
207         return self.children
208
209     def command(
210         self, cmd: str, child: str = None, extra_args: str = None
211     ) -> bool:
212         cmd = self.get_cmdline(child) + f"-c {cmd}"
213         if extra_args is not None:
214             cmd += f" {extra_args}"
215         return tplink_light_command(cmd)
216
217     def turn_on(self, child: str = None) -> bool:
218         return self.command("on", child)
219
220     def turn_off(self, child: str = None) -> bool:
221         return self.command("off", child)
222
223     def is_on(self) -> bool:
224         return self.get_on_duration_seconds() > 0
225
226     def is_off(self) -> bool:
227         return not self.is_on()
228
229     def make_color(self, color: str) -> bool:
230         raise NotImplementedError
231
232     @timeout(
233         10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
234     )
235     def get_info(self) -> Optional[Dict]:
236         cmd = self.get_cmdline() + "-c info"
237         out = subprocess.getoutput(cmd)
238         out = re.sub("Sent:.*\n", "", out)
239         out = re.sub("Received: *", "", out)
240         try:
241             self.info = json.loads(out)["system"]["get_sysinfo"]
242             self.info_ts = datetime.datetime.now()
243             return self.info
244         except Exception as e:
245             logger.exception(e)
246             print(out, file=sys.stderr)
247             self.info = None
248             self.info_ts = None
249             return None
250
251     def get_on_duration_seconds(self, child: str = None) -> int:
252         self.info = self.get_info()
253         if child is None:
254             if self.info is None:
255                 return 0
256             return int(self.info.get("on_time", "0"))
257         else:
258             if self.info is None:
259                 return 0
260             for chi in self.info.get("children", {}):
261                 if chi["id"] == child:
262                     return int(chi.get("on_time", "0"))
263         return 0
264
265     def get_on_limit_seconds(self) -> Optional[int]:
266         for kw in self.kws:
267             m = re.search(r"timeout:(\d+)", kw)
268             if m is not None:
269                 return int(m.group(1)) * 60
270         return None
271
272     def get_dimmer_level(self) -> Optional[int]:
273         if not self.has_keyword("dimmer"):
274             return False
275         self.info = self.get_info()
276         if self.info is None:
277             return None
278         return int(self.info.get("brightness", "0"))
279
280     def set_dimmer_level(self, level: int) -> bool:
281         if not self.has_keyword("dimmer"):
282             return False
283         cmd = (
284             self.get_cmdline()
285             + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
286         )
287         return tplink_light_command(cmd)
288
289
290 class GoogleLightGroup(GoogleLight):
291     def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
292         if len(members) < 1:
293             raise Exception("There must be at least one light in the group.")
294         self.members = members
295         mac = GoogleLightGroup.make_up_mac(members)
296         super().__init__(name, mac, keywords)
297
298     @staticmethod
299     def make_up_mac(members: List[GoogleLight]):
300         mac = members[0].get_mac()
301         b = mac.split(':')
302         b[5] = int(b[5], 16) + 1
303         if b[5] > 255:
304             b[5] = 0
305         b[5] = str(b[5])
306         return ":".join(b)
307
308     def is_on(self) -> bool:
309         r = ask_google(f"are {self.goog_name()} on?")
310         if not r.success:
311             return False
312         return 'is on' in r.audio_transcription
313
314     def get_dimmer_level(self) -> Optional[int]:
315         if not self.has_keyword("dimmer"):
316             return False
317         r = ask_google(f'how bright are {self.goog_name()}?')
318         if not r.success:
319             return None
320
321         # four lights are set to 100% brightness
322         txt = r.audio_transcription
323         m = re.search(r"(\d+)% bright", txt)
324         if m is not None:
325             return int(m.group(1))
326         if "is off" in txt:
327             return 0
328         return None
329
330     def set_dimmer_level(self, level: int) -> bool:
331         if not self.has_keyword("dimmer"):
332             return False
333         if 0 <= level <= 100:
334             was_on = self.is_on()
335             r = ask_google(f"set {self.goog_name()} to {level} percent")
336             if not r.success:
337                 return False
338             if not was_on:
339                 self.turn_off()
340             return True
341         return False
342
343     def make_color(self, color: str) -> bool:
344         return GoogleLight.parse_google_response(
345             ask_google(f"make {self.goog_name()} {color}")
346         )
347
348
349 def group_google_lights(lights: List[Light]) -> List[Light]:
350     bookcase_group = []
351     diningroom_group = []
352     for light in lights:
353         name = light.get_name()
354         if "bookcase_light_" in name:
355             bookcase_group.append(light)
356         elif "diningroom_light_" in name:
357             diningroom_group.append(light)
358
359     did_bookcase = False
360     did_diningroom = False
361     ret = []
362     for light in lights:
363         name = light.get_name()
364         if "bookcase_light_" in name:
365             if len(bookcase_group) == 4 and not did_bookcase:
366                 ret.append(
367                     GoogleLightGroup(
368                         "bookcase_lights",
369                         bookcase_group,
370                         "perm wifi light smart goog dimmer"
371                     )
372                 )
373                 did_bookcase = True
374         elif "diningroom_light_" in name:
375             if len(diningroom_group) == 2 and not did_diningroom:
376                 ret.append(
377                     GoogleLightGroup(
378                         "dining_room_lights",
379                         diningroom_group,
380                         "intermittent wifi light smart goog dimmer"
381                     )
382                 )
383                 did_diningroom = True
384         else:
385             ret.append(light)
386     return ret
387
388
389 class LightingConfig(object):
390     """Representation of the smart light device config."""
391
392     def __init__(
393             self,
394             config_file: str = None,
395     ) -> None:
396         if config_file is None:
397             config_file = config.config[
398                 'light_utils_network_mac_addresses_location'
399             ]
400         self.macs_by_name = {}
401         self._keywords_by_name = {}
402         self.keywords_by_mac = {}
403         self.names_by_mac = {}
404         self.corpus = logical_search.Corpus()
405         with open(config_file, "r") as f:
406             contents = f.readlines()
407
408         diningroom_lights = []
409         bookcase_lights = []
410         for line in contents:
411             line = line.rstrip("\n")
412             line = re.sub(r"#.*$", r"", line)
413             line = line.strip()
414             if line == "":
415                 continue
416             (mac, name, keywords) = line.split(",")
417             mac = mac.strip()
418             name = name.strip()
419             keywords = keywords.strip()
420             if "perm" not in keywords:
421                 continue
422             self.macs_by_name[name] = mac
423             self._keywords_by_name[name] = keywords
424             self.keywords_by_mac[mac] = keywords
425             self.names_by_mac[mac] = name
426
427             if "bookcase_light_" in name:
428                 bookcase_lights.append(mac)
429             elif "diningroom_light_" in name:
430                 diningroom_lights.append(mac)
431             else:
432                 self.index_light(name, keywords, mac)
433
434         name = 'bookcase_lights'
435         group = []
436         keywords = 'perm wifi light smart goog dimmer'
437         for b in bookcase_lights:
438             group.append(self.get_light_by_mac(b))
439         self.bookcase_group = GoogleLightGroup(
440             name,
441             group,
442             keywords,
443         )
444         mac = self.bookcase_group.get_mac()
445         self.macs_by_name[name] = mac
446         self._keywords_by_name[name] = keywords
447         self.keywords_by_mac[mac] = keywords
448         self.names_by_mac[mac] = name
449         self.index_light(name, keywords, mac)
450
451         name = 'dining_room_lights'
452         group = []
453         for b in diningroom_lights:
454             group.append(self.get_light_by_mac(b))
455         self.diningroom_group = GoogleLightGroup(
456             name,
457             group,
458             keywords,
459         )
460         mac = self.diningroom_group.get_mac()
461         self.macs_by_name[name] = mac
462         self._keywords_by_name[name] = keywords
463         self.keywords_by_mac[mac] = keywords
464         self.names_by_mac[mac] = name
465         self.index_light(name, keywords, mac)
466
467     def index_light(self, name: str, keywords: str, mac: str) -> None:
468         properties = [("name", name)]
469         tags = set()
470         for kw in keywords.split():
471             if ":" in kw:
472                 key, value = kw.split(":")
473                 properties.append((key, value))
474             else:
475                 tags.add(kw)
476         self.corpus.add_doc(
477             logical_search.Document(
478                 docid=mac,
479                 tags=tags,
480                 properties=properties,
481                 reference=None,
482             )
483         )
484
485     def __repr__(self) -> str:
486         s = "Known devices:\n"
487         for name, keywords in self._keywords_by_name.items():
488             mac = self.macs_by_name[name]
489             s += f"  {name} ({mac}) => {keywords}\n"
490         return s
491
492     def get_keywords_by_name(self, name: str) -> Optional[str]:
493         return self._keywords_by_name.get(name, None)
494
495     def get_macs_by_name(self, name: str) -> Set[str]:
496         retval = set()
497         for (mac, lname) in self.names_by_mac.items():
498             if name in lname:
499                 retval.add(mac)
500         return retval
501
502     def get_macs_by_keyword(self, keyword: str) -> Set[str]:
503         retval = set()
504         for (mac, keywords) in self.keywords_by_mac.items():
505             if keyword in keywords:
506                 retval.add(mac)
507         return retval
508
509     def get_light_by_name(self, name: str) -> Optional[Light]:
510         if name in self.macs_by_name:
511             return self.get_light_by_mac(self.macs_by_name[name])
512         return None
513
514     def get_all_lights(self) -> List[Light]:
515         retval = []
516         for (mac, kws) in self.keywords_by_mac.items():
517             if mac is not None:
518                 light = self.get_light_by_mac(mac)
519                 if light is not None:
520                     retval.append(light)
521         return group_google_lights(retval)
522
523     def get_light_by_mac(self, mac: str) -> Optional[Light]:
524         if mac in self.keywords_by_mac:
525             name = self.names_by_mac[mac]
526             kws = self.keywords_by_mac[mac]
527             if name == 'bookcase_lights':
528                 return self.bookcase_group
529             elif name == 'dining_room_lights':
530                 return self.diningroom_group
531             elif "tplink" in kws.lower():
532                 return TPLinkLight(name, mac, kws)
533             else:
534                 return GoogleLight(name, mac, kws)
535         return None
536
537     def query(self, query: str) -> List[Light]:
538         """Evaluates a lighting query expression formed of keywords to search
539         for, logical operators (and, or, not), and parenthesis.
540         Returns a list of matching lights.
541         """
542         retval = []
543         results = self.corpus.query(query)
544         if results is not None:
545             for mac in results:
546                 if mac is not None:
547                     light = self.get_light_by_mac(mac)
548                     if light is not None:
549                         retval.append(light)
550         return group_google_lights(retval)