A bunch of changes...
[python_utils.git] / smart_home / lights.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart lights."""
4
5 from abc import abstractmethod
6 import datetime
7 import json
8 import logging
9 import os
10 import re
11 import subprocess
12 import sys
13 from typing import Any, Dict, List, Optional, Tuple
14
15 from overrides import overrides
16 import tinytuya as tt
17
18 import ansi
19 import argparse_utils
20 import arper
21 import config
22 import logging_utils
23 import smart_home.device as dev
24 from google_assistant import ask_google, GoogleResponse
25 from decorator_utils import timeout, memoized
26
27 logger = logging.getLogger(__name__)
28
29 args = config.add_commandline_args(
30     f"Smart Lights ({__file__})",
31     "Args related to smart lights.",
32 )
33 args.add_argument(
34     '--smart_lights_tplink_location',
35     default='/home/scott/bin/tplink.py',
36     metavar='FILENAME',
37     help='The location of the tplink.py helper',
38     type=argparse_utils.valid_filename,
39 )
40
41
42 @timeout(
43     5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
44 )
45 def tplink_light_command(command: str) -> bool:
46     result = os.system(command)
47     signal = result & 0xFF
48     if signal != 0:
49         logger.warning(f'{command} died with signal {signal}')
50         logging_utils.hlog("%s died with signal %d" % (command, signal))
51         return False
52     else:
53         exit_value = result >> 8
54         if exit_value != 0:
55             logger.warning(f'{command} failed, exited {exit_value}')
56             logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
57             return False
58     logger.debug(f'{command} succeeded.')
59     return True
60
61
62 class BaseLight(dev.Device):
63     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
64         super().__init__(name.strip(), mac.strip(), keywords)
65
66     @staticmethod
67     def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]:
68         m = re.match(
69             'r#?([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])',
70             color
71         )
72         if m is not None and len(m.group) == 3:
73             red = int(m.group(0), 16)
74             green = int(m.group(1), 16)
75             blue = int(m.group(2), 16)
76             return (red, green, blue)
77         color = color.lower()
78         return ansi.COLOR_NAMES_TO_RGB.get(color, None)
79
80     @abstractmethod
81     def status(self) -> str:
82         pass
83
84     @abstractmethod
85     def turn_on(self) -> bool:
86         pass
87
88     @abstractmethod
89     def turn_off(self) -> bool:
90         pass
91
92     @abstractmethod
93     def is_on(self) -> bool:
94         pass
95
96     @abstractmethod
97     def is_off(self) -> bool:
98         pass
99
100     @abstractmethod
101     def get_dimmer_level(self) -> Optional[int]:
102         pass
103
104     @abstractmethod
105     def set_dimmer_level(self, level: int) -> bool:
106         pass
107
108     @abstractmethod
109     def make_color(self, color: str) -> bool:
110         pass
111
112
113 class GoogleLight(BaseLight):
114     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
115         super().__init__(name, mac, keywords)
116
117     def goog_name(self) -> str:
118         name = self.get_name()
119         return name.replace("_", " ")
120
121     @staticmethod
122     def parse_google_response(response: GoogleResponse) -> bool:
123         return response.success
124
125     @overrides
126     def turn_on(self) -> bool:
127         return GoogleLight.parse_google_response(
128             ask_google(f"turn {self.goog_name()} on")
129         )
130
131     @overrides
132     def turn_off(self) -> bool:
133         return GoogleLight.parse_google_response(
134             ask_google(f"turn {self.goog_name()} off")
135         )
136
137     @overrides
138     def status(self) -> str:
139         if self.is_on():
140             return 'ON'
141         return 'off'
142
143     @overrides
144     def is_on(self) -> bool:
145         r = ask_google(f"is {self.goog_name()} on?")
146         if not r.success:
147             return False
148         return 'is on' in r.audio_transcription
149
150     @overrides
151     def is_off(self) -> bool:
152         return not self.is_on()
153
154     @overrides
155     def get_dimmer_level(self) -> Optional[int]:
156         if not self.has_keyword("dimmer"):
157             return False
158         r = ask_google(f'how bright is {self.goog_name()}?')
159         if not r.success:
160             return None
161
162         # the bookcase one is set to 40% bright
163         txt = r.audio_transcription
164         m = re.search(r"(\d+)% bright", txt)
165         if m is not None:
166             return int(m.group(1))
167         if "is off" in txt:
168             return 0
169         return None
170
171     @overrides
172     def set_dimmer_level(self, level: int) -> bool:
173         if not self.has_keyword("dimmer"):
174             return False
175         if 0 <= level <= 100:
176             was_on = self.is_on()
177             r = ask_google(f"set {self.goog_name()} to {level} percent")
178             if not r.success:
179                 return False
180             if not was_on:
181                 self.turn_off()
182             return True
183         return False
184
185     @overrides
186     def make_color(self, color: str) -> bool:
187         return GoogleLight.parse_google_response(
188             ask_google(f"make {self.goog_name()} {color}")
189         )
190
191
192 class TuyaLight(BaseLight):
193     ids_by_mac = {
194         '68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
195         '68:C6:3A:DE:27:1A': '8844664268c63ade271a',
196         '68:C6:3A:DE:1D:95': '8844664268c63ade1d95',
197         '68:C6:3A:DE:19:B3': '8844664268c63ade19b3',
198         '80:7D:3A:77:3B:F5': '07445340807d3a773bf5',
199         '80:7D:3A:58:37:02': '07445340807d3a583702',
200     }
201     keys_by_mac = {
202         '68:C6:3A:DE:1A:94': '237f19b1b3d49c36',
203         '68:C6:3A:DE:27:1A': '237f19b1b3d49c36',
204         '68:C6:3A:DE:1D:95': '04b90fc5cd7625d8',
205         '68:C6:3A:DE:19:B3': '2d601f2892f1aefd',
206         '80:7D:3A:77:3B:F5': '27ab921fe4633519',
207         '80:7D:3A:58:37:02': '8559b5416bfa0c05',
208     }
209
210     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
211         super().__init__(name, mac, keywords)
212         mac = mac.upper()
213         if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
214             raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
215         self.devid = TuyaLight.ids_by_mac[mac]
216         self.key = TuyaLight.keys_by_mac[mac]
217         self.arper = arper.Arper()
218         ip = self.get_ip()
219         self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)
220
221     def get_status(self) -> Dict[str, Any]:
222         return self.bulb.status()
223
224     @overrides
225     def status(self) -> str:
226         ret = ''
227         for k, v in self.bulb.status().items():
228             ret += f'{k} = {v}\n'
229         return ret
230
231     @overrides
232     def turn_on(self) -> bool:
233         self.bulb.turn_on()
234         return True
235
236     @overrides
237     def turn_off(self) -> bool:
238         self.bulb.turn_off()
239         return True
240
241     @overrides
242     def is_on(self) -> bool:
243         s = self.get_status()
244         return s['dps']['1']
245
246     @overrides
247     def is_off(self) -> bool:
248         return not self.is_on()
249
250     @overrides
251     def get_dimmer_level(self) -> Optional[int]:
252         s = self.get_status()
253         return s['dps']['3']
254
255     @overrides
256     def set_dimmer_level(self, level: int) -> bool:
257         logger.debug(f'Setting brightness to {level}')
258         self.bulb.set_brightness(level)
259         return True
260
261     @overrides
262     def make_color(self, color: str) -> bool:
263         rgb = BaseLight.parse_color_string(color)
264         logger.debug(f'Light color: {color} -> {rgb}')
265         if rgb is not None:
266             self.bulb.set_colour(rgb[0], rgb[1], rgb[2])
267             return True
268         return False
269
270
271 class TPLinkLight(BaseLight):
272     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
273         super().__init__(name, mac, keywords)
274         self.children: List[str] = []
275         self.info: Optional[Dict] = None
276         self.info_ts: Optional[datetime.datetime] = None
277         if "children" in self.keywords:
278             self.info = self.get_info()
279             if self.info is not None:
280                 for child in self.info["children"]:
281                     self.children.append(child["id"])
282
283     @memoized
284     def get_tplink_name(self) -> Optional[str]:
285         self.info = self.get_info()
286         if self.info is not None:
287             return self.info["alias"]
288         return None
289
290     def get_cmdline(self, child: str = None) -> str:
291         cmd = (
292             f"{config.config['smart_lights_tplink_location']} -m {self.mac} "
293             f"--no_logging_console "
294         )
295         if child is not None:
296             cmd += f"-x {child} "
297         return cmd
298
299     def get_children(self) -> List[str]:
300         return self.children
301
302     def command(
303         self, cmd: str, child: str = None, extra_args: str = None
304     ) -> bool:
305         cmd = self.get_cmdline(child) + f"-c {cmd}"
306         if extra_args is not None:
307             cmd += f" {extra_args}"
308         logger.debug(f'About to execute {cmd}')
309         return tplink_light_command(cmd)
310
311     @overrides
312     def turn_on(self, child: str = None) -> bool:
313         return self.command("on", child)
314
315     @overrides
316     def turn_off(self, child: str = None) -> bool:
317         return self.command("off", child)
318
319     @overrides
320     def is_on(self) -> bool:
321         self.info = self.get_info()
322         if self.info is None:
323             raise Exception('Unable to get info?')
324         return self.info.get("relay_state", "0") == "1"
325
326     @overrides
327     def is_off(self) -> bool:
328         return not self.is_on()
329
330     @overrides
331     def make_color(self, color: str) -> bool:
332         raise NotImplementedError
333
334     @timeout(
335         10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
336     )
337     def get_info(self) -> Optional[Dict]:
338         cmd = self.get_cmdline() + "-c info"
339         out = subprocess.getoutput(cmd)
340         logger.debug(f'RAW OUT> {out}')
341         out = re.sub("Sent:.*\n", "", out)
342         out = re.sub("Received: *", "", out)
343         try:
344             self.info = json.loads(out)["system"]["get_sysinfo"]
345             self.info_ts = datetime.datetime.now()
346             return self.info
347         except Exception as e:
348             logger.exception(e)
349             print(out, file=sys.stderr)
350             self.info = None
351             self.info_ts = None
352             return None
353
354     @overrides
355     def status(self) -> str:
356         ret = ''
357         for k, v in self.get_info().items():
358             ret += f'{k} = {v}\n'
359         return ret
360
361     def get_on_duration_seconds(self, child: str = None) -> int:
362         self.info = self.get_info()
363         if child is None:
364             if self.info is None:
365                 return 0
366             return int(self.info.get("on_time", "0"))
367         else:
368             if self.info is None:
369                 return 0
370             for chi in self.info.get("children", {}):
371                 if chi["id"] == child:
372                     return int(chi.get("on_time", "0"))
373         return 0
374
375     @overrides
376     def get_dimmer_level(self) -> Optional[int]:
377         if not self.has_keyword("dimmer"):
378             return False
379         self.info = self.get_info()
380         if self.info is None:
381             return None
382         return int(self.info.get("brightness", "0"))
383
384     @overrides
385     def set_dimmer_level(self, level: int) -> bool:
386         if not self.has_keyword("dimmer"):
387             return False
388         cmd = (
389             self.get_cmdline()
390             + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
391         )
392         return tplink_light_command(cmd)
393
394
395 # class GoogleLightGroup(GoogleLight):
396 #     def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
397 #         if len(members) < 1:
398 #             raise Exception("There must be at least one light in the group.")
399 #         self.members = members
400 #         mac = GoogleLightGroup.make_up_mac(members)
401 #         super().__init__(name, mac, keywords)
402
403 #     @staticmethod
404 #     def make_up_mac(members: List[GoogleLight]):
405 #         mac = members[0].get_mac()
406 #         b = mac.split(':')
407 #         b[5] = int(b[5], 16) + 1
408 #         if b[5] > 255:
409 #             b[5] = 0
410 #         b[5] = str(b[5])
411 #         return ":".join(b)
412
413 #     def is_on(self) -> bool:
414 #         r = ask_google(f"are {self.goog_name()} on?")
415 #         if not r.success:
416 #             return False
417 #         return 'is on' in r.audio_transcription
418
419 #     def get_dimmer_level(self) -> Optional[int]:
420 #         if not self.has_keyword("dimmer"):
421 #             return False
422 #         r = ask_google(f'how bright are {self.goog_name()}?')
423 #         if not r.success:
424 #             return None
425
426 #         # four lights are set to 100% brightness
427 #         txt = r.audio_transcription
428 #         m = re.search(r"(\d+)% bright", txt)
429 #         if m is not None:
430 #             return int(m.group(1))
431 #         if "is off" in txt:
432 #             return 0
433 #         return None
434
435 #     def set_dimmer_level(self, level: int) -> bool:
436 #         if not self.has_keyword("dimmer"):
437 #             return False
438 #         if 0 <= level <= 100:
439 #             was_on = self.is_on()
440 #             r = ask_google(f"set {self.goog_name()} to {level} percent")
441 #             if not r.success:
442 #                 return False
443 #             if not was_on:
444 #                 self.turn_off()
445 #             return True
446 #         return False
447
448 #     def make_color(self, color: str) -> bool:
449 #         return GoogleLight.parse_google_response(
450 #             ask_google(f"make {self.goog_name()} {color}")
451 #         )