Small bugfixes; also, add a new machine to the remote executor pool.
[python_utils.git] / smart_home / lights.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart lights."""
4
5 from abc import abstractmethod
6 import datetime
7 import json
8 import logging
9 import os
10 import re
11 import subprocess
12 import sys
13 from typing import Any, Dict, List, Optional, Tuple
14
15 from overrides import overrides
16 import tinytuya as tt
17
18 import ansi
19 import argparse_utils
20 import arper
21 import config
22 import logging_utils
23 import smart_home.device as dev
24 from google_assistant import ask_google, GoogleResponse
25 from decorator_utils import timeout, memoized
26
27 logger = logging.getLogger(__name__)
28
29 args = config.add_commandline_args(
30     f"Smart Lights ({__file__})",
31     "Args related to smart lights.",
32 )
33 args.add_argument(
34     '--smart_lights_tplink_location',
35     default='/home/scott/bin/tplink.py',
36     metavar='FILENAME',
37     help='The location of the tplink.py helper',
38     type=argparse_utils.valid_filename,
39 )
40
41
42 @timeout(
43     5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
44 )
45 def tplink_light_command(command: str) -> bool:
46     result = os.system(command)
47     signal = result & 0xFF
48     if signal != 0:
49         logger.warning(f'{command} died with signal {signal}')
50         logging_utils.hlog("%s died with signal %d" % (command, signal))
51         return False
52     else:
53         exit_value = result >> 8
54         if exit_value != 0:
55             logger.warning(f'{command} failed, exited {exit_value}')
56             logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
57             return False
58     logger.debug(f'{command} succeeded.')
59     return True
60
61
62 class BaseLight(dev.Device):
63     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
64         super().__init__(name.strip(), mac.strip(), keywords)
65
66     @staticmethod
67     def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]:
68         m = re.match(
69             'r#?([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])',
70             color
71         )
72         if m is not None and len(m.group) == 3:
73             red = int(m.group(0), 16)
74             green = int(m.group(1), 16)
75             blue = int(m.group(2), 16)
76             return (red, green, blue)
77         color = color.lower()
78         return ansi.COLOR_NAMES_TO_RGB.get(color, None)
79
80     @abstractmethod
81     def status(self) -> str:
82         pass
83
84     @abstractmethod
85     def turn_on(self) -> bool:
86         pass
87
88     @abstractmethod
89     def turn_off(self) -> bool:
90         pass
91
92     @abstractmethod
93     def is_on(self) -> bool:
94         pass
95
96     @abstractmethod
97     def is_off(self) -> bool:
98         pass
99
100     @abstractmethod
101     def get_dimmer_level(self) -> Optional[int]:
102         pass
103
104     @abstractmethod
105     def set_dimmer_level(self, level: int) -> bool:
106         pass
107
108     @abstractmethod
109     def make_color(self, color: str) -> bool:
110         pass
111
112
113 class GoogleLight(BaseLight):
114     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
115         super().__init__(name, mac, keywords)
116
117     def goog_name(self) -> str:
118         name = self.get_name()
119         return name.replace("_", " ")
120
121     @staticmethod
122     def parse_google_response(response: GoogleResponse) -> bool:
123         return response.success
124
125     @overrides
126     def turn_on(self) -> bool:
127         return GoogleLight.parse_google_response(
128             ask_google(f"turn {self.goog_name()} on")
129         )
130
131     @overrides
132     def turn_off(self) -> bool:
133         return GoogleLight.parse_google_response(
134             ask_google(f"turn {self.goog_name()} off")
135         )
136
137     @overrides
138     def status(self) -> str:
139         if self.is_on():
140             return 'ON'
141         return 'off'
142
143     @overrides
144     def is_on(self) -> bool:
145         r = ask_google(f"is {self.goog_name()} on?")
146         if not r.success:
147             return False
148         return 'is on' in r.audio_transcription
149
150     @overrides
151     def is_off(self) -> bool:
152         return not self.is_on()
153
154     @overrides
155     def get_dimmer_level(self) -> Optional[int]:
156         if not self.has_keyword("dimmer"):
157             return False
158         r = ask_google(f'how bright is {self.goog_name()}?')
159         if not r.success:
160             return None
161
162         # the bookcase one is set to 40% bright
163         txt = r.audio_transcription
164         m = re.search(r"(\d+)% bright", txt)
165         if m is not None:
166             return int(m.group(1))
167         if "is off" in txt:
168             return 0
169         return None
170
171     @overrides
172     def set_dimmer_level(self, level: int) -> bool:
173         if not self.has_keyword("dimmer"):
174             return False
175         if 0 <= level <= 100:
176             was_on = self.is_on()
177             r = ask_google(f"set {self.goog_name()} to {level} percent")
178             if not r.success:
179                 return False
180             if not was_on:
181                 self.turn_off()
182             return True
183         return False
184
185     @overrides
186     def make_color(self, color: str) -> bool:
187         return GoogleLight.parse_google_response(
188             ask_google(f"make {self.goog_name()} {color}")
189         )
190
191
192 class TuyaLight(BaseLight):
193     ids_by_mac = {
194         '68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
195         '68:C6:3A:DE:27:1A': '8844664268c63ade271a',
196         '68:C6:3A:DE:1D:95': '8844664268c63ade1d95',
197         '68:C6:3A:DE:19:B3': '8844664268c63ade19b3',
198         '80:7D:3A:77:3B:F5': '07445340807d3a773bf5',
199         '80:7D:3A:58:37:02': '07445340807d3a583702',
200     }
201     keys_by_mac = {
202         '68:C6:3A:DE:1A:94': '237f19b1b3d49c36',
203         '68:C6:3A:DE:27:1A': '237f19b1b3d49c36',
204         '68:C6:3A:DE:1D:95': '04b90fc5cd7625d8',
205         '68:C6:3A:DE:19:B3': '2d601f2892f1aefd',
206         '80:7D:3A:77:3B:F5': '27ab921fe4633519',
207         '80:7D:3A:58:37:02': '8559b5416bfa0c05',
208     }
209
210     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
211         super().__init__(name, mac, keywords)
212         mac = mac.upper()
213         if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
214             raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
215         self.devid = TuyaLight.ids_by_mac[mac]
216         self.key = TuyaLight.keys_by_mac[mac]
217         self.arper = arper.Arper()
218         ip = self.get_ip()
219         self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)
220
221     def get_status(self) -> Dict[str, Any]:
222         return self.bulb.status()
223
224     @overrides
225     def status(self) -> str:
226         ret = ''
227         for k, v in self.bulb.status().items():
228             ret += f'{k} = {v}\n'
229         return ret
230
231     @overrides
232     def turn_on(self) -> bool:
233         self.bulb.turn_on()
234         return True
235
236     @overrides
237     def turn_off(self) -> bool:
238         self.bulb.turn_off()
239         return True
240
241     @overrides
242     def is_on(self) -> bool:
243         s = self.get_status()
244         return s['dps']['1']
245
246     @overrides
247     def is_off(self) -> bool:
248         return not self.is_on()
249
250     @overrides
251     def get_dimmer_level(self) -> Optional[int]:
252         s = self.get_status()
253         return s['dps']['3']
254
255     @overrides
256     def set_dimmer_level(self, level: int) -> bool:
257         logger.debug(f'Setting brightness to {level}')
258         self.bulb.set_brightness(level)
259         return True
260
261     @overrides
262     def make_color(self, color: str) -> bool:
263         rgb = BaseLight.parse_color_string(color)
264         logger.debug(f'Light color: {color} -> {rgb}')
265         if rgb is not None:
266             self.bulb.set_colour(rgb[0], rgb[1], rgb[2])
267             return True
268         return False
269
270
271 class TPLinkLight(BaseLight):
272     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
273         super().__init__(name, mac, keywords)
274         self.children: List[str] = []
275         self.info: Optional[Dict] = None
276         self.info_ts: Optional[datetime.datetime] = None
277         if "children" in self.keywords:
278             self.info = self.get_info()
279             if self.info is not None:
280                 for child in self.info["children"]:
281                     self.children.append(child["id"])
282
283     @memoized
284     def get_tplink_name(self) -> Optional[str]:
285         self.info = self.get_info()
286         if self.info is not None:
287             return self.info["alias"]
288         return None
289
290     def get_cmdline(self, child: str = None) -> str:
291         cmd = (
292             f"{config.config['smart_lights_tplink_location']} -m {self.mac} "
293             f"--no_logging_console "
294         )
295         if child is not None:
296             cmd += f"-x {child} "
297         return cmd
298
299     def get_children(self) -> List[str]:
300         return self.children
301
302     def command(
303         self, cmd: str, child: str = None, extra_args: str = None
304     ) -> bool:
305         cmd = self.get_cmdline(child) + f"-c {cmd}"
306         if extra_args is not None:
307             cmd += f" {extra_args}"
308         logger.debug(f'About to execute {cmd}')
309         return tplink_light_command(cmd)
310
311     @overrides
312     def turn_on(self, child: str = None) -> bool:
313         return self.command("on", child)
314
315     @overrides
316     def turn_off(self, child: str = None) -> bool:
317         return self.command("off", child)
318
319     @overrides
320     def is_on(self) -> bool:
321         return self.get_on_duration_seconds() > 0
322
323     @overrides
324     def is_off(self) -> bool:
325         return not self.is_on()
326
327     @overrides
328     def make_color(self, color: str) -> bool:
329         raise NotImplementedError
330
331     @timeout(
332         10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
333     )
334     def get_info(self) -> Optional[Dict]:
335         cmd = self.get_cmdline() + "-c info"
336         out = subprocess.getoutput(cmd)
337         out = re.sub("Sent:.*\n", "", out)
338         out = re.sub("Received: *", "", out)
339         try:
340             self.info = json.loads(out)["system"]["get_sysinfo"]
341             self.info_ts = datetime.datetime.now()
342             return self.info
343         except Exception as e:
344             logger.exception(e)
345             print(out, file=sys.stderr)
346             self.info = None
347             self.info_ts = None
348             return None
349
350     @overrides
351     def status(self) -> str:
352         ret = ''
353         for k, v in self.get_info().items():
354             ret += f'{k} = {v}\n'
355         return ret
356
357     def get_on_duration_seconds(self, child: str = None) -> int:
358         self.info = self.get_info()
359         if child is None:
360             if self.info is None:
361                 return 0
362             return int(self.info.get("on_time", "0"))
363         else:
364             if self.info is None:
365                 return 0
366             for chi in self.info.get("children", {}):
367                 if chi["id"] == child:
368                     return int(chi.get("on_time", "0"))
369         return 0
370
371     @overrides
372     def get_dimmer_level(self) -> Optional[int]:
373         if not self.has_keyword("dimmer"):
374             return False
375         self.info = self.get_info()
376         if self.info is None:
377             return None
378         return int(self.info.get("brightness", "0"))
379
380     @overrides
381     def set_dimmer_level(self, level: int) -> bool:
382         if not self.has_keyword("dimmer"):
383             return False
384         cmd = (
385             self.get_cmdline()
386             + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
387         )
388         return tplink_light_command(cmd)
389
390
391 # class GoogleLightGroup(GoogleLight):
392 #     def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
393 #         if len(members) < 1:
394 #             raise Exception("There must be at least one light in the group.")
395 #         self.members = members
396 #         mac = GoogleLightGroup.make_up_mac(members)
397 #         super().__init__(name, mac, keywords)
398
399 #     @staticmethod
400 #     def make_up_mac(members: List[GoogleLight]):
401 #         mac = members[0].get_mac()
402 #         b = mac.split(':')
403 #         b[5] = int(b[5], 16) + 1
404 #         if b[5] > 255:
405 #             b[5] = 0
406 #         b[5] = str(b[5])
407 #         return ":".join(b)
408
409 #     def is_on(self) -> bool:
410 #         r = ask_google(f"are {self.goog_name()} on?")
411 #         if not r.success:
412 #             return False
413 #         return 'is on' in r.audio_transcription
414
415 #     def get_dimmer_level(self) -> Optional[int]:
416 #         if not self.has_keyword("dimmer"):
417 #             return False
418 #         r = ask_google(f'how bright are {self.goog_name()}?')
419 #         if not r.success:
420 #             return None
421
422 #         # four lights are set to 100% brightness
423 #         txt = r.audio_transcription
424 #         m = re.search(r"(\d+)% bright", txt)
425 #         if m is not None:
426 #             return int(m.group(1))
427 #         if "is off" in txt:
428 #             return 0
429 #         return None
430
431 #     def set_dimmer_level(self, level: int) -> bool:
432 #         if not self.has_keyword("dimmer"):
433 #             return False
434 #         if 0 <= level <= 100:
435 #             was_on = self.is_on()
436 #             r = ask_google(f"set {self.goog_name()} to {level} percent")
437 #             if not r.success:
438 #                 return False
439 #             if not was_on:
440 #                 self.turn_off()
441 #             return True
442 #         return False
443
444 #     def make_color(self, color: str) -> bool:
445 #         return GoogleLight.parse_google_response(
446 #             ask_google(f"make {self.goog_name()} {color}")
447 #         )