Start using warnings from stdlib.
[python_utils.git] / smart_home / lights.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart lights."""
4
5 from abc import abstractmethod
6 import datetime
7 import json
8 import logging
9 import os
10 import re
11 import subprocess
12 import sys
13 from typing import Any, Dict, List, Optional, Tuple
14 import warnings
15
16 from overrides import overrides
17 import tinytuya as tt
18
19 import ansi
20 import argparse_utils
21 import arper
22 import config
23 import logging_utils
24 import smart_home.device as dev
25 from google_assistant import ask_google, GoogleResponse
26 from decorator_utils import timeout, memoized
27
28 logger = logging.getLogger(__name__)
29
30 args = config.add_commandline_args(
31     f"Smart Lights ({__file__})",
32     "Args related to smart lights.",
33 )
34 args.add_argument(
35     '--smart_lights_tplink_location',
36     default='/home/scott/bin/tplink.py',
37     metavar='FILENAME',
38     help='The location of the tplink.py helper',
39     type=argparse_utils.valid_filename,
40 )
41
42
43 @timeout(
44     5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
45 )
46 def tplink_light_command(command: str) -> bool:
47     result = os.system(command)
48     signal = result & 0xFF
49     if signal != 0:
50         msg = f'{command} died with signal {signal}'
51         logger.warning(msg)
52         warnings.warn(msg)
53         logging_utils.hlog(msg)
54         return False
55     else:
56         exit_value = result >> 8
57         if exit_value != 0:
58             msg = f'{command} failed, exited {exit_value}'
59             logger.warning(msg)
60             warnings.warn(msg)
61             logging_utils.hlog(msg)
62             return False
63     logger.debug(f'{command} succeeded.')
64     return True
65
66
67 class BaseLight(dev.Device):
68     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
69         super().__init__(name.strip(), mac.strip(), keywords)
70
71     @staticmethod
72     def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]:
73         m = re.match(
74             'r#?([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])',
75             color
76         )
77         if m is not None and len(m.group) == 3:
78             red = int(m.group(0), 16)
79             green = int(m.group(1), 16)
80             blue = int(m.group(2), 16)
81             return (red, green, blue)
82         color = color.lower()
83         return ansi.COLOR_NAMES_TO_RGB.get(color, None)
84
85     @abstractmethod
86     def status(self) -> str:
87         pass
88
89     @abstractmethod
90     def turn_on(self) -> bool:
91         pass
92
93     @abstractmethod
94     def turn_off(self) -> bool:
95         pass
96
97     @abstractmethod
98     def is_on(self) -> bool:
99         pass
100
101     @abstractmethod
102     def is_off(self) -> bool:
103         pass
104
105     @abstractmethod
106     def get_dimmer_level(self) -> Optional[int]:
107         pass
108
109     @abstractmethod
110     def set_dimmer_level(self, level: int) -> bool:
111         pass
112
113     @abstractmethod
114     def make_color(self, color: str) -> bool:
115         pass
116
117
118 class GoogleLight(BaseLight):
119     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
120         super().__init__(name, mac, keywords)
121
122     def goog_name(self) -> str:
123         name = self.get_name()
124         return name.replace("_", " ")
125
126     @staticmethod
127     def parse_google_response(response: GoogleResponse) -> bool:
128         return response.success
129
130     @overrides
131     def turn_on(self) -> bool:
132         return GoogleLight.parse_google_response(
133             ask_google(f"turn {self.goog_name()} on")
134         )
135
136     @overrides
137     def turn_off(self) -> bool:
138         return GoogleLight.parse_google_response(
139             ask_google(f"turn {self.goog_name()} off")
140         )
141
142     @overrides
143     def status(self) -> str:
144         if self.is_on():
145             return 'ON'
146         return 'off'
147
148     @overrides
149     def is_on(self) -> bool:
150         r = ask_google(f"is {self.goog_name()} on?")
151         if not r.success:
152             return False
153         return 'is on' in r.audio_transcription
154
155     @overrides
156     def is_off(self) -> bool:
157         return not self.is_on()
158
159     @overrides
160     def get_dimmer_level(self) -> Optional[int]:
161         if not self.has_keyword("dimmer"):
162             return False
163         r = ask_google(f'how bright is {self.goog_name()}?')
164         if not r.success:
165             return None
166
167         # the bookcase one is set to 40% bright
168         txt = r.audio_transcription
169         m = re.search(r"(\d+)% bright", txt)
170         if m is not None:
171             return int(m.group(1))
172         if "is off" in txt:
173             return 0
174         return None
175
176     @overrides
177     def set_dimmer_level(self, level: int) -> bool:
178         if not self.has_keyword("dimmer"):
179             return False
180         if 0 <= level <= 100:
181             was_on = self.is_on()
182             r = ask_google(f"set {self.goog_name()} to {level} percent")
183             if not r.success:
184                 return False
185             if not was_on:
186                 self.turn_off()
187             return True
188         return False
189
190     @overrides
191     def make_color(self, color: str) -> bool:
192         return GoogleLight.parse_google_response(
193             ask_google(f"make {self.goog_name()} {color}")
194         )
195
196
197 class TuyaLight(BaseLight):
198     ids_by_mac = {
199         '68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
200         '68:C6:3A:DE:27:1A': '8844664268c63ade271a',
201         '68:C6:3A:DE:1D:95': '8844664268c63ade1d95',
202         '68:C6:3A:DE:19:B3': '8844664268c63ade19b3',
203         '80:7D:3A:77:3B:F5': '07445340807d3a773bf5',
204         '80:7D:3A:58:37:02': '07445340807d3a583702',
205     }
206     keys_by_mac = {
207         '68:C6:3A:DE:1A:94': '237f19b1b3d49c36',
208         '68:C6:3A:DE:27:1A': '237f19b1b3d49c36',
209         '68:C6:3A:DE:1D:95': '04b90fc5cd7625d8',
210         '68:C6:3A:DE:19:B3': '2d601f2892f1aefd',
211         '80:7D:3A:77:3B:F5': '27ab921fe4633519',
212         '80:7D:3A:58:37:02': '8559b5416bfa0c05',
213     }
214
215     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
216         super().__init__(name, mac, keywords)
217         mac = mac.upper()
218         if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
219             raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
220         self.devid = TuyaLight.ids_by_mac[mac]
221         self.key = TuyaLight.keys_by_mac[mac]
222         self.arper = arper.Arper()
223         ip = self.get_ip()
224         self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)
225
226     def get_status(self) -> Dict[str, Any]:
227         return self.bulb.status()
228
229     @overrides
230     def status(self) -> str:
231         ret = ''
232         for k, v in self.bulb.status().items():
233             ret += f'{k} = {v}\n'
234         return ret
235
236     @overrides
237     def turn_on(self) -> bool:
238         self.bulb.turn_on()
239         return True
240
241     @overrides
242     def turn_off(self) -> bool:
243         self.bulb.turn_off()
244         return True
245
246     @overrides
247     def is_on(self) -> bool:
248         s = self.get_status()
249         return s['dps']['1']
250
251     @overrides
252     def is_off(self) -> bool:
253         return not self.is_on()
254
255     @overrides
256     def get_dimmer_level(self) -> Optional[int]:
257         s = self.get_status()
258         return s['dps']['3']
259
260     @overrides
261     def set_dimmer_level(self, level: int) -> bool:
262         logger.debug(f'Setting brightness to {level}')
263         self.bulb.set_brightness(level)
264         return True
265
266     @overrides
267     def make_color(self, color: str) -> bool:
268         rgb = BaseLight.parse_color_string(color)
269         logger.debug(f'Light color: {color} -> {rgb}')
270         if rgb is not None:
271             self.bulb.set_colour(rgb[0], rgb[1], rgb[2])
272             return True
273         return False
274
275
276 class TPLinkLight(BaseLight):
277     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
278         super().__init__(name, mac, keywords)
279         self.children: List[str] = []
280         self.info: Optional[Dict] = None
281         self.info_ts: Optional[datetime.datetime] = None
282         if "children" in self.keywords:
283             self.info = self.get_info()
284             if self.info is not None:
285                 for child in self.info["children"]:
286                     self.children.append(child["id"])
287
288     @memoized
289     def get_tplink_name(self) -> Optional[str]:
290         self.info = self.get_info()
291         if self.info is not None:
292             return self.info["alias"]
293         return None
294
295     def get_cmdline(self, child: str = None) -> str:
296         cmd = (
297             f"{config.config['smart_lights_tplink_location']} -m {self.mac} "
298             f"--no_logging_console "
299         )
300         if child is not None:
301             cmd += f"-x {child} "
302         return cmd
303
304     def get_children(self) -> List[str]:
305         return self.children
306
307     def command(
308         self, cmd: str, child: str = None, extra_args: str = None
309     ) -> bool:
310         cmd = self.get_cmdline(child) + f"-c {cmd}"
311         if extra_args is not None:
312             cmd += f" {extra_args}"
313         logger.debug(f'About to execute {cmd}')
314         return tplink_light_command(cmd)
315
316     @overrides
317     def turn_on(self, child: str = None) -> bool:
318         return self.command("on", child)
319
320     @overrides
321     def turn_off(self, child: str = None) -> bool:
322         return self.command("off", child)
323
324     @overrides
325     def is_on(self) -> bool:
326         self.info = self.get_info()
327         if self.info is None:
328             raise Exception('Unable to get info?')
329         return self.info.get("relay_state", "0") == "1"
330
331     @overrides
332     def is_off(self) -> bool:
333         return not self.is_on()
334
335     @overrides
336     def make_color(self, color: str) -> bool:
337         raise NotImplementedError
338
339     @timeout(
340         10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
341     )
342     def get_info(self) -> Optional[Dict]:
343         cmd = self.get_cmdline() + "-c info"
344         out = subprocess.getoutput(cmd)
345         logger.debug(f'RAW OUT> {out}')
346         out = re.sub("Sent:.*\n", "", out)
347         out = re.sub("Received: *", "", out)
348         try:
349             self.info = json.loads(out)["system"]["get_sysinfo"]
350             self.info_ts = datetime.datetime.now()
351             return self.info
352         except Exception as e:
353             logger.exception(e)
354             print(out, file=sys.stderr)
355             self.info = None
356             self.info_ts = None
357             return None
358
359     @overrides
360     def status(self) -> str:
361         ret = ''
362         for k, v in self.get_info().items():
363             ret += f'{k} = {v}\n'
364         return ret
365
366     def get_on_duration_seconds(self, child: str = None) -> int:
367         self.info = self.get_info()
368         if child is None:
369             if self.info is None:
370                 return 0
371             return int(self.info.get("on_time", "0"))
372         else:
373             if self.info is None:
374                 return 0
375             for chi in self.info.get("children", {}):
376                 if chi["id"] == child:
377                     return int(chi.get("on_time", "0"))
378         return 0
379
380     @overrides
381     def get_dimmer_level(self) -> Optional[int]:
382         if not self.has_keyword("dimmer"):
383             return False
384         self.info = self.get_info()
385         if self.info is None:
386             return None
387         return int(self.info.get("brightness", "0"))
388
389     @overrides
390     def set_dimmer_level(self, level: int) -> bool:
391         if not self.has_keyword("dimmer"):
392             return False
393         cmd = (
394             self.get_cmdline()
395             + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
396         )
397         return tplink_light_command(cmd)
398
399
400 # class GoogleLightGroup(GoogleLight):
401 #     def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
402 #         if len(members) < 1:
403 #             raise Exception("There must be at least one light in the group.")
404 #         self.members = members
405 #         mac = GoogleLightGroup.make_up_mac(members)
406 #         super().__init__(name, mac, keywords)
407
408 #     @staticmethod
409 #     def make_up_mac(members: List[GoogleLight]):
410 #         mac = members[0].get_mac()
411 #         b = mac.split(':')
412 #         b[5] = int(b[5], 16) + 1
413 #         if b[5] > 255:
414 #             b[5] = 0
415 #         b[5] = str(b[5])
416 #         return ":".join(b)
417
418 #     def is_on(self) -> bool:
419 #         r = ask_google(f"are {self.goog_name()} on?")
420 #         if not r.success:
421 #             return False
422 #         return 'is on' in r.audio_transcription
423
424 #     def get_dimmer_level(self) -> Optional[int]:
425 #         if not self.has_keyword("dimmer"):
426 #             return False
427 #         r = ask_google(f'how bright are {self.goog_name()}?')
428 #         if not r.success:
429 #             return None
430
431 #         # four lights are set to 100% brightness
432 #         txt = r.audio_transcription
433 #         m = re.search(r"(\d+)% bright", txt)
434 #         if m is not None:
435 #             return int(m.group(1))
436 #         if "is off" in txt:
437 #             return 0
438 #         return None
439
440 #     def set_dimmer_level(self, level: int) -> bool:
441 #         if not self.has_keyword("dimmer"):
442 #             return False
443 #         if 0 <= level <= 100:
444 #             was_on = self.is_on()
445 #             r = ask_google(f"set {self.goog_name()} to {level} percent")
446 #             if not r.success:
447 #                 return False
448 #             if not was_on:
449 #                 self.turn_off()
450 #             return True
451 #         return False
452
453 #     def make_color(self, color: str) -> bool:
454 #         return GoogleLight.parse_google_response(
455 #             ask_google(f"make {self.goog_name()} {color}")
456 #         )