Various changes.
[python_utils.git] / smart_home / lights.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart lights."""
4
5 from abc import abstractmethod
6 import datetime
7 import json
8 import logging
9 import os
10 import re
11 import subprocess
12 import sys
13 from typing import Any, Dict, List, Optional, Tuple
14
15 from overrides import overrides
16 import tinytuya as tt
17
18 import ansi
19 import argparse_utils
20 import arper
21 import config
22 import logging_utils
23 import smart_home.device as dev
24 from google_assistant import ask_google, GoogleResponse
25 from decorator_utils import timeout, memoized
26
27 logger = logging.getLogger(__name__)
28
29 args = config.add_commandline_args(
30     f"Smart Lights ({__file__})",
31     "Args related to smart lights.",
32 )
33 args.add_argument(
34     '--smart_lights_tplink_location',
35     default='/home/scott/bin/tplink.py',
36     metavar='FILENAME',
37     help='The location of the tplink.py helper',
38     type=argparse_utils.valid_filename,
39 )
40
41
42 @timeout(
43     5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
44 )
45 def tplink_light_command(command: str) -> bool:
46     result = os.system(command)
47     signal = result & 0xFF
48     if signal != 0:
49         logger.warning(f'{command} died with signal {signal}')
50         logging_utils.hlog("%s died with signal %d" % (command, signal))
51         return False
52     else:
53         exit_value = result >> 8
54         if exit_value != 0:
55             logger.warning(f'{command} failed, exited {exit_value}')
56             logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
57             return False
58     logger.debug(f'{command} succeeded.')
59     return True
60
61
62 class BaseLight(dev.Device):
63     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
64         super().__init__(name.strip(), mac.strip(), keywords)
65
66     @staticmethod
67     def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]:
68         m = re.match(
69             'r#?([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])',
70             color
71         )
72         if m is not None and len(m.group) == 3:
73             red = int(m.group(0), 16)
74             green = int(m.group(1), 16)
75             blue = int(m.group(2), 16)
76             return (red, green, blue)
77         color = color.lower()
78         return ansi.COLOR_NAMES_TO_RGB.get(color, None)
79
80     @abstractmethod
81     def status(self) -> str:
82         pass
83
84     @abstractmethod
85     def turn_on(self) -> bool:
86         pass
87
88     @abstractmethod
89     def turn_off(self) -> bool:
90         pass
91
92     @abstractmethod
93     def is_on(self) -> bool:
94         pass
95
96     @abstractmethod
97     def is_off(self) -> bool:
98         pass
99
100     @abstractmethod
101     def get_dimmer_level(self) -> Optional[int]:
102         pass
103
104     @abstractmethod
105     def set_dimmer_level(self, level: int) -> bool:
106         pass
107
108     @abstractmethod
109     def make_color(self, color: str) -> bool:
110         pass
111
112
113 class GoogleLight(BaseLight):
114     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
115         super().__init__(name, mac, keywords)
116
117     def goog_name(self) -> str:
118         name = self.get_name()
119         return name.replace("_", " ")
120
121     @staticmethod
122     def parse_google_response(response: GoogleResponse) -> bool:
123         return response.success
124
125     @overrides
126     def turn_on(self) -> bool:
127         return GoogleLight.parse_google_response(
128             ask_google(f"turn {self.goog_name()} on")
129         )
130
131     @overrides
132     def turn_off(self) -> bool:
133         return GoogleLight.parse_google_response(
134             ask_google(f"turn {self.goog_name()} off")
135         )
136
137     @overrides
138     def status(self) -> str:
139         if self.is_on():
140             return 'ON'
141         return 'off'
142
143     @overrides
144     def is_on(self) -> bool:
145         r = ask_google(f"is {self.goog_name()} on?")
146         if not r.success:
147             return False
148         return 'is on' in r.audio_transcription
149
150     @overrides
151     def is_off(self) -> bool:
152         return not self.is_on()
153
154     @overrides
155     def get_dimmer_level(self) -> Optional[int]:
156         if not self.has_keyword("dimmer"):
157             return False
158         r = ask_google(f'how bright is {self.goog_name()}?')
159         if not r.success:
160             return None
161
162         # the bookcase one is set to 40% bright
163         txt = r.audio_transcription
164         m = re.search(r"(\d+)% bright", txt)
165         if m is not None:
166             return int(m.group(1))
167         if "is off" in txt:
168             return 0
169         return None
170
171     @overrides
172     def set_dimmer_level(self, level: int) -> bool:
173         if not self.has_keyword("dimmer"):
174             return False
175         if 0 <= level <= 100:
176             was_on = self.is_on()
177             r = ask_google(f"set {self.goog_name()} to {level} percent")
178             if not r.success:
179                 return False
180             if not was_on:
181                 self.turn_off()
182             return True
183         return False
184
185     @overrides
186     def make_color(self, color: str) -> bool:
187         return GoogleLight.parse_google_response(
188             ask_google(f"make {self.goog_name()} {color}")
189         )
190
191
192 class TuyaLight(BaseLight):
193     ids_by_mac = {
194         '68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
195         '68:C6:3A:DE:27:1A': '8844664268c63ade271a',
196         '68:C6:3A:DE:1D:95': '8844664268c63ade1d95',
197         '68:C6:3A:DE:19:B3': '8844664268c63ade19b3',
198         '80:7D:3A:77:3B:F5': '07445340807d3a773bf5',
199         '80:7D:3A:58:37:02': '07445340807d3a583702',
200     }
201     keys_by_mac = {
202         '68:C6:3A:DE:1A:94': '237f19b1b3d49c36',
203         '68:C6:3A:DE:27:1A': '237f19b1b3d49c36',
204         '68:C6:3A:DE:1D:95': '04b90fc5cd7625d8',
205         '68:C6:3A:DE:19:B3': '2d601f2892f1aefd',
206         '80:7D:3A:77:3B:F5': '27ab921fe4633519',
207         '80:7D:3A:58:37:02': '8559b5416bfa0c05',
208     }
209
210     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
211         super().__init__(name, mac, keywords)
212         mac = mac.upper()
213         if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
214             raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
215         self.devid = TuyaLight.ids_by_mac[mac]
216         self.key = TuyaLight.keys_by_mac[mac]
217         self.arper = arper.Arper()
218         ip = self.get_ip()
219         self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)
220
221     def get_status(self) -> Dict[str, Any]:
222         return self.bulb.status()
223
224     @overrides
225     def status(self) -> str:
226         ret = ''
227         for k, v in self.bulb.status().items():
228             ret += f'{k} = {v}\n'
229         return ret
230
231     @overrides
232     def turn_on(self) -> bool:
233         self.bulb.turn_on()
234         return True
235
236     @overrides
237     def turn_off(self) -> bool:
238         self.bulb.turn_off()
239         return True
240
241     @overrides
242     def is_on(self) -> bool:
243         s = self.get_status()
244         return s['dps']['1']
245
246     @overrides
247     def is_off(self) -> bool:
248         return not self.is_on()
249
250     @overrides
251     def get_dimmer_level(self) -> Optional[int]:
252         s = self.get_status()
253         return s['dps']['3']
254
255     @overrides
256     def set_dimmer_level(self, level: int) -> bool:
257         logger.debug(f'Setting brightness to {level}')
258         self.bulb.set_brightness(level)
259         return True
260
261     @overrides
262     def make_color(self, color: str) -> bool:
263         rgb = BaseLight.parse_color_string(color)
264         logger.debug(f'Light color: {color} -> {rgb}')
265         if rgb is not None:
266             self.bulb.set_colour(rgb[0], rgb[1], rgb[2])
267             return True
268         return False
269
270
271 class TPLinkLight(BaseLight):
272     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
273         super().__init__(name, mac, keywords)
274         self.children: List[str] = []
275         self.info: Optional[Dict] = None
276         self.info_ts: Optional[datetime.datetime] = None
277         if "children" in self.keywords:
278             self.info = self.get_info()
279             if self.info is not None:
280                 for child in self.info["children"]:
281                     self.children.append(child["id"])
282
283     @memoized
284     def get_tplink_name(self) -> Optional[str]:
285         self.info = self.get_info()
286         if self.info is not None:
287             return self.info["alias"]
288         return None
289
290     def get_cmdline(self, child: str = None) -> str:
291         cmd = (
292             f"{config.config['smart_lights_tplink_location']} -m {self.mac} "
293             f"--no_logging_console "
294         )
295         if child is not None:
296             cmd += f"-x {child} "
297         return cmd
298
299     def get_children(self) -> List[str]:
300         return self.children
301
302     def command(
303         self, cmd: str, child: str = None, extra_args: str = None
304     ) -> bool:
305         cmd = self.get_cmdline(child) + f"-c {cmd}"
306         if extra_args is not None:
307             cmd += f" {extra_args}"
308         logger.debug(f'About to execute {cmd}')
309         return tplink_light_command(cmd)
310
311     @overrides
312     def turn_on(self, child: str = None) -> bool:
313         return self.command("on", child)
314
315     @overrides
316     def turn_off(self, child: str = None) -> bool:
317         return self.command("off", child)
318
319     @overrides
320     def is_on(self) -> bool:
321         return self.get_on_duration_seconds() > 0
322
323     @overrides
324     def is_off(self) -> bool:
325         return not self.is_on()
326
327     @overrides
328     def make_color(self, color: str) -> bool:
329         raise NotImplementedError
330
331     @timeout(
332         10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
333     )
334     def get_info(self) -> Optional[Dict]:
335         cmd = self.get_cmdline() + "-c info"
336         out = subprocess.getoutput(cmd)
337         logger.debug(f'RAW OUT> {out}')
338         out = re.sub("Sent:.*\n", "", out)
339         out = re.sub("Received: *", "", out)
340         try:
341             self.info = json.loads(out)["system"]["get_sysinfo"]
342             self.info_ts = datetime.datetime.now()
343             return self.info
344         except Exception as e:
345             logger.exception(e)
346             print(out, file=sys.stderr)
347             self.info = None
348             self.info_ts = None
349             return None
350
351     @overrides
352     def status(self) -> str:
353         ret = ''
354         for k, v in self.get_info().items():
355             ret += f'{k} = {v}\n'
356         return ret
357
358     def get_on_duration_seconds(self, child: str = None) -> int:
359         self.info = self.get_info()
360         if child is None:
361             if self.info is None:
362                 return 0
363             return int(self.info.get("on_time", "0"))
364         else:
365             if self.info is None:
366                 return 0
367             for chi in self.info.get("children", {}):
368                 if chi["id"] == child:
369                     return int(chi.get("on_time", "0"))
370         return 0
371
372     @overrides
373     def get_dimmer_level(self) -> Optional[int]:
374         if not self.has_keyword("dimmer"):
375             return False
376         self.info = self.get_info()
377         if self.info is None:
378             return None
379         return int(self.info.get("brightness", "0"))
380
381     @overrides
382     def set_dimmer_level(self, level: int) -> bool:
383         if not self.has_keyword("dimmer"):
384             return False
385         cmd = (
386             self.get_cmdline()
387             + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
388         )
389         return tplink_light_command(cmd)
390
391
392 # class GoogleLightGroup(GoogleLight):
393 #     def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
394 #         if len(members) < 1:
395 #             raise Exception("There must be at least one light in the group.")
396 #         self.members = members
397 #         mac = GoogleLightGroup.make_up_mac(members)
398 #         super().__init__(name, mac, keywords)
399
400 #     @staticmethod
401 #     def make_up_mac(members: List[GoogleLight]):
402 #         mac = members[0].get_mac()
403 #         b = mac.split(':')
404 #         b[5] = int(b[5], 16) + 1
405 #         if b[5] > 255:
406 #             b[5] = 0
407 #         b[5] = str(b[5])
408 #         return ":".join(b)
409
410 #     def is_on(self) -> bool:
411 #         r = ask_google(f"are {self.goog_name()} on?")
412 #         if not r.success:
413 #             return False
414 #         return 'is on' in r.audio_transcription
415
416 #     def get_dimmer_level(self) -> Optional[int]:
417 #         if not self.has_keyword("dimmer"):
418 #             return False
419 #         r = ask_google(f'how bright are {self.goog_name()}?')
420 #         if not r.success:
421 #             return None
422
423 #         # four lights are set to 100% brightness
424 #         txt = r.audio_transcription
425 #         m = re.search(r"(\d+)% bright", txt)
426 #         if m is not None:
427 #             return int(m.group(1))
428 #         if "is off" in txt:
429 #             return 0
430 #         return None
431
432 #     def set_dimmer_level(self, level: int) -> bool:
433 #         if not self.has_keyword("dimmer"):
434 #             return False
435 #         if 0 <= level <= 100:
436 #             was_on = self.is_on()
437 #             r = ask_google(f"set {self.goog_name()} to {level} percent")
438 #             if not r.success:
439 #                 return False
440 #             if not was_on:
441 #                 self.turn_off()
442 #             return True
443 #         return False
444
445 #     def make_color(self, color: str) -> bool:
446 #         return GoogleLight.parse_google_response(
447 #             ask_google(f"make {self.goog_name()} {color}")
448 #         )