54467223b9b1cc36c5edabe9d81df187cc1a409c
[python_utils.git] / smart_home / lights.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart lights."""
4
5 from abc import abstractmethod
6 import datetime
7 import json
8 import logging
9 import os
10 import re
11 import subprocess
12 import sys
13 from typing import Any, Dict, List, Optional
14
15 import tinytuya as tt
16
17 import argparse_utils
18 import config
19 import logging_utils
20 import smart_home.device as dev
21 from google_assistant import ask_google, GoogleResponse
22 from decorator_utils import timeout, memoized
23
24 logger = logging.getLogger(__name__)
25
26 parser = config.add_commandline_args(
27     f"Smart Lights ({__file__})",
28     "Args related to smart lights.",
29 )
30 parser.add_argument(
31     '--smart_lights_tplink_location',
32     default='/home/scott/bin/tplink.py',
33     metavar='FILENAME',
34     help='The location of the tplink.py helper',
35     type=argparse_utils.valid_filename,
36 )
37
38
39 @timeout(
40     5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
41 )
42 def tplink_light_command(command: str) -> bool:
43     result = os.system(command)
44     signal = result & 0xFF
45     if signal != 0:
46         logger.warning(f'{command} died with signal {signal}')
47         logging_utils.hlog("%s died with signal %d" % (command, signal))
48         return False
49     else:
50         exit_value = result >> 8
51         if exit_value != 0:
52             logger.warning(f'{command} failed, exited {exit_value}')
53             logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
54             return False
55     logger.debug(f'{command} succeeded.')
56     return True
57
58
59 class BaseLight(dev.Device):
60     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
61         super().__init__(name.strip(), mac.strip(), keywords)
62
63     @abstractmethod
64     def turn_on(self) -> bool:
65         pass
66
67     @abstractmethod
68     def turn_off(self) -> bool:
69         pass
70
71     @abstractmethod
72     def is_on(self) -> bool:
73         pass
74
75     @abstractmethod
76     def is_off(self) -> bool:
77         pass
78
79     @abstractmethod
80     def get_dimmer_level(self) -> Optional[int]:
81         pass
82
83     @abstractmethod
84     def set_dimmer_level(self, level: int) -> bool:
85         pass
86
87     @abstractmethod
88     def make_color(self, color: str) -> bool:
89         pass
90
91
92 class GoogleLight(BaseLight):
93     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
94         super().__init__(name, mac, keywords)
95
96     def goog_name(self) -> str:
97         name = self.get_name()
98         return name.replace("_", " ")
99
100     @staticmethod
101     def parse_google_response(response: GoogleResponse) -> bool:
102         return response.success
103
104     def turn_on(self) -> bool:
105         return GoogleLight.parse_google_response(
106             ask_google(f"turn {self.goog_name()} on")
107         )
108
109     def turn_off(self) -> bool:
110         return GoogleLight.parse_google_response(
111             ask_google(f"turn {self.goog_name()} off")
112         )
113
114     def is_on(self) -> bool:
115         r = ask_google(f"is {self.goog_name()} on?")
116         if not r.success:
117             return False
118         return 'is on' in r.audio_transcription
119
120     def is_off(self) -> bool:
121         return not self.is_on()
122
123     def get_dimmer_level(self) -> Optional[int]:
124         if not self.has_keyword("dimmer"):
125             return False
126         r = ask_google(f'how bright is {self.goog_name()}?')
127         if not r.success:
128             return None
129
130         # the bookcase one is set to 40% bright
131         txt = r.audio_transcription
132         m = re.search(r"(\d+)% bright", txt)
133         if m is not None:
134             return int(m.group(1))
135         if "is off" in txt:
136             return 0
137         return None
138
139     def set_dimmer_level(self, level: int) -> bool:
140         if not self.has_keyword("dimmer"):
141             return False
142         if 0 <= level <= 100:
143             was_on = self.is_on()
144             r = ask_google(f"set {self.goog_name()} to {level} percent")
145             if not r.success:
146                 return False
147             if not was_on:
148                 self.turn_off()
149             return True
150         return False
151
152     def make_color(self, color: str) -> bool:
153         return GoogleLight.parse_google_response(
154             ask_google(f"make {self.goog_name()} {color}")
155         )
156
157
158 class TuyaLight(BaseLight):
159     ids_by_mac = {
160         '68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
161         '68:C6:3A:DE:27:1A': '8844664268c63ade271a',
162         '68:C6:3A:DE:1D:95': '8844664268c63ade1d95',
163         '68:C6:3A:DE:19:B3': '8844664268c63ade19b3',
164         '80:7D:3A:77:3B:F5': '07445340807d3a773bf5',
165         '80:7D:3A:58:37:02': '07445340807d3a583702',
166     }
167     keys_by_mac = {
168         '68:C6:3A:DE:1A:94': '237f19b1b3d49c36',
169         '68:C6:3A:DE:27:1A': '237f19b1b3d49c36',
170         '68:C6:3A:DE:1D:95': '04b90fc5cd7625d8',
171         '68:C6:3A:DE:19:B3': '2d601f2892f1aefd',
172         '80:7D:3A:77:3B:F5': '27ab921fe4633519',
173         '80:7D:3A:58:37:02': '8559b5416bfa0c05',
174     }
175
176     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
177         from subprocess import Popen, PIPE
178         super().__init__(name, mac, keywords)
179         mac = mac.upper()
180         if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
181             raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
182         self.devid = TuyaLight.ids_by_mac[mac]
183         self.key = TuyaLight.keys_by_mac[mac]
184         try:
185             pid = Popen(['maclookup', mac], stdout=PIPE)
186             ip = pid.communicate()[0]
187             ip = ip[:-1]
188         except Exception:
189             ip = '0.0.0.0'
190         self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)
191
192     def turn_on(self) -> bool:
193         self.bulb.turn_on()
194         return True
195
196     def turn_off(self) -> bool:
197         self.bulb.turn_off()
198         return True
199
200     def get_status(self) -> Dict[str, Any]:
201         return self.bulb.status()
202
203     def is_on(self) -> bool:
204         s = self.get_status()
205         return s['dps']['1']
206
207     def is_off(self) -> bool:
208         return not self.is_on()
209
210     def get_dimmer_level(self) -> Optional[int]:
211         s = self.get_status()
212         return s['dps']['3']
213
214     def set_dimmer_level(self, level: int) -> bool:
215         self.bulb.set_brightness(level)
216         return True
217
218     def make_color(self, color: str) -> bool:
219         self.bulb.set_colour(255,0,0)
220         return True
221
222
223 class TPLinkLight(BaseLight):
224     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
225         super().__init__(name, mac, keywords)
226         self.children: List[str] = []
227         self.info: Optional[Dict] = None
228         self.info_ts: Optional[datetime.datetime] = None
229         if "children" in self.keywords:
230             self.info = self.get_info()
231             if self.info is not None:
232                 for child in self.info["children"]:
233                     self.children.append(child["id"])
234
235     @memoized
236     def get_tplink_name(self) -> Optional[str]:
237         self.info = self.get_info()
238         if self.info is not None:
239             return self.info["alias"]
240         return None
241
242     def get_cmdline(self, child: str = None) -> str:
243         cmd = (
244             f"{config.config['smart_lights_tplink_location']} -m {self.mac} "
245             f"--no_logging_console "
246         )
247         if child is not None:
248             cmd += f"-x {child} "
249         return cmd
250
251     def get_children(self) -> List[str]:
252         return self.children
253
254     def command(
255         self, cmd: str, child: str = None, extra_args: str = None
256     ) -> bool:
257         cmd = self.get_cmdline(child) + f"-c {cmd}"
258         if extra_args is not None:
259             cmd += f" {extra_args}"
260         logger.debug(f'About to execute {cmd}')
261         return tplink_light_command(cmd)
262
263     def turn_on(self, child: str = None) -> bool:
264         return self.command("on", child)
265
266     def turn_off(self, child: str = None) -> bool:
267         return self.command("off", child)
268
269     def is_on(self) -> bool:
270         return self.get_on_duration_seconds() > 0
271
272     def is_off(self) -> bool:
273         return not self.is_on()
274
275     def make_color(self, color: str) -> bool:
276         raise NotImplementedError
277
278     @timeout(
279         10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
280     )
281     def get_info(self) -> Optional[Dict]:
282         cmd = self.get_cmdline() + "-c info"
283         out = subprocess.getoutput(cmd)
284         out = re.sub("Sent:.*\n", "", out)
285         out = re.sub("Received: *", "", out)
286         try:
287             self.info = json.loads(out)["system"]["get_sysinfo"]
288             self.info_ts = datetime.datetime.now()
289             return self.info
290         except Exception as e:
291             logger.exception(e)
292             print(out, file=sys.stderr)
293             self.info = None
294             self.info_ts = None
295             return None
296
297     def get_on_duration_seconds(self, child: str = None) -> int:
298         self.info = self.get_info()
299         if child is None:
300             if self.info is None:
301                 return 0
302             return int(self.info.get("on_time", "0"))
303         else:
304             if self.info is None:
305                 return 0
306             for chi in self.info.get("children", {}):
307                 if chi["id"] == child:
308                     return int(chi.get("on_time", "0"))
309         return 0
310
311     def get_on_limit_seconds(self) -> Optional[int]:
312         for kw in self.kws:
313             m = re.search(r"timeout:(\d+)", kw)
314             if m is not None:
315                 return int(m.group(1)) * 60
316         return None
317
318     def get_dimmer_level(self) -> Optional[int]:
319         if not self.has_keyword("dimmer"):
320             return False
321         self.info = self.get_info()
322         if self.info is None:
323             return None
324         return int(self.info.get("brightness", "0"))
325
326     def set_dimmer_level(self, level: int) -> bool:
327         if not self.has_keyword("dimmer"):
328             return False
329         cmd = (
330             self.get_cmdline()
331             + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
332         )
333         return tplink_light_command(cmd)
334
335
336 # class GoogleLightGroup(GoogleLight):
337 #     def __init__(self, name: str, members: List[GoogleLight], keywords: str = "") -> None:
338 #         if len(members) < 1:
339 #             raise Exception("There must be at least one light in the group.")
340 #         self.members = members
341 #         mac = GoogleLightGroup.make_up_mac(members)
342 #         super().__init__(name, mac, keywords)
343
344 #     @staticmethod
345 #     def make_up_mac(members: List[GoogleLight]):
346 #         mac = members[0].get_mac()
347 #         b = mac.split(':')
348 #         b[5] = int(b[5], 16) + 1
349 #         if b[5] > 255:
350 #             b[5] = 0
351 #         b[5] = str(b[5])
352 #         return ":".join(b)
353
354 #     def is_on(self) -> bool:
355 #         r = ask_google(f"are {self.goog_name()} on?")
356 #         if not r.success:
357 #             return False
358 #         return 'is on' in r.audio_transcription
359
360 #     def get_dimmer_level(self) -> Optional[int]:
361 #         if not self.has_keyword("dimmer"):
362 #             return False
363 #         r = ask_google(f'how bright are {self.goog_name()}?')
364 #         if not r.success:
365 #             return None
366
367 #         # four lights are set to 100% brightness
368 #         txt = r.audio_transcription
369 #         m = re.search(r"(\d+)% bright", txt)
370 #         if m is not None:
371 #             return int(m.group(1))
372 #         if "is off" in txt:
373 #             return 0
374 #         return None
375
376 #     def set_dimmer_level(self, level: int) -> bool:
377 #         if not self.has_keyword("dimmer"):
378 #             return False
379 #         if 0 <= level <= 100:
380 #             was_on = self.is_on()
381 #             r = ask_google(f"set {self.goog_name()} to {level} percent")
382 #             if not r.success:
383 #                 return False
384 #             if not was_on:
385 #                 self.turn_off()
386 #             return True
387 #         return False
388
389 #     def make_color(self, color: str) -> bool:
390 #         return GoogleLight.parse_google_response(
391 #             ask_google(f"make {self.goog_name()} {color}")
392 #         )