Change settings in flake8 and black.
[python_utils.git] / smart_home / lights.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart lights."""
4
5 import datetime
6 import json
7 import logging
8 import os
9 import re
10 import subprocess
11 import sys
12 from abc import abstractmethod
13 from typing import Any, Dict, List, Optional, Tuple
14
15 import tinytuya as tt
16 from overrides import overrides
17
18 import ansi
19 import argparse_utils
20 import arper
21 import config
22 import logging_utils
23 import smart_home.device as dev
24 from decorator_utils import memoized, timeout
25 from google_assistant import GoogleResponse, ask_google
26
27 logger = logging.getLogger(__name__)
28
29 args = config.add_commandline_args(
30     f"Smart Lights ({__file__})",
31     "Args related to smart lights.",
32 )
33 args.add_argument(
34     '--smart_lights_tplink_location',
35     default='/home/scott/bin/tplink.py',
36     metavar='FILENAME',
37     help='The location of the tplink.py helper',
38     type=argparse_utils.valid_filename,
39 )
40
41
42 @timeout(5.0, use_signals=False, error_message="Timed out waiting for tplink.py")
43 def tplink_light_command(command: str) -> bool:
44     result = os.system(command)
45     signal = result & 0xFF
46     if signal != 0:
47         msg = f'{command} died with signal {signal}'
48         logger.warning(msg)
49         logging_utils.hlog(msg)
50         return False
51     else:
52         exit_value = result >> 8
53         if exit_value != 0:
54             msg = f'{command} failed, exited {exit_value}'
55             logger.warning(msg)
56             logging_utils.hlog(msg)
57             return False
58     logger.debug(f'{command} succeeded.')
59     return True
60
61
62 class BaseLight(dev.Device):
63     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
64         super().__init__(name.strip(), mac.strip(), keywords)
65
66     @staticmethod
67     def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]:
68         m = re.match(
69             'r#?([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])',
70             color,
71         )
72         if m is not None and len(m.groups()) == 3:
73             red = int(m.group(0), 16)
74             green = int(m.group(1), 16)
75             blue = int(m.group(2), 16)
76             return (red, green, blue)
77         color = color.lower()
78         return ansi.COLOR_NAMES_TO_RGB.get(color, None)
79
80     @abstractmethod
81     def status(self) -> str:
82         pass
83
84     @abstractmethod
85     def turn_on(self) -> bool:
86         pass
87
88     @abstractmethod
89     def turn_off(self) -> bool:
90         pass
91
92     @abstractmethod
93     def is_on(self) -> bool:
94         pass
95
96     @abstractmethod
97     def is_off(self) -> bool:
98         pass
99
100     @abstractmethod
101     def get_dimmer_level(self) -> Optional[int]:
102         pass
103
104     @abstractmethod
105     def set_dimmer_level(self, level: int) -> bool:
106         pass
107
108     @abstractmethod
109     def make_color(self, color: str) -> bool:
110         pass
111
112
113 class GoogleLight(BaseLight):
114     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
115         super().__init__(name, mac, keywords)
116
117     def goog_name(self) -> str:
118         name = self.get_name()
119         return name.replace("_", " ")
120
121     @staticmethod
122     def parse_google_response(response: GoogleResponse) -> bool:
123         return response.success
124
125     @overrides
126     def turn_on(self) -> bool:
127         return GoogleLight.parse_google_response(ask_google(f"turn {self.goog_name()} on"))
128
129     @overrides
130     def turn_off(self) -> bool:
131         return GoogleLight.parse_google_response(ask_google(f"turn {self.goog_name()} off"))
132
133     @overrides
134     def status(self) -> str:
135         if self.is_on():
136             return 'ON'
137         return 'off'
138
139     @overrides
140     def is_on(self) -> bool:
141         r = ask_google(f"is {self.goog_name()} on?")
142         if not r.success:
143             return False
144         if r.audio_transcription is not None:
145             return 'is on' in r.audio_transcription
146         raise Exception("Can't reach Google?!")
147
148     @overrides
149     def is_off(self) -> bool:
150         return not self.is_on()
151
152     @overrides
153     def get_dimmer_level(self) -> Optional[int]:
154         if not self.has_keyword("dimmer"):
155             return False
156         r = ask_google(f'how bright is {self.goog_name()}?')
157         if not r.success:
158             return None
159
160         # the bookcase one is set to 40% bright
161         txt = r.audio_transcription
162         if txt is not None:
163             m = re.search(r"(\d+)% bright", txt)
164             if m is not None:
165                 return int(m.group(1))
166             if "is off" in txt:
167                 return 0
168         return None
169
170     @overrides
171     def set_dimmer_level(self, level: int) -> bool:
172         if not self.has_keyword("dimmer"):
173             return False
174         if 0 <= level <= 100:
175             was_on = self.is_on()
176             r = ask_google(f"set {self.goog_name()} to {level} percent")
177             if not r.success:
178                 return False
179             if not was_on:
180                 self.turn_off()
181             return True
182         return False
183
184     @overrides
185     def make_color(self, color: str) -> bool:
186         return GoogleLight.parse_google_response(ask_google(f"make {self.goog_name()} {color}"))
187
188
189 class TuyaLight(BaseLight):
190     ids_by_mac = {
191         '68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
192         '68:C6:3A:DE:27:1A': '8844664268c63ade271a',
193         '68:C6:3A:DE:1D:95': '8844664268c63ade1d95',
194         '68:C6:3A:DE:19:B3': '8844664268c63ade19b3',
195         '80:7D:3A:77:3B:F5': '07445340807d3a773bf5',
196         '80:7D:3A:58:37:02': '07445340807d3a583702',
197     }
198     keys_by_mac = {
199         '68:C6:3A:DE:1A:94': '237f19b1b3d49c36',
200         '68:C6:3A:DE:27:1A': '237f19b1b3d49c36',
201         '68:C6:3A:DE:1D:95': '04b90fc5cd7625d8',
202         '68:C6:3A:DE:19:B3': '2d601f2892f1aefd',
203         '80:7D:3A:77:3B:F5': '27ab921fe4633519',
204         '80:7D:3A:58:37:02': '8559b5416bfa0c05',
205     }
206
207     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
208         super().__init__(name, mac, keywords)
209         mac = mac.upper()
210         if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
211             raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
212         self.devid = TuyaLight.ids_by_mac[mac]
213         self.key = TuyaLight.keys_by_mac[mac]
214         self.arper = arper.Arper()
215         ip = self.get_ip()
216         self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)
217
218     def get_status(self) -> Dict[str, Any]:
219         return self.bulb.status()
220
221     @overrides
222     def status(self) -> str:
223         ret = ''
224         for k, v in self.bulb.status().items():
225             ret += f'{k} = {v}\n'
226         return ret
227
228     @overrides
229     def turn_on(self) -> bool:
230         self.bulb.turn_on()
231         return True
232
233     @overrides
234     def turn_off(self) -> bool:
235         self.bulb.turn_off()
236         return True
237
238     @overrides
239     def is_on(self) -> bool:
240         s = self.get_status()
241         return s['dps']['1']
242
243     @overrides
244     def is_off(self) -> bool:
245         return not self.is_on()
246
247     @overrides
248     def get_dimmer_level(self) -> Optional[int]:
249         s = self.get_status()
250         return s['dps']['3']
251
252     @overrides
253     def set_dimmer_level(self, level: int) -> bool:
254         logger.debug(f'Setting brightness to {level}')
255         self.bulb.set_brightness(level)
256         return True
257
258     @overrides
259     def make_color(self, color: str) -> bool:
260         rgb = BaseLight.parse_color_string(color)
261         logger.debug(f'Light color: {color} -> {rgb}')
262         if rgb is not None:
263             self.bulb.set_colour(rgb[0], rgb[1], rgb[2])
264             return True
265         return False
266
267
268 class TPLinkLight(BaseLight):
269     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
270         super().__init__(name, mac, keywords)
271         self.children: List[str] = []
272         self.info: Optional[Dict] = None
273         self.info_ts: Optional[datetime.datetime] = None
274         if "children" in self.keywords:
275             self.info = self.get_info()
276             if self.info is not None:
277                 for child in self.info["children"]:
278                     self.children.append(child["id"])
279
280     @memoized
281     def get_tplink_name(self) -> Optional[str]:
282         self.info = self.get_info()
283         if self.info is not None:
284             return self.info["alias"]
285         return None
286
287     def get_cmdline(self, child: str = None) -> str:
288         cmd = (
289             f"{config.config['smart_lights_tplink_location']} -m {self.mac} "
290             f"--no_logging_console "
291         )
292         if child is not None:
293             cmd += f"-x {child} "
294         return cmd
295
296     def get_children(self) -> List[str]:
297         return self.children
298
299     def command(self, cmd: str, child: str = None, extra_args: str = None) -> bool:
300         cmd = self.get_cmdline(child) + f"-c {cmd}"
301         if extra_args is not None:
302             cmd += f" {extra_args}"
303         logger.debug(f'About to execute {cmd}')
304         return tplink_light_command(cmd)
305
306     @overrides
307     def turn_on(self, child: str = None) -> bool:
308         return self.command("on", child)
309
310     @overrides
311     def turn_off(self, child: str = None) -> bool:
312         return self.command("off", child)
313
314     @overrides
315     def is_on(self) -> bool:
316         self.info = self.get_info()
317         if self.info is None:
318             raise Exception('Unable to get info?')
319         return self.info.get("relay_state", 0) == 1
320
321     @overrides
322     def is_off(self) -> bool:
323         return not self.is_on()
324
325     @overrides
326     def make_color(self, color: str) -> bool:
327         raise NotImplementedError
328
329     @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
330     def get_info(self) -> Optional[Dict]:
331         cmd = self.get_cmdline() + "-c info"
332         out = subprocess.getoutput(cmd)
333         logger.debug(f'RAW OUT> {out}')
334         out = re.sub("Sent:.*\n", "", out)
335         out = re.sub("Received: *", "", out)
336         try:
337             self.info = json.loads(out)["system"]["get_sysinfo"]
338             logger.debug(json.dumps(self.info, indent=4, sort_keys=True))
339             self.info_ts = datetime.datetime.now()
340             return self.info
341         except Exception as e:
342             logger.exception(e)
343             print(out, file=sys.stderr)
344             self.info = None
345             self.info_ts = None
346             return None
347
348     @overrides
349     def status(self) -> str:
350         ret = ''
351         for k, v in self.get_info().items():
352             ret += f'{k} = {v}\n'
353         return ret
354
355     def get_on_duration_seconds(self, child: str = None) -> int:
356         self.info = self.get_info()
357         if child is None:
358             if self.info is None:
359                 return 0
360             return int(self.info.get("on_time", "0"))
361         else:
362             if self.info is None:
363                 return 0
364             for chi in self.info.get("children", {}):
365                 if chi["id"] == child:
366                     return int(chi.get("on_time", "0"))
367         return 0
368
369     @overrides
370     def get_dimmer_level(self) -> Optional[int]:
371         if not self.has_keyword("dimmer"):
372             return False
373         self.info = self.get_info()
374         if self.info is None:
375             return None
376         return int(self.info.get("brightness", "0"))
377
378     @overrides
379     def set_dimmer_level(self, level: int) -> bool:
380         if not self.has_keyword("dimmer"):
381             return False
382         cmd = (
383             self.get_cmdline()
384             + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
385         )
386         return tplink_light_command(cmd)
387
388
389 # class GoogleLightGroup(GoogleLight):
390 #     def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
391 #         if len(members) < 1:
392 #             raise Exception("There must be at least one light in the group.")
393 #         self.members = members
394 #         mac = GoogleLightGroup.make_up_mac(members)
395 #         super().__init__(name, mac, keywords)
396
397 #     @staticmethod
398 #     def make_up_mac(members: List[GoogleLight]):
399 #         mac = members[0].get_mac()
400 #         b = mac.split(':')
401 #         b[5] = int(b[5], 16) + 1
402 #         if b[5] > 255:
403 #             b[5] = 0
404 #         b[5] = str(b[5])
405 #         return ":".join(b)
406
407 #     def is_on(self) -> bool:
408 #         r = ask_google(f"are {self.goog_name()} on?")
409 #         if not r.success:
410 #             return False
411 #         return 'is on' in r.audio_transcription
412
413 #     def get_dimmer_level(self) -> Optional[int]:
414 #         if not self.has_keyword("dimmer"):
415 #             return False
416 #         r = ask_google(f'how bright are {self.goog_name()}?')
417 #         if not r.success:
418 #             return None
419
420 #         # four lights are set to 100% brightness
421 #         txt = r.audio_transcription
422 #         m = re.search(r"(\d+)% bright", txt)
423 #         if m is not None:
424 #             return int(m.group(1))
425 #         if "is off" in txt:
426 #             return 0
427 #         return None
428
429 #     def set_dimmer_level(self, level: int) -> bool:
430 #         if not self.has_keyword("dimmer"):
431 #             return False
432 #         if 0 <= level <= 100:
433 #             was_on = self.is_on()
434 #             r = ask_google(f"set {self.goog_name()} to {level} percent")
435 #             if not r.success:
436 #                 return False
437 #             if not was_on:
438 #                 self.turn_off()
439 #             return True
440 #         return False
441
442 #     def make_color(self, color: str) -> bool:
443 #         return GoogleLight.parse_google_response(
444 #             ask_google(f"make {self.goog_name()} {color}")
445 #         )