Moving smart lights into smart_home to prepare for adding
[python_utils.git] / smart_home / lights.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart lights."""
4
5 from abc import ABC, abstractmethod
6 import datetime
7 import json
8 import logging
9 import os
10 import re
11 import subprocess
12 import sys
13 from typing import Any, Dict, List, Optional, Set
14
15 import tinytuya as tt
16
17 import argparse_utils
18 import config
19 import logging_utils
20 import logical_search
21 from google_assistant import ask_google, GoogleResponse
22 from decorator_utils import timeout, memoized
23
24 logger = logging.getLogger(__name__)
25
26 parser = config.add_commandline_args(
27     f"Light Utils ({__file__})",
28     "Args related to light utilities."
29 )
30 parser.add_argument(
31     '--light_utils_tplink_location',
32     default='/home/scott/bin/tplink.py',
33     metavar='FILENAME',
34     help='The location of the tplink.py helper',
35     type=argparse_utils.valid_filename,
36 )
37 parser.add_argument(
38     '--light_utils_network_mac_addresses_location',
39     default='/home/scott/bin/network_mac_addresses.txt',
40     metavar='FILENAME',
41     help='The location of network_mac_addresses.txt',
42     type=argparse_utils.valid_filename,
43 )
44
45
46 @timeout(
47     5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
48 )
49 def tplink_light_command(command: str) -> bool:
50     result = os.system(command)
51     signal = result & 0xFF
52     if signal != 0:
53         logger.warning(f'{command} died with signal {signal}')
54         logging_utils.hlog("%s died with signal %d" % (command, signal))
55         return False
56     else:
57         exit_value = result >> 8
58         if exit_value != 0:
59             logger.warning(f'{command} failed, exited {exit_value}')
60             logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
61             return False
62     logger.debug(f'{command} succeeded.')
63     return True
64
65
66 class Light(ABC):
67     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
68         self.name = name.strip()
69         self.mac = mac.strip()
70         self.keywords = keywords.strip()
71         self.kws = keywords.split()
72
73     def get_name(self) -> str:
74         return self.name
75
76     def get_mac(self) -> str:
77         return self.mac
78
79     @abstractmethod
80     def turn_on(self) -> bool:
81         pass
82
83     @abstractmethod
84     def turn_off(self) -> bool:
85         pass
86
87     @abstractmethod
88     def is_on(self) -> bool:
89         pass
90
91     @abstractmethod
92     def is_off(self) -> bool:
93         pass
94
95     @abstractmethod
96     def get_dimmer_level(self) -> Optional[int]:
97         pass
98
99     @abstractmethod
100     def set_dimmer_level(self, level: int) -> bool:
101         pass
102
103     @abstractmethod
104     def make_color(self, color: str) -> bool:
105         pass
106
107     def get_keywords(self) -> List[str]:
108         return self.kws
109
110     def has_keyword(self, keyword: str) -> bool:
111         for kw in self.kws:
112             if kw == keyword:
113                 return True
114         return False
115
116
117 class GoogleLight(Light):
118     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
119         super().__init__(name, mac, keywords)
120
121     def goog_name(self) -> str:
122         name = self.get_name()
123         return name.replace("_", " ")
124
125     @staticmethod
126     def parse_google_response(response: GoogleResponse) -> bool:
127         return response.success
128
129     def turn_on(self) -> bool:
130         return GoogleLight.parse_google_response(
131             ask_google(f"turn {self.goog_name()} on")
132         )
133
134     def turn_off(self) -> bool:
135         return GoogleLight.parse_google_response(
136             ask_google(f"turn {self.goog_name()} off")
137         )
138
139     def is_on(self) -> bool:
140         r = ask_google(f"is {self.goog_name()} on?")
141         if not r.success:
142             return False
143         return 'is on' in r.audio_transcription
144
145     def is_off(self) -> bool:
146         return not self.is_on()
147
148     def get_dimmer_level(self) -> Optional[int]:
149         if not self.has_keyword("dimmer"):
150             return False
151         r = ask_google(f'how bright is {self.goog_name()}?')
152         if not r.success:
153             return None
154
155         # the bookcase one is set to 40% bright
156         txt = r.audio_transcription
157         m = re.search(r"(\d+)% bright", txt)
158         if m is not None:
159             return int(m.group(1))
160         if "is off" in txt:
161             return 0
162         return None
163
164     def set_dimmer_level(self, level: int) -> bool:
165         if not self.has_keyword("dimmer"):
166             return False
167         if 0 <= level <= 100:
168             was_on = self.is_on()
169             r = ask_google(f"set {self.goog_name()} to {level} percent")
170             if not r.success:
171                 return False
172             if not was_on:
173                 self.turn_off()
174             return True
175         return False
176
177     def make_color(self, color: str) -> bool:
178         return GoogleLight.parse_google_response(
179             ask_google(f"make {self.goog_name()} {color}")
180         )
181
182
183 class TuyaLight(Light):
184     ids_by_mac = {
185         '68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
186         '68:C6:3A:DE:27:1A': '8844664268c63ade271a',
187         '68:C6:3A:DE:1D:95': '8844664268c63ade1d95',
188         '68:C6:3A:DE:19:B3': '8844664268c63ade19b3',
189         '80:7D:3A:77:3B:F5': '07445340807d3a773bf5',
190         '80:7D:3A:58:37:02': '07445340807d3a583702',
191     }
192     keys_by_mac = {
193         '68:C6:3A:DE:1A:94': '237f19b1b3d49c36',
194         '68:C6:3A:DE:27:1A': '237f19b1b3d49c36',
195         '68:C6:3A:DE:1D:95': '04b90fc5cd7625d8',
196         '68:C6:3A:DE:19:B3': '2d601f2892f1aefd',
197         '80:7D:3A:77:3B:F5': '27ab921fe4633519',
198         '80:7D:3A:58:37:02': '8559b5416bfa0c05',
199     }
200
201     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
202         from subprocess import Popen, PIPE
203         super().__init__(name, mac, keywords)
204         mac = mac.upper()
205         if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
206             raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
207         self.devid = TuyaLight.ids_by_mac[mac]
208         self.key = TuyaLight.keys_by_mac[mac]
209         try:
210             pid = Popen(['maclookup', mac], stdout=PIPE)
211             ip = pid.communicate()[0]
212             ip = ip[:-1]
213         except Exception:
214             ip = '0.0.0.0'
215         self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)
216
217     def turn_on(self) -> bool:
218         self.bulb.turn_on()
219         return True
220
221     def turn_off(self) -> bool:
222         self.bulb.turn_off()
223         return True
224
225     def get_status(self) -> Dict[str, Any]:
226         return self.bulb.status()
227
228     def is_on(self) -> bool:
229         s = self.get_status()
230         return s['dps']['1']
231
232     def is_off(self) -> bool:
233         return not self.is_on()
234
235     def get_dimmer_level(self) -> Optional[int]:
236         s = self.get_status()
237         return s['dps']['3']
238
239     def set_dimmer_level(self, level: int) -> bool:
240         self.bulb.set_brightness(level)
241         return True
242
243     def make_color(self, color: str) -> bool:
244         self.bulb.set_colour(255,0,0)
245         return True
246
247
248 class TPLinkLight(Light):
249     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
250         super().__init__(name, mac, keywords)
251         self.children: List[str] = []
252         self.info: Optional[Dict] = None
253         self.info_ts: Optional[datetime.datetime] = None
254         if "children" in self.keywords:
255             self.info = self.get_info()
256             if self.info is not None:
257                 for child in self.info["children"]:
258                     self.children.append(child["id"])
259
260     @memoized
261     def get_tplink_name(self) -> Optional[str]:
262         self.info = self.get_info()
263         if self.info is not None:
264             return self.info["alias"]
265         return None
266
267     def get_cmdline(self, child: str = None) -> str:
268         cmd = (
269             f"{config.config['light_utils_tplink_location']} -m {self.mac} "
270             f"--no_logging_console "
271         )
272         if child is not None:
273             cmd += f"-x {child} "
274         return cmd
275
276     def get_children(self) -> List[str]:
277         return self.children
278
279     def command(
280         self, cmd: str, child: str = None, extra_args: str = None
281     ) -> bool:
282         cmd = self.get_cmdline(child) + f"-c {cmd}"
283         if extra_args is not None:
284             cmd += f" {extra_args}"
285         logger.debug(f'About to execute {cmd}')
286         return tplink_light_command(cmd)
287
288     def turn_on(self, child: str = None) -> bool:
289         return self.command("on", child)
290
291     def turn_off(self, child: str = None) -> bool:
292         return self.command("off", child)
293
294     def is_on(self) -> bool:
295         return self.get_on_duration_seconds() > 0
296
297     def is_off(self) -> bool:
298         return not self.is_on()
299
300     def make_color(self, color: str) -> bool:
301         raise NotImplementedError
302
303     @timeout(
304         10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
305     )
306     def get_info(self) -> Optional[Dict]:
307         cmd = self.get_cmdline() + "-c info"
308         out = subprocess.getoutput(cmd)
309         out = re.sub("Sent:.*\n", "", out)
310         out = re.sub("Received: *", "", out)
311         try:
312             self.info = json.loads(out)["system"]["get_sysinfo"]
313             self.info_ts = datetime.datetime.now()
314             return self.info
315         except Exception as e:
316             logger.exception(e)
317             print(out, file=sys.stderr)
318             self.info = None
319             self.info_ts = None
320             return None
321
322     def get_on_duration_seconds(self, child: str = None) -> int:
323         self.info = self.get_info()
324         if child is None:
325             if self.info is None:
326                 return 0
327             return int(self.info.get("on_time", "0"))
328         else:
329             if self.info is None:
330                 return 0
331             for chi in self.info.get("children", {}):
332                 if chi["id"] == child:
333                     return int(chi.get("on_time", "0"))
334         return 0
335
336     def get_on_limit_seconds(self) -> Optional[int]:
337         for kw in self.kws:
338             m = re.search(r"timeout:(\d+)", kw)
339             if m is not None:
340                 return int(m.group(1)) * 60
341         return None
342
343     def get_dimmer_level(self) -> Optional[int]:
344         if not self.has_keyword("dimmer"):
345             return False
346         self.info = self.get_info()
347         if self.info is None:
348             return None
349         return int(self.info.get("brightness", "0"))
350
351     def set_dimmer_level(self, level: int) -> bool:
352         if not self.has_keyword("dimmer"):
353             return False
354         cmd = (
355             self.get_cmdline()
356             + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
357         )
358         return tplink_light_command(cmd)
359
360
361 class GoogleLightGroup(GoogleLight):
362     def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
363         if len(members) < 1:
364             raise Exception("There must be at least one light in the group.")
365         self.members = members
366         mac = GoogleLightGroup.make_up_mac(members)
367         super().__init__(name, mac, keywords)
368
369     @staticmethod
370     def make_up_mac(members: List[GoogleLight]):
371         mac = members[0].get_mac()
372         b = mac.split(':')
373         b[5] = int(b[5], 16) + 1
374         if b[5] > 255:
375             b[5] = 0
376         b[5] = str(b[5])
377         return ":".join(b)
378
379     def is_on(self) -> bool:
380         r = ask_google(f"are {self.goog_name()} on?")
381         if not r.success:
382             return False
383         return 'is on' in r.audio_transcription
384
385     def get_dimmer_level(self) -> Optional[int]:
386         if not self.has_keyword("dimmer"):
387             return False
388         r = ask_google(f'how bright are {self.goog_name()}?')
389         if not r.success:
390             return None
391
392         # four lights are set to 100% brightness
393         txt = r.audio_transcription
394         m = re.search(r"(\d+)% bright", txt)
395         if m is not None:
396             return int(m.group(1))
397         if "is off" in txt:
398             return 0
399         return None
400
401     def set_dimmer_level(self, level: int) -> bool:
402         if not self.has_keyword("dimmer"):
403             return False
404         if 0 <= level <= 100:
405             was_on = self.is_on()
406             r = ask_google(f"set {self.goog_name()} to {level} percent")
407             if not r.success:
408                 return False
409             if not was_on:
410                 self.turn_off()
411             return True
412         return False
413
414     def make_color(self, color: str) -> bool:
415         return GoogleLight.parse_google_response(
416             ask_google(f"make {self.goog_name()} {color}")
417         )
418
419
420 class LightingConfig(object):
421     """Representation of the smart light device config."""
422
423     def __init__(
424             self,
425             config_file: str = None,
426     ) -> None:
427         if config_file is None:
428             config_file = config.config[
429                 'light_utils_network_mac_addresses_location'
430             ]
431         self.macs_by_name = {}
432         self._keywords_by_name = {}
433         self.keywords_by_mac = {}
434         self.names_by_mac = {}
435         self.corpus = logical_search.Corpus()
436         with open(config_file, "r") as f:
437             contents = f.readlines()
438
439         diningroom_lights = []
440         bookcase_lights = []
441         for line in contents:
442             line = line.rstrip("\n")
443             line = re.sub(r"#.*$", r"", line)
444             line = line.strip()
445             if line == "":
446                 continue
447             (mac, name, keywords) = line.split(",")
448             mac = mac.strip()
449             name = name.strip()
450             keywords = keywords.strip()
451             if "perm" not in keywords:
452                 continue
453             self.macs_by_name[name] = mac
454             self._keywords_by_name[name] = keywords
455             self.keywords_by_mac[mac] = keywords
456             self.names_by_mac[mac] = name
457
458 #            if "bookcase_light_" in name:
459 #                bookcase_lights.append(mac)
460 #            elif "diningroom_light_" in name:
461 #                diningroom_lights.append(mac)
462 #            else:
463             self.index_light(name, keywords, mac)
464
465         # name = 'bookcase_lights'
466         # group = []
467         # keywords = 'perm wifi light smart goog dimmer'
468         # for b in bookcase_lights:
469         #     group.append(self.get_light_by_mac(b))
470         # self.bookcase_group = GoogleLightGroup(
471         #     name,
472         #     group,
473         #     keywords,
474         # )
475         # mac = self.bookcase_group.get_mac()
476         # self.macs_by_name[name] = mac
477         # self._keywords_by_name[name] = keywords
478         # self.keywords_by_mac[mac] = keywords
479         # self.names_by_mac[mac] = name
480         # self.index_light(name, keywords, mac)
481
482         # name = 'dining_room_lights'
483         # group = []
484         # for b in diningroom_lights:
485         #     group.append(self.get_light_by_mac(b))
486         # self.diningroom_group = GoogleLightGroup(
487         #     name,
488         #     group,
489         #     keywords,
490         # )
491         # mac = self.diningroom_group.get_mac()
492         # self.macs_by_name[name] = mac
493         # self._keywords_by_name[name] = keywords
494         # self.keywords_by_mac[mac] = keywords
495         # self.names_by_mac[mac] = name
496         # self.index_light(name, keywords, mac)
497
498     def index_light(self, name: str, keywords: str, mac: str) -> None:
499         properties = [("name", name)]
500         tags = set()
501         for kw in keywords.split():
502             if ":" in kw:
503                 key, value = kw.split(":")
504                 properties.append((key, value))
505             else:
506                 tags.add(kw)
507         light = logical_search.Document(
508             docid=mac,
509             tags=tags,
510             properties=properties,
511             reference=None,
512         )
513         self.corpus.add_doc(light)
514
515     def __repr__(self) -> str:
516         s = "Known devices:\n"
517         for name, keywords in self._keywords_by_name.items():
518             mac = self.macs_by_name[name]
519             s += f"  {name} ({mac}) => {keywords}\n"
520         return s
521
522     def get_keywords_by_name(self, name: str) -> Optional[str]:
523         return self._keywords_by_name.get(name, None)
524
525     def get_macs_by_name(self, name: str) -> Set[str]:
526         retval = set()
527         for (mac, lname) in self.names_by_mac.items():
528             if name in lname:
529                 retval.add(mac)
530         return retval
531
532     def get_macs_by_keyword(self, keyword: str) -> Set[str]:
533         retval = set()
534         for (mac, keywords) in self.keywords_by_mac.items():
535             if keyword in keywords:
536                 retval.add(mac)
537         return retval
538
539     def get_light_by_name(self, name: str) -> Optional[Light]:
540         if name in self.macs_by_name:
541             return self.get_light_by_mac(self.macs_by_name[name])
542         return None
543
544     def get_all_lights(self) -> List[Light]:
545         retval = []
546         for (mac, kws) in self.keywords_by_mac.items():
547             if mac is not None:
548                 light = self.get_light_by_mac(mac)
549                 if light is not None:
550                     retval.append(light)
551         return retval
552
553     def get_light_by_mac(self, mac: str) -> Optional[Light]:
554         if mac in self.keywords_by_mac:
555             name = self.names_by_mac[mac]
556             kws = self.keywords_by_mac[mac]
557             if name == 'bookcase_lights':
558                 return self.bookcase_group
559             elif name == 'dining_room_lights':
560                 return self.diningroom_group
561             elif 'tplink' in kws.lower():
562                 return TPLinkLight(name, mac, kws)
563             elif 'tuya' in kws.lower():
564                 return TuyaLight(name, mac, kws)
565             else:
566                 return GoogleLight(name, mac, kws)
567         return None
568
569     def query(self, query: str) -> List[Light]:
570         """Evaluates a lighting query expression formed of keywords to search
571         for, logical operators (and, or, not), and parenthesis.
572         Returns a list of matching lights.
573         """
574         retval = []
575         results = self.corpus.query(query)
576         if results is not None:
577             for mac in results:
578                 if mac is not None:
579                     light = self.get_light_by_mac(mac)
580                     if light is not None:
581                         retval.append(light)
582         return retval