Fix a bug in device.py around the type of keywords, add logging and
[python_utils.git] / smart_home / lights.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart lights."""
4
5 import datetime
6 import json
7 import logging
8 import os
9 import re
10 import subprocess
11 import sys
12 from abc import abstractmethod
13 from typing import Any, Dict, List, Optional, Tuple
14
15 import tinytuya as tt
16 from overrides import overrides
17
18 import ansi
19 import argparse_utils
20 import arper
21 import config
22 import logging_utils
23 import smart_home.device as dev
24 from decorator_utils import memoized, timeout
25 from google_assistant import GoogleResponse, ask_google
26
27 logger = logging.getLogger(__name__)
28
29 args = config.add_commandline_args(
30     f"Smart Lights ({__file__})",
31     "Args related to smart lights.",
32 )
33 args.add_argument(
34     '--smart_lights_tplink_location',
35     default='/home/scott/bin/tplink.py',
36     metavar='FILENAME',
37     help='The location of the tplink.py helper',
38     type=argparse_utils.valid_filename,
39 )
40
41
42 @timeout(5.0, use_signals=False, error_message="Timed out waiting for tplink.py")
43 def tplink_light_command(command: str) -> bool:
44     result = os.system(command)
45     signal = result & 0xFF
46     if signal != 0:
47         msg = f'{command} died with signal {signal}'
48         logger.warning(msg)
49         logging_utils.hlog(msg)
50         return False
51     else:
52         exit_value = result >> 8
53         if exit_value != 0:
54             msg = f'{command} failed, exited {exit_value}'
55             logger.warning(msg)
56             logging_utils.hlog(msg)
57             return False
58     logger.debug(f'{command} succeeded.')
59     return True
60
61
62 class BaseLight(dev.Device):
63     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
64         super().__init__(name.strip(), mac.strip(), keywords)
65
66     @staticmethod
67     def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]:
68         m = re.match(
69             'r#?([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])',
70             color,
71         )
72         if m is not None and len(m.groups()) == 3:
73             red = int(m.group(0), 16)
74             green = int(m.group(1), 16)
75             blue = int(m.group(2), 16)
76             return (red, green, blue)
77         color = color.lower()
78         return ansi.COLOR_NAMES_TO_RGB.get(color, None)
79
80     @abstractmethod
81     def status(self) -> str:
82         pass
83
84     @abstractmethod
85     def turn_on(self) -> bool:
86         pass
87
88     @abstractmethod
89     def turn_off(self) -> bool:
90         pass
91
92     @abstractmethod
93     def is_on(self) -> bool:
94         pass
95
96     @abstractmethod
97     def is_off(self) -> bool:
98         pass
99
100     @abstractmethod
101     def get_dimmer_level(self) -> Optional[int]:
102         pass
103
104     @abstractmethod
105     def set_dimmer_level(self, level: int) -> bool:
106         pass
107
108     @abstractmethod
109     def make_color(self, color: str) -> bool:
110         pass
111
112
113 class GoogleLight(BaseLight):
114     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
115         super().__init__(name, mac, keywords)
116
117     def goog_name(self) -> str:
118         name = self.get_name()
119         return name.replace("_", " ")
120
121     @staticmethod
122     def parse_google_response(response: GoogleResponse) -> bool:
123         return response.success
124
125     @overrides
126     def turn_on(self) -> bool:
127         return GoogleLight.parse_google_response(ask_google(f"turn {self.goog_name()} on"))
128
129     @overrides
130     def turn_off(self) -> bool:
131         return GoogleLight.parse_google_response(ask_google(f"turn {self.goog_name()} off"))
132
133     @overrides
134     def status(self) -> str:
135         if self.is_on():
136             return 'ON'
137         return 'off'
138
139     @overrides
140     def is_on(self) -> bool:
141         r = ask_google(f"is {self.goog_name()} on?")
142         if not r.success:
143             return False
144         if r.audio_transcription is not None:
145             return 'is on' in r.audio_transcription
146         raise Exception("Can't reach Google?!")
147
148     @overrides
149     def is_off(self) -> bool:
150         return not self.is_on()
151
152     @overrides
153     def get_dimmer_level(self) -> Optional[int]:
154         if not self.has_keyword("dimmer"):
155             return False
156         r = ask_google(f'how bright is {self.goog_name()}?')
157         if not r.success:
158             return None
159
160         # the bookcase one is set to 40% bright
161         txt = r.audio_transcription
162         if txt is not None:
163             m = re.search(r"(\d+)% bright", txt)
164             if m is not None:
165                 return int(m.group(1))
166             if "is off" in txt:
167                 return 0
168         return None
169
170     @overrides
171     def set_dimmer_level(self, level: int) -> bool:
172         if not self.has_keyword("dimmer"):
173             return False
174         if 0 <= level <= 100:
175             was_on = self.is_on()
176             r = ask_google(f"set {self.goog_name()} to {level} percent")
177             if not r.success:
178                 return False
179             if not was_on:
180                 self.turn_off()
181             return True
182         return False
183
184     @overrides
185     def make_color(self, color: str) -> bool:
186         return GoogleLight.parse_google_response(ask_google(f"make {self.goog_name()} {color}"))
187
188
189 class TuyaLight(BaseLight):
190     ids_by_mac = {
191         '68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
192         '68:C6:3A:DE:27:1A': '8844664268c63ade271a',
193         '68:C6:3A:DE:1D:95': '8844664268c63ade1d95',
194         '68:C6:3A:DE:19:B3': '8844664268c63ade19b3',
195         '80:7D:3A:77:3B:F5': '07445340807d3a773bf5',
196         '80:7D:3A:58:37:02': '07445340807d3a583702',
197     }
198     keys_by_mac = {
199         '68:C6:3A:DE:1A:94': '237f19b1b3d49c36',
200         '68:C6:3A:DE:27:1A': '237f19b1b3d49c36',
201         '68:C6:3A:DE:1D:95': '04b90fc5cd7625d8',
202         '68:C6:3A:DE:19:B3': '2d601f2892f1aefd',
203         '80:7D:3A:77:3B:F5': '27ab921fe4633519',
204         '80:7D:3A:58:37:02': '8559b5416bfa0c05',
205     }
206
207     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
208         super().__init__(name, mac, keywords)
209         mac = mac.upper()
210         if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
211             raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
212         self.devid = TuyaLight.ids_by_mac[mac]
213         self.key = TuyaLight.keys_by_mac[mac]
214         self.arper = arper.Arper()
215         ip = self.get_ip()
216         self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)
217
218     def get_status(self) -> Dict[str, Any]:
219         return self.bulb.status()
220
221     @overrides
222     def status(self) -> str:
223         ret = ''
224         for k, v in self.bulb.status().items():
225             ret += f'{k} = {v}\n'
226         return ret
227
228     @overrides
229     def turn_on(self) -> bool:
230         self.bulb.turn_on()
231         return True
232
233     @overrides
234     def turn_off(self) -> bool:
235         self.bulb.turn_off()
236         return True
237
238     @overrides
239     def is_on(self) -> bool:
240         s = self.get_status()
241         return s['dps']['1']
242
243     @overrides
244     def is_off(self) -> bool:
245         return not self.is_on()
246
247     @overrides
248     def get_dimmer_level(self) -> Optional[int]:
249         s = self.get_status()
250         return s['dps']['3']
251
252     @overrides
253     def set_dimmer_level(self, level: int) -> bool:
254         logger.debug(f'Setting brightness to {level}')
255         self.bulb.set_brightness(level)
256         return True
257
258     @overrides
259     def make_color(self, color: str) -> bool:
260         rgb = BaseLight.parse_color_string(color)
261         logger.debug(f'Light color: {color} -> {rgb}')
262         if rgb is not None:
263             self.bulb.set_colour(rgb[0], rgb[1], rgb[2])
264             return True
265         return False
266
267
268 class TPLinkLight(BaseLight):
269     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
270         super().__init__(name, mac, keywords)
271         self.children: List[str] = []
272         self.info: Optional[Dict] = None
273         self.info_ts: Optional[datetime.datetime] = None
274         if "children" in self.keywords:
275             self.info = self.get_info()
276             if self.info is not None:
277                 for child in self.info["children"]:
278                     self.children.append(child["id"])
279
280     @memoized
281     def get_tplink_name(self) -> Optional[str]:
282         self.info = self.get_info()
283         if self.info is not None:
284             return self.info["alias"]
285         return None
286
287     def get_cmdline(self, child: str = None) -> str:
288         cmd = (
289             f"{config.config['smart_lights_tplink_location']} -m {self.mac} "
290             f"--no_logging_console "
291         )
292         if child is not None:
293             cmd += f"-x {child} "
294         return cmd
295
296     def get_children(self) -> List[str]:
297         return self.children
298
299     def command(self, cmd: str, child: str = None, extra_args: str = None) -> bool:
300         cmd = self.get_cmdline(child) + f"-c {cmd}"
301         if extra_args is not None:
302             cmd += f" {extra_args}"
303         logger.debug(f'About to execute {cmd}')
304         return tplink_light_command(cmd)
305
306     @overrides
307     def turn_on(self, child: str = None) -> bool:
308         return self.command("on", child)
309
310     @overrides
311     def turn_off(self, child: str = None) -> bool:
312         return self.command("off", child)
313
314     @overrides
315     def is_on(self) -> bool:
316         self.info = self.get_info()
317         if self.info is None:
318             raise Exception('Unable to get info?')
319         return self.info.get("relay_state", 0) == 1
320
321     @overrides
322     def is_off(self) -> bool:
323         return not self.is_on()
324
325     @overrides
326     def make_color(self, color: str) -> bool:
327         raise NotImplementedError
328
329     @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
330     def get_info(self) -> Optional[Dict]:
331         cmd = self.get_cmdline() + "-c info"
332         logger.debug(f'Getting status of {self.mac} via "{cmd}"...')
333         out = subprocess.getoutput(cmd)
334         logger.debug(f'RAW OUT> {out}')
335         out = re.sub("Sent:.*\n", "", out)
336         out = re.sub("Received: *", "", out)
337         try:
338             self.info = json.loads(out)["system"]["get_sysinfo"]
339             logger.debug(json.dumps(self.info, indent=4, sort_keys=True))
340             self.info_ts = datetime.datetime.now()
341             return self.info
342         except Exception as e:
343             logger.exception(e)
344             print(out, file=sys.stderr)
345             self.info = None
346             self.info_ts = None
347             return None
348
349     @overrides
350     def status(self) -> str:
351         ret = ''
352         for k, v in self.get_info().items():
353             ret += f'{k} = {v}\n'
354         return ret
355
356     def get_on_duration_seconds(self, child: str = None) -> int:
357         self.info = self.get_info()
358         if child is None:
359             if self.info is None:
360                 return 0
361             return int(self.info.get("on_time", "0"))
362         else:
363             if self.info is None:
364                 return 0
365             for chi in self.info.get("children", {}):
366                 if chi["id"] == child:
367                     return int(chi.get("on_time", "0"))
368         return 0
369
370     @overrides
371     def get_dimmer_level(self) -> Optional[int]:
372         if not self.has_keyword("dimmer"):
373             return False
374         self.info = self.get_info()
375         if self.info is None:
376             return None
377         return int(self.info.get("brightness", "0"))
378
379     @overrides
380     def set_dimmer_level(self, level: int) -> bool:
381         if not self.has_keyword("dimmer"):
382             return False
383         cmd = (
384             self.get_cmdline()
385             + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
386         )
387         return tplink_light_command(cmd)
388
389
390 # class GoogleLightGroup(GoogleLight):
391 #     def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
392 #         if len(members) < 1:
393 #             raise Exception("There must be at least one light in the group.")
394 #         self.members = members
395 #         mac = GoogleLightGroup.make_up_mac(members)
396 #         super().__init__(name, mac, keywords)
397
398 #     @staticmethod
399 #     def make_up_mac(members: List[GoogleLight]):
400 #         mac = members[0].get_mac()
401 #         b = mac.split(':')
402 #         b[5] = int(b[5], 16) + 1
403 #         if b[5] > 255:
404 #             b[5] = 0
405 #         b[5] = str(b[5])
406 #         return ":".join(b)
407
408 #     def is_on(self) -> bool:
409 #         r = ask_google(f"are {self.goog_name()} on?")
410 #         if not r.success:
411 #             return False
412 #         return 'is on' in r.audio_transcription
413
414 #     def get_dimmer_level(self) -> Optional[int]:
415 #         if not self.has_keyword("dimmer"):
416 #             return False
417 #         r = ask_google(f'how bright are {self.goog_name()}?')
418 #         if not r.success:
419 #             return None
420
421 #         # four lights are set to 100% brightness
422 #         txt = r.audio_transcription
423 #         m = re.search(r"(\d+)% bright", txt)
424 #         if m is not None:
425 #             return int(m.group(1))
426 #         if "is off" in txt:
427 #             return 0
428 #         return None
429
430 #     def set_dimmer_level(self, level: int) -> bool:
431 #         if not self.has_keyword("dimmer"):
432 #             return False
433 #         if 0 <= level <= 100:
434 #             was_on = self.is_on()
435 #             r = ask_google(f"set {self.goog_name()} to {level} percent")
436 #             if not r.success:
437 #                 return False
438 #             if not was_on:
439 #                 self.turn_off()
440 #             return True
441 #         return False
442
443 #     def make_color(self, color: str) -> bool:
444 #         return GoogleLight.parse_google_response(
445 #             ask_google(f"make {self.goog_name()} {color}")
446 #         )