More cleanup.
[python_utils.git] / smart_home / lights.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart lights."""
4
5 import datetime
6 import json
7 import logging
8 import os
9 import re
10 import subprocess
11 import sys
12 from abc import abstractmethod
13 from typing import Any, Dict, List, Optional, Tuple
14
15 import tinytuya as tt
16 from overrides import overrides
17
18 import ansi
19 import argparse_utils
20 import arper
21 import config
22 import logging_utils
23 import smart_home.device as dev
24 from decorator_utils import memoized, timeout
25 from google_assistant import GoogleResponse, ask_google
26
27 logger = logging.getLogger(__name__)
28
29 args = config.add_commandline_args(
30     f"Smart Lights ({__file__})",
31     "Args related to smart lights.",
32 )
33 args.add_argument(
34     '--smart_lights_tplink_location',
35     default='/home/scott/bin/tplink.py',
36     metavar='FILENAME',
37     help='The location of the tplink.py helper',
38     type=argparse_utils.valid_filename,
39 )
40
41
42 @timeout(5.0, use_signals=False, error_message="Timed out waiting for tplink.py")
43 def tplink_light_command(command: str) -> bool:
44     result = os.system(command)
45     signal = result & 0xFF
46     if signal != 0:
47         msg = f'{command} died with signal {signal}'
48         logger.warning(msg)
49         logging_utils.hlog(msg)
50         return False
51     else:
52         exit_value = result >> 8
53         if exit_value != 0:
54             msg = f'{command} failed, exited {exit_value}'
55             logger.warning(msg)
56             logging_utils.hlog(msg)
57             return False
58     logger.debug('%s succeeded.', command)
59     return True
60
61
62 class BaseLight(dev.Device):
63     """A base class representing a smart light."""
64
65     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
66         super().__init__(name.strip(), mac.strip(), keywords)
67
68     @staticmethod
69     def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]:
70         m = re.match(
71             'r#?([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])',
72             color,
73         )
74         if m is not None and len(m.groups()) == 3:
75             red = int(m.group(0), 16)
76             green = int(m.group(1), 16)
77             blue = int(m.group(2), 16)
78             return (red, green, blue)
79         color = color.lower()
80         return ansi.COLOR_NAMES_TO_RGB.get(color, None)
81
82     @abstractmethod
83     def status(self) -> str:
84         pass
85
86     @abstractmethod
87     def turn_on(self) -> bool:
88         pass
89
90     @abstractmethod
91     def turn_off(self) -> bool:
92         pass
93
94     @abstractmethod
95     def is_on(self) -> bool:
96         pass
97
98     @abstractmethod
99     def is_off(self) -> bool:
100         pass
101
102     @abstractmethod
103     def get_dimmer_level(self) -> Optional[int]:
104         pass
105
106     @abstractmethod
107     def set_dimmer_level(self, level: int) -> bool:
108         pass
109
110     @abstractmethod
111     def make_color(self, color: str) -> bool:
112         pass
113
114
115 class GoogleLight(BaseLight):
116     """A smart light controlled by talking to Google."""
117
118     def goog_name(self) -> str:
119         name = self.get_name()
120         return name.replace("_", " ")
121
122     @staticmethod
123     def parse_google_response(response: GoogleResponse) -> bool:
124         return response.success
125
126     @overrides
127     def turn_on(self) -> bool:
128         return GoogleLight.parse_google_response(ask_google(f"turn {self.goog_name()} on"))
129
130     @overrides
131     def turn_off(self) -> bool:
132         return GoogleLight.parse_google_response(ask_google(f"turn {self.goog_name()} off"))
133
134     @overrides
135     def status(self) -> str:
136         if self.is_on():
137             return 'ON'
138         return 'off'
139
140     @overrides
141     def is_on(self) -> bool:
142         r = ask_google(f"is {self.goog_name()} on?")
143         if not r.success:
144             return False
145         if r.audio_transcription is not None:
146             return 'is on' in r.audio_transcription
147         raise Exception("Can't reach Google?!")
148
149     @overrides
150     def is_off(self) -> bool:
151         return not self.is_on()
152
153     @overrides
154     def get_dimmer_level(self) -> Optional[int]:
155         if not self.has_keyword("dimmer"):
156             return False
157         r = ask_google(f'how bright is {self.goog_name()}?')
158         if not r.success:
159             return None
160
161         # the bookcase one is set to 40% bright
162         txt = r.audio_transcription
163         if txt is not None:
164             m = re.search(r"(\d+)% bright", txt)
165             if m is not None:
166                 return int(m.group(1))
167             if "is off" in txt:
168                 return 0
169         return None
170
171     @overrides
172     def set_dimmer_level(self, level: int) -> bool:
173         if not self.has_keyword("dimmer"):
174             return False
175         if 0 <= level <= 100:
176             was_on = self.is_on()
177             r = ask_google(f"set {self.goog_name()} to {level} percent")
178             if not r.success:
179                 return False
180             if not was_on:
181                 self.turn_off()
182             return True
183         return False
184
185     @overrides
186     def make_color(self, color: str) -> bool:
187         return GoogleLight.parse_google_response(ask_google(f"make {self.goog_name()} {color}"))
188
189
190 class TuyaLight(BaseLight):
191     """A Tuya smart light."""
192
193     ids_by_mac = {
194         '68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
195         '68:C6:3A:DE:27:1A': '8844664268c63ade271a',
196         '68:C6:3A:DE:1D:95': '8844664268c63ade1d95',
197         '68:C6:3A:DE:19:B3': '8844664268c63ade19b3',
198         '80:7D:3A:77:3B:F5': '07445340807d3a773bf5',
199         '80:7D:3A:58:37:02': '07445340807d3a583702',
200     }
201     keys_by_mac = {
202         '68:C6:3A:DE:1A:94': '237f19b1b3d49c36',
203         '68:C6:3A:DE:27:1A': '237f19b1b3d49c36',
204         '68:C6:3A:DE:1D:95': '04b90fc5cd7625d8',
205         '68:C6:3A:DE:19:B3': '2d601f2892f1aefd',
206         '80:7D:3A:77:3B:F5': '27ab921fe4633519',
207         '80:7D:3A:58:37:02': '8559b5416bfa0c05',
208     }
209
210     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
211         super().__init__(name, mac, keywords)
212         mac = mac.upper()
213         if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
214             raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
215         self.devid = TuyaLight.ids_by_mac[mac]
216         self.key = TuyaLight.keys_by_mac[mac]
217         self.arper = arper.Arper()
218         ip = self.get_ip()
219         self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)
220
221     def get_status(self) -> Dict[str, Any]:
222         return self.bulb.status()
223
224     @overrides
225     def status(self) -> str:
226         ret = ''
227         for k, v in self.bulb.status().items():
228             ret += f'{k} = {v}\n'
229         return ret
230
231     @overrides
232     def turn_on(self) -> bool:
233         self.bulb.turn_on()
234         return True
235
236     @overrides
237     def turn_off(self) -> bool:
238         self.bulb.turn_off()
239         return True
240
241     @overrides
242     def is_on(self) -> bool:
243         s = self.get_status()
244         return s['dps']['1']
245
246     @overrides
247     def is_off(self) -> bool:
248         return not self.is_on()
249
250     @overrides
251     def get_dimmer_level(self) -> Optional[int]:
252         s = self.get_status()
253         return s['dps']['3']
254
255     @overrides
256     def set_dimmer_level(self, level: int) -> bool:
257         logger.debug('Setting brightness to %d', level)
258         self.bulb.set_brightness(level)
259         return True
260
261     @overrides
262     def make_color(self, color: str) -> bool:
263         rgb = BaseLight.parse_color_string(color)
264         logger.debug('Light color: %s -> %s', color, rgb)
265         if rgb is not None:
266             self.bulb.set_colour(rgb[0], rgb[1], rgb[2])
267             return True
268         return False
269
270
271 class TPLinkLight(BaseLight):
272     """A TPLink smart light."""
273
274     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
275         super().__init__(name, mac, keywords)
276         self.children: List[str] = []
277         self.info: Optional[Dict] = None
278         self.info_ts: Optional[datetime.datetime] = None
279         if self.keywords is not None:
280             if "children" in self.keywords:
281                 self.info = self.get_info()
282                 if self.info is not None:
283                     for child in self.info["children"]:
284                         self.children.append(child["id"])
285
286     @memoized
287     def get_tplink_name(self) -> Optional[str]:
288         self.info = self.get_info()
289         if self.info is not None:
290             return self.info["alias"]
291         return None
292
293     def get_cmdline(self, child: str = None) -> str:
294         cmd = (
295             f"{config.config['smart_lights_tplink_location']} -m {self.mac} "
296             f"--no_logging_console "
297         )
298         if child is not None:
299             cmd += f"-x {child} "
300         return cmd
301
302     def get_children(self) -> List[str]:
303         return self.children
304
305     def command(self, cmd: str, child: str = None, extra_args: str = None) -> bool:
306         cmd = self.get_cmdline(child) + f"-c {cmd}"
307         if extra_args is not None:
308             cmd += f" {extra_args}"
309         logger.debug('About to execute: %s', cmd)
310         return tplink_light_command(cmd)
311
312     @overrides
313     def turn_on(self, child: str = None) -> bool:
314         return self.command("on", child)
315
316     @overrides
317     def turn_off(self, child: str = None) -> bool:
318         return self.command("off", child)
319
320     @overrides
321     def is_on(self) -> bool:
322         self.info = self.get_info()
323         if self.info is None:
324             raise Exception('Unable to get info?')
325         return self.info.get("relay_state", 0) == 1
326
327     @overrides
328     def is_off(self) -> bool:
329         return not self.is_on()
330
331     @overrides
332     def make_color(self, color: str) -> bool:
333         raise NotImplementedError
334
335     @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
336     def get_info(self) -> Optional[Dict]:
337         cmd = self.get_cmdline() + "-c info"
338         logger.debug('Getting status of %s via "%s"...', self.mac, cmd)
339         out = subprocess.getoutput(cmd)
340         logger.debug('RAW OUT> %s', out)
341         out = re.sub("Sent:.*\n", "", out)
342         out = re.sub("Received: *", "", out)
343         try:
344             self.info = json.loads(out)["system"]["get_sysinfo"]
345             logger.debug("%s", json.dumps(self.info, indent=4, sort_keys=True))
346             self.info_ts = datetime.datetime.now()
347             return self.info
348         except Exception as e:
349             logger.exception(e)
350             print(out, file=sys.stderr)
351             self.info = None
352             self.info_ts = None
353             return None
354
355     @overrides
356     def status(self) -> str:
357         ret = ''
358         for k, v in self.get_info().items():
359             ret += f'{k} = {v}\n'
360         return ret
361
362     def get_on_duration_seconds(self, child: str = None) -> int:
363         self.info = self.get_info()
364         if child is None:
365             if self.info is None:
366                 return 0
367             return int(self.info.get("on_time", "0"))
368         else:
369             if self.info is None:
370                 return 0
371             for chi in self.info.get("children", {}):
372                 if chi["id"] == child:
373                     return int(chi.get("on_time", "0"))
374         return 0
375
376     @overrides
377     def get_dimmer_level(self) -> Optional[int]:
378         if not self.has_keyword("dimmer"):
379             return False
380         self.info = self.get_info()
381         if self.info is None:
382             return None
383         return int(self.info.get("brightness", "0"))
384
385     @overrides
386     def set_dimmer_level(self, level: int) -> bool:
387         if not self.has_keyword("dimmer"):
388             return False
389         cmd = (
390             self.get_cmdline()
391             + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
392         )
393         return tplink_light_command(cmd)
394
395
396 # class GoogleLightGroup(GoogleLight):
397 #     def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
398 #         if len(members) < 1:
399 #             raise Exception("There must be at least one light in the group.")
400 #         self.members = members
401 #         mac = GoogleLightGroup.make_up_mac(members)
402 #         super().__init__(name, mac, keywords)
403
404 #     @staticmethod
405 #     def make_up_mac(members: List[GoogleLight]):
406 #         mac = members[0].get_mac()
407 #         b = mac.split(':')
408 #         b[5] = int(b[5], 16) + 1
409 #         if b[5] > 255:
410 #             b[5] = 0
411 #         b[5] = str(b[5])
412 #         return ":".join(b)
413
414 #     def is_on(self) -> bool:
415 #         r = ask_google(f"are {self.goog_name()} on?")
416 #         if not r.success:
417 #             return False
418 #         return 'is on' in r.audio_transcription
419
420 #     def get_dimmer_level(self) -> Optional[int]:
421 #         if not self.has_keyword("dimmer"):
422 #             return False
423 #         r = ask_google(f'how bright are {self.goog_name()}?')
424 #         if not r.success:
425 #             return None
426
427 #         # four lights are set to 100% brightness
428 #         txt = r.audio_transcription
429 #         m = re.search(r"(\d+)% bright", txt)
430 #         if m is not None:
431 #             return int(m.group(1))
432 #         if "is off" in txt:
433 #             return 0
434 #         return None
435
436 #     def set_dimmer_level(self, level: int) -> bool:
437 #         if not self.has_keyword("dimmer"):
438 #             return False
439 #         if 0 <= level <= 100:
440 #             was_on = self.is_on()
441 #             r = ask_google(f"set {self.goog_name()} to {level} percent")
442 #             if not r.success:
443 #                 return False
444 #             if not was_on:
445 #                 self.turn_off()
446 #             return True
447 #         return False
448
449 #     def make_color(self, color: str) -> bool:
450 #         return GoogleLight.parse_google_response(
451 #             ask_google(f"make {self.goog_name()} {color}")
452 #         )