Scale back warnings.warn and add stacklevels= where appropriate.
[python_utils.git] / smart_home / lights.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart lights."""
4
5 from abc import abstractmethod
6 import datetime
7 import json
8 import logging
9 import os
10 import re
11 import subprocess
12 import sys
13 from typing import Any, Dict, List, Optional, Tuple
14
15 from overrides import overrides
16 import tinytuya as tt
17
18 import ansi
19 import argparse_utils
20 import arper
21 import config
22 import logging_utils
23 import smart_home.device as dev
24 from google_assistant import ask_google, GoogleResponse
25 from decorator_utils import timeout, memoized
26
27 logger = logging.getLogger(__name__)
28
29 args = config.add_commandline_args(
30     f"Smart Lights ({__file__})",
31     "Args related to smart lights.",
32 )
33 args.add_argument(
34     '--smart_lights_tplink_location',
35     default='/home/scott/bin/tplink.py',
36     metavar='FILENAME',
37     help='The location of the tplink.py helper',
38     type=argparse_utils.valid_filename,
39 )
40
41
42 @timeout(
43     5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
44 )
45 def tplink_light_command(command: str) -> bool:
46     result = os.system(command)
47     signal = result & 0xFF
48     if signal != 0:
49         msg = f'{command} died with signal {signal}'
50         logger.warning(msg)
51         logging_utils.hlog(msg)
52         return False
53     else:
54         exit_value = result >> 8
55         if exit_value != 0:
56             msg = f'{command} failed, exited {exit_value}'
57             logger.warning(msg)
58             logging_utils.hlog(msg)
59             return False
60     logger.debug(f'{command} succeeded.')
61     return True
62
63
64 class BaseLight(dev.Device):
65     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
66         super().__init__(name.strip(), mac.strip(), keywords)
67
68     @staticmethod
69     def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]:
70         m = re.match(
71             'r#?([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])',
72             color
73         )
74         if m is not None and len(m.group) == 3:
75             red = int(m.group(0), 16)
76             green = int(m.group(1), 16)
77             blue = int(m.group(2), 16)
78             return (red, green, blue)
79         color = color.lower()
80         return ansi.COLOR_NAMES_TO_RGB.get(color, None)
81
82     @abstractmethod
83     def status(self) -> str:
84         pass
85
86     @abstractmethod
87     def turn_on(self) -> bool:
88         pass
89
90     @abstractmethod
91     def turn_off(self) -> bool:
92         pass
93
94     @abstractmethod
95     def is_on(self) -> bool:
96         pass
97
98     @abstractmethod
99     def is_off(self) -> bool:
100         pass
101
102     @abstractmethod
103     def get_dimmer_level(self) -> Optional[int]:
104         pass
105
106     @abstractmethod
107     def set_dimmer_level(self, level: int) -> bool:
108         pass
109
110     @abstractmethod
111     def make_color(self, color: str) -> bool:
112         pass
113
114
115 class GoogleLight(BaseLight):
116     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
117         super().__init__(name, mac, keywords)
118
119     def goog_name(self) -> str:
120         name = self.get_name()
121         return name.replace("_", " ")
122
123     @staticmethod
124     def parse_google_response(response: GoogleResponse) -> bool:
125         return response.success
126
127     @overrides
128     def turn_on(self) -> bool:
129         return GoogleLight.parse_google_response(
130             ask_google(f"turn {self.goog_name()} on")
131         )
132
133     @overrides
134     def turn_off(self) -> bool:
135         return GoogleLight.parse_google_response(
136             ask_google(f"turn {self.goog_name()} off")
137         )
138
139     @overrides
140     def status(self) -> str:
141         if self.is_on():
142             return 'ON'
143         return 'off'
144
145     @overrides
146     def is_on(self) -> bool:
147         r = ask_google(f"is {self.goog_name()} on?")
148         if not r.success:
149             return False
150         return 'is on' in r.audio_transcription
151
152     @overrides
153     def is_off(self) -> bool:
154         return not self.is_on()
155
156     @overrides
157     def get_dimmer_level(self) -> Optional[int]:
158         if not self.has_keyword("dimmer"):
159             return False
160         r = ask_google(f'how bright is {self.goog_name()}?')
161         if not r.success:
162             return None
163
164         # the bookcase one is set to 40% bright
165         txt = r.audio_transcription
166         m = re.search(r"(\d+)% bright", txt)
167         if m is not None:
168             return int(m.group(1))
169         if "is off" in txt:
170             return 0
171         return None
172
173     @overrides
174     def set_dimmer_level(self, level: int) -> bool:
175         if not self.has_keyword("dimmer"):
176             return False
177         if 0 <= level <= 100:
178             was_on = self.is_on()
179             r = ask_google(f"set {self.goog_name()} to {level} percent")
180             if not r.success:
181                 return False
182             if not was_on:
183                 self.turn_off()
184             return True
185         return False
186
187     @overrides
188     def make_color(self, color: str) -> bool:
189         return GoogleLight.parse_google_response(
190             ask_google(f"make {self.goog_name()} {color}")
191         )
192
193
194 class TuyaLight(BaseLight):
195     ids_by_mac = {
196         '68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
197         '68:C6:3A:DE:27:1A': '8844664268c63ade271a',
198         '68:C6:3A:DE:1D:95': '8844664268c63ade1d95',
199         '68:C6:3A:DE:19:B3': '8844664268c63ade19b3',
200         '80:7D:3A:77:3B:F5': '07445340807d3a773bf5',
201         '80:7D:3A:58:37:02': '07445340807d3a583702',
202     }
203     keys_by_mac = {
204         '68:C6:3A:DE:1A:94': '237f19b1b3d49c36',
205         '68:C6:3A:DE:27:1A': '237f19b1b3d49c36',
206         '68:C6:3A:DE:1D:95': '04b90fc5cd7625d8',
207         '68:C6:3A:DE:19:B3': '2d601f2892f1aefd',
208         '80:7D:3A:77:3B:F5': '27ab921fe4633519',
209         '80:7D:3A:58:37:02': '8559b5416bfa0c05',
210     }
211
212     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
213         super().__init__(name, mac, keywords)
214         mac = mac.upper()
215         if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
216             raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
217         self.devid = TuyaLight.ids_by_mac[mac]
218         self.key = TuyaLight.keys_by_mac[mac]
219         self.arper = arper.Arper()
220         ip = self.get_ip()
221         self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)
222
223     def get_status(self) -> Dict[str, Any]:
224         return self.bulb.status()
225
226     @overrides
227     def status(self) -> str:
228         ret = ''
229         for k, v in self.bulb.status().items():
230             ret += f'{k} = {v}\n'
231         return ret
232
233     @overrides
234     def turn_on(self) -> bool:
235         self.bulb.turn_on()
236         return True
237
238     @overrides
239     def turn_off(self) -> bool:
240         self.bulb.turn_off()
241         return True
242
243     @overrides
244     def is_on(self) -> bool:
245         s = self.get_status()
246         return s['dps']['1']
247
248     @overrides
249     def is_off(self) -> bool:
250         return not self.is_on()
251
252     @overrides
253     def get_dimmer_level(self) -> Optional[int]:
254         s = self.get_status()
255         return s['dps']['3']
256
257     @overrides
258     def set_dimmer_level(self, level: int) -> bool:
259         logger.debug(f'Setting brightness to {level}')
260         self.bulb.set_brightness(level)
261         return True
262
263     @overrides
264     def make_color(self, color: str) -> bool:
265         rgb = BaseLight.parse_color_string(color)
266         logger.debug(f'Light color: {color} -> {rgb}')
267         if rgb is not None:
268             self.bulb.set_colour(rgb[0], rgb[1], rgb[2])
269             return True
270         return False
271
272
273 class TPLinkLight(BaseLight):
274     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
275         super().__init__(name, mac, keywords)
276         self.children: List[str] = []
277         self.info: Optional[Dict] = None
278         self.info_ts: Optional[datetime.datetime] = None
279         if "children" in self.keywords:
280             self.info = self.get_info()
281             if self.info is not None:
282                 for child in self.info["children"]:
283                     self.children.append(child["id"])
284
285     @memoized
286     def get_tplink_name(self) -> Optional[str]:
287         self.info = self.get_info()
288         if self.info is not None:
289             return self.info["alias"]
290         return None
291
292     def get_cmdline(self, child: str = None) -> str:
293         cmd = (
294             f"{config.config['smart_lights_tplink_location']} -m {self.mac} "
295             f"--no_logging_console "
296         )
297         if child is not None:
298             cmd += f"-x {child} "
299         return cmd
300
301     def get_children(self) -> List[str]:
302         return self.children
303
304     def command(
305         self, cmd: str, child: str = None, extra_args: str = None
306     ) -> bool:
307         cmd = self.get_cmdline(child) + f"-c {cmd}"
308         if extra_args is not None:
309             cmd += f" {extra_args}"
310         logger.debug(f'About to execute {cmd}')
311         return tplink_light_command(cmd)
312
313     @overrides
314     def turn_on(self, child: str = None) -> bool:
315         return self.command("on", child)
316
317     @overrides
318     def turn_off(self, child: str = None) -> bool:
319         return self.command("off", child)
320
321     @overrides
322     def is_on(self) -> bool:
323         self.info = self.get_info()
324         if self.info is None:
325             raise Exception('Unable to get info?')
326         return self.info.get("relay_state", "0") == "1"
327
328     @overrides
329     def is_off(self) -> bool:
330         return not self.is_on()
331
332     @overrides
333     def make_color(self, color: str) -> bool:
334         raise NotImplementedError
335
336     @timeout(
337         10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
338     )
339     def get_info(self) -> Optional[Dict]:
340         cmd = self.get_cmdline() + "-c info"
341         out = subprocess.getoutput(cmd)
342         logger.debug(f'RAW OUT> {out}')
343         out = re.sub("Sent:.*\n", "", out)
344         out = re.sub("Received: *", "", out)
345         try:
346             self.info = json.loads(out)["system"]["get_sysinfo"]
347             self.info_ts = datetime.datetime.now()
348             return self.info
349         except Exception as e:
350             logger.exception(e)
351             print(out, file=sys.stderr)
352             self.info = None
353             self.info_ts = None
354             return None
355
356     @overrides
357     def status(self) -> str:
358         ret = ''
359         for k, v in self.get_info().items():
360             ret += f'{k} = {v}\n'
361         return ret
362
363     def get_on_duration_seconds(self, child: str = None) -> int:
364         self.info = self.get_info()
365         if child is None:
366             if self.info is None:
367                 return 0
368             return int(self.info.get("on_time", "0"))
369         else:
370             if self.info is None:
371                 return 0
372             for chi in self.info.get("children", {}):
373                 if chi["id"] == child:
374                     return int(chi.get("on_time", "0"))
375         return 0
376
377     @overrides
378     def get_dimmer_level(self) -> Optional[int]:
379         if not self.has_keyword("dimmer"):
380             return False
381         self.info = self.get_info()
382         if self.info is None:
383             return None
384         return int(self.info.get("brightness", "0"))
385
386     @overrides
387     def set_dimmer_level(self, level: int) -> bool:
388         if not self.has_keyword("dimmer"):
389             return False
390         cmd = (
391             self.get_cmdline()
392             + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
393         )
394         return tplink_light_command(cmd)
395
396
397 # class GoogleLightGroup(GoogleLight):
398 #     def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
399 #         if len(members) < 1:
400 #             raise Exception("There must be at least one light in the group.")
401 #         self.members = members
402 #         mac = GoogleLightGroup.make_up_mac(members)
403 #         super().__init__(name, mac, keywords)
404
405 #     @staticmethod
406 #     def make_up_mac(members: List[GoogleLight]):
407 #         mac = members[0].get_mac()
408 #         b = mac.split(':')
409 #         b[5] = int(b[5], 16) + 1
410 #         if b[5] > 255:
411 #             b[5] = 0
412 #         b[5] = str(b[5])
413 #         return ":".join(b)
414
415 #     def is_on(self) -> bool:
416 #         r = ask_google(f"are {self.goog_name()} on?")
417 #         if not r.success:
418 #             return False
419 #         return 'is on' in r.audio_transcription
420
421 #     def get_dimmer_level(self) -> Optional[int]:
422 #         if not self.has_keyword("dimmer"):
423 #             return False
424 #         r = ask_google(f'how bright are {self.goog_name()}?')
425 #         if not r.success:
426 #             return None
427
428 #         # four lights are set to 100% brightness
429 #         txt = r.audio_transcription
430 #         m = re.search(r"(\d+)% bright", txt)
431 #         if m is not None:
432 #             return int(m.group(1))
433 #         if "is off" in txt:
434 #             return 0
435 #         return None
436
437 #     def set_dimmer_level(self, level: int) -> bool:
438 #         if not self.has_keyword("dimmer"):
439 #             return False
440 #         if 0 <= level <= 100:
441 #             was_on = self.is_on()
442 #             r = ask_google(f"set {self.goog_name()} to {level} percent")
443 #             if not r.success:
444 #                 return False
445 #             if not was_on:
446 #                 self.turn_off()
447 #             return True
448 #         return False
449
450 #     def make_color(self, color: str) -> bool:
451 #         return GoogleLight.parse_google_response(
452 #             ask_google(f"make {self.goog_name()} {color}")
453 #         )