Make subdirs type clean too.
[python_utils.git] / smart_home / lights.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart lights."""
4
5 import datetime
6 import json
7 import logging
8 import os
9 import re
10 import subprocess
11 import sys
12 from abc import abstractmethod
13 from typing import Any, Dict, List, Optional, Tuple
14
15 import tinytuya as tt
16 from overrides import overrides
17
18 import ansi
19 import argparse_utils
20 import arper
21 import config
22 import logging_utils
23 import smart_home.device as dev
24 from decorator_utils import memoized, timeout
25 from google_assistant import GoogleResponse, ask_google
26
27 logger = logging.getLogger(__name__)
28
29 args = config.add_commandline_args(
30     f"Smart Lights ({__file__})",
31     "Args related to smart lights.",
32 )
33 args.add_argument(
34     '--smart_lights_tplink_location',
35     default='/home/scott/bin/tplink.py',
36     metavar='FILENAME',
37     help='The location of the tplink.py helper',
38     type=argparse_utils.valid_filename,
39 )
40
41
42 @timeout(5.0, use_signals=False, error_message="Timed out waiting for tplink.py")
43 def tplink_light_command(command: str) -> bool:
44     result = os.system(command)
45     signal = result & 0xFF
46     if signal != 0:
47         msg = f'{command} died with signal {signal}'
48         logger.warning(msg)
49         logging_utils.hlog(msg)
50         return False
51     else:
52         exit_value = result >> 8
53         if exit_value != 0:
54             msg = f'{command} failed, exited {exit_value}'
55             logger.warning(msg)
56             logging_utils.hlog(msg)
57             return False
58     logger.debug(f'{command} succeeded.')
59     return True
60
61
62 class BaseLight(dev.Device):
63     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
64         super().__init__(name.strip(), mac.strip(), keywords)
65
66     @staticmethod
67     def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]:
68         m = re.match(
69             'r#?([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])',
70             color,
71         )
72         if m is not None and len(m.groups()) == 3:
73             red = int(m.group(0), 16)
74             green = int(m.group(1), 16)
75             blue = int(m.group(2), 16)
76             return (red, green, blue)
77         color = color.lower()
78         return ansi.COLOR_NAMES_TO_RGB.get(color, None)
79
80     @abstractmethod
81     def status(self) -> str:
82         pass
83
84     @abstractmethod
85     def turn_on(self) -> bool:
86         pass
87
88     @abstractmethod
89     def turn_off(self) -> bool:
90         pass
91
92     @abstractmethod
93     def is_on(self) -> bool:
94         pass
95
96     @abstractmethod
97     def is_off(self) -> bool:
98         pass
99
100     @abstractmethod
101     def get_dimmer_level(self) -> Optional[int]:
102         pass
103
104     @abstractmethod
105     def set_dimmer_level(self, level: int) -> bool:
106         pass
107
108     @abstractmethod
109     def make_color(self, color: str) -> bool:
110         pass
111
112
113 class GoogleLight(BaseLight):
114     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
115         super().__init__(name, mac, keywords)
116
117     def goog_name(self) -> str:
118         name = self.get_name()
119         return name.replace("_", " ")
120
121     @staticmethod
122     def parse_google_response(response: GoogleResponse) -> bool:
123         return response.success
124
125     @overrides
126     def turn_on(self) -> bool:
127         return GoogleLight.parse_google_response(
128             ask_google(f"turn {self.goog_name()} on")
129         )
130
131     @overrides
132     def turn_off(self) -> bool:
133         return GoogleLight.parse_google_response(
134             ask_google(f"turn {self.goog_name()} off")
135         )
136
137     @overrides
138     def status(self) -> str:
139         if self.is_on():
140             return 'ON'
141         return 'off'
142
143     @overrides
144     def is_on(self) -> bool:
145         r = ask_google(f"is {self.goog_name()} on?")
146         if not r.success:
147             return False
148         if r.audio_transcription is not None:
149             return 'is on' in r.audio_transcription
150         raise Exception("Can't reach Google?!")
151
152     @overrides
153     def is_off(self) -> bool:
154         return not self.is_on()
155
156     @overrides
157     def get_dimmer_level(self) -> Optional[int]:
158         if not self.has_keyword("dimmer"):
159             return False
160         r = ask_google(f'how bright is {self.goog_name()}?')
161         if not r.success:
162             return None
163
164         # the bookcase one is set to 40% bright
165         txt = r.audio_transcription
166         if txt is not None:
167             m = re.search(r"(\d+)% bright", txt)
168             if m is not None:
169                 return int(m.group(1))
170             if "is off" in txt:
171                 return 0
172         return None
173
174     @overrides
175     def set_dimmer_level(self, level: int) -> bool:
176         if not self.has_keyword("dimmer"):
177             return False
178         if 0 <= level <= 100:
179             was_on = self.is_on()
180             r = ask_google(f"set {self.goog_name()} to {level} percent")
181             if not r.success:
182                 return False
183             if not was_on:
184                 self.turn_off()
185             return True
186         return False
187
188     @overrides
189     def make_color(self, color: str) -> bool:
190         return GoogleLight.parse_google_response(
191             ask_google(f"make {self.goog_name()} {color}")
192         )
193
194
195 class TuyaLight(BaseLight):
196     ids_by_mac = {
197         '68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
198         '68:C6:3A:DE:27:1A': '8844664268c63ade271a',
199         '68:C6:3A:DE:1D:95': '8844664268c63ade1d95',
200         '68:C6:3A:DE:19:B3': '8844664268c63ade19b3',
201         '80:7D:3A:77:3B:F5': '07445340807d3a773bf5',
202         '80:7D:3A:58:37:02': '07445340807d3a583702',
203     }
204     keys_by_mac = {
205         '68:C6:3A:DE:1A:94': '237f19b1b3d49c36',
206         '68:C6:3A:DE:27:1A': '237f19b1b3d49c36',
207         '68:C6:3A:DE:1D:95': '04b90fc5cd7625d8',
208         '68:C6:3A:DE:19:B3': '2d601f2892f1aefd',
209         '80:7D:3A:77:3B:F5': '27ab921fe4633519',
210         '80:7D:3A:58:37:02': '8559b5416bfa0c05',
211     }
212
213     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
214         super().__init__(name, mac, keywords)
215         mac = mac.upper()
216         if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
217             raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
218         self.devid = TuyaLight.ids_by_mac[mac]
219         self.key = TuyaLight.keys_by_mac[mac]
220         self.arper = arper.Arper()
221         ip = self.get_ip()
222         self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)
223
224     def get_status(self) -> Dict[str, Any]:
225         return self.bulb.status()
226
227     @overrides
228     def status(self) -> str:
229         ret = ''
230         for k, v in self.bulb.status().items():
231             ret += f'{k} = {v}\n'
232         return ret
233
234     @overrides
235     def turn_on(self) -> bool:
236         self.bulb.turn_on()
237         return True
238
239     @overrides
240     def turn_off(self) -> bool:
241         self.bulb.turn_off()
242         return True
243
244     @overrides
245     def is_on(self) -> bool:
246         s = self.get_status()
247         return s['dps']['1']
248
249     @overrides
250     def is_off(self) -> bool:
251         return not self.is_on()
252
253     @overrides
254     def get_dimmer_level(self) -> Optional[int]:
255         s = self.get_status()
256         return s['dps']['3']
257
258     @overrides
259     def set_dimmer_level(self, level: int) -> bool:
260         logger.debug(f'Setting brightness to {level}')
261         self.bulb.set_brightness(level)
262         return True
263
264     @overrides
265     def make_color(self, color: str) -> bool:
266         rgb = BaseLight.parse_color_string(color)
267         logger.debug(f'Light color: {color} -> {rgb}')
268         if rgb is not None:
269             self.bulb.set_colour(rgb[0], rgb[1], rgb[2])
270             return True
271         return False
272
273
274 class TPLinkLight(BaseLight):
275     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
276         super().__init__(name, mac, keywords)
277         self.children: List[str] = []
278         self.info: Optional[Dict] = None
279         self.info_ts: Optional[datetime.datetime] = None
280         if "children" in self.keywords:
281             self.info = self.get_info()
282             if self.info is not None:
283                 for child in self.info["children"]:
284                     self.children.append(child["id"])
285
286     @memoized
287     def get_tplink_name(self) -> Optional[str]:
288         self.info = self.get_info()
289         if self.info is not None:
290             return self.info["alias"]
291         return None
292
293     def get_cmdline(self, child: str = None) -> str:
294         cmd = (
295             f"{config.config['smart_lights_tplink_location']} -m {self.mac} "
296             f"--no_logging_console "
297         )
298         if child is not None:
299             cmd += f"-x {child} "
300         return cmd
301
302     def get_children(self) -> List[str]:
303         return self.children
304
305     def command(self, cmd: str, child: str = None, extra_args: str = None) -> bool:
306         cmd = self.get_cmdline(child) + f"-c {cmd}"
307         if extra_args is not None:
308             cmd += f" {extra_args}"
309         logger.debug(f'About to execute {cmd}')
310         return tplink_light_command(cmd)
311
312     @overrides
313     def turn_on(self, child: str = None) -> bool:
314         return self.command("on", child)
315
316     @overrides
317     def turn_off(self, child: str = None) -> bool:
318         return self.command("off", child)
319
320     @overrides
321     def is_on(self) -> bool:
322         self.info = self.get_info()
323         if self.info is None:
324             raise Exception('Unable to get info?')
325         return self.info.get("relay_state", 0) == 1
326
327     @overrides
328     def is_off(self) -> bool:
329         return not self.is_on()
330
331     @overrides
332     def make_color(self, color: str) -> bool:
333         raise NotImplementedError
334
335     @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
336     def get_info(self) -> Optional[Dict]:
337         cmd = self.get_cmdline() + "-c info"
338         out = subprocess.getoutput(cmd)
339         logger.debug(f'RAW OUT> {out}')
340         out = re.sub("Sent:.*\n", "", out)
341         out = re.sub("Received: *", "", out)
342         try:
343             self.info = json.loads(out)["system"]["get_sysinfo"]
344             logger.debug(json.dumps(self.info, indent=4, sort_keys=True))
345             self.info_ts = datetime.datetime.now()
346             return self.info
347         except Exception as e:
348             logger.exception(e)
349             print(out, file=sys.stderr)
350             self.info = None
351             self.info_ts = None
352             return None
353
354     @overrides
355     def status(self) -> str:
356         ret = ''
357         for k, v in self.get_info().items():
358             ret += f'{k} = {v}\n'
359         return ret
360
361     def get_on_duration_seconds(self, child: str = None) -> int:
362         self.info = self.get_info()
363         if child is None:
364             if self.info is None:
365                 return 0
366             return int(self.info.get("on_time", "0"))
367         else:
368             if self.info is None:
369                 return 0
370             for chi in self.info.get("children", {}):
371                 if chi["id"] == child:
372                     return int(chi.get("on_time", "0"))
373         return 0
374
375     @overrides
376     def get_dimmer_level(self) -> Optional[int]:
377         if not self.has_keyword("dimmer"):
378             return False
379         self.info = self.get_info()
380         if self.info is None:
381             return None
382         return int(self.info.get("brightness", "0"))
383
384     @overrides
385     def set_dimmer_level(self, level: int) -> bool:
386         if not self.has_keyword("dimmer"):
387             return False
388         cmd = (
389             self.get_cmdline()
390             + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
391         )
392         return tplink_light_command(cmd)
393
394
395 # class GoogleLightGroup(GoogleLight):
396 #     def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
397 #         if len(members) < 1:
398 #             raise Exception("There must be at least one light in the group.")
399 #         self.members = members
400 #         mac = GoogleLightGroup.make_up_mac(members)
401 #         super().__init__(name, mac, keywords)
402
403 #     @staticmethod
404 #     def make_up_mac(members: List[GoogleLight]):
405 #         mac = members[0].get_mac()
406 #         b = mac.split(':')
407 #         b[5] = int(b[5], 16) + 1
408 #         if b[5] > 255:
409 #             b[5] = 0
410 #         b[5] = str(b[5])
411 #         return ":".join(b)
412
413 #     def is_on(self) -> bool:
414 #         r = ask_google(f"are {self.goog_name()} on?")
415 #         if not r.success:
416 #             return False
417 #         return 'is on' in r.audio_transcription
418
419 #     def get_dimmer_level(self) -> Optional[int]:
420 #         if not self.has_keyword("dimmer"):
421 #             return False
422 #         r = ask_google(f'how bright are {self.goog_name()}?')
423 #         if not r.success:
424 #             return None
425
426 #         # four lights are set to 100% brightness
427 #         txt = r.audio_transcription
428 #         m = re.search(r"(\d+)% bright", txt)
429 #         if m is not None:
430 #             return int(m.group(1))
431 #         if "is off" in txt:
432 #             return 0
433 #         return None
434
435 #     def set_dimmer_level(self, level: int) -> bool:
436 #         if not self.has_keyword("dimmer"):
437 #             return False
438 #         if 0 <= level <= 100:
439 #             was_on = self.is_on()
440 #             r = ask_google(f"set {self.goog_name()} to {level} percent")
441 #             if not r.success:
442 #                 return False
443 #             if not was_on:
444 #                 self.turn_off()
445 #             return True
446 #         return False
447
448 #     def make_color(self, color: str) -> bool:
449 #         return GoogleLight.parse_google_response(
450 #             ask_google(f"make {self.goog_name()} {color}")
451 #         )