Various changes.
[python_utils.git] / light_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart lights."""
4
5 from abc import ABC, abstractmethod
6 import datetime
7 import json
8 import logging
9 import os
10 import re
11 import subprocess
12 import sys
13 from typing import Dict, List, Optional, Set
14
15 import argparse_utils
16 import config
17 import logging_utils
18 from google_assistant import ask_google, GoogleResponse
19 from decorator_utils import timeout, memoized
20
21 logger = logging.getLogger(__name__)
22
23 parser = config.add_commandline_args(
24     f"Light Utils ({__file__})",
25     "Args related to light utilities."
26 )
27 parser.add_argument(
28     '--light_utils_tplink_location',
29     default='/home/scott/bin/tplink.py',
30     metavar='FILENAME',
31     help='The location of the tplink.py helper',
32     type=argparse_utils.valid_filename,
33 )
34 parser.add_argument(
35     '--light_utils_network_mac_addresses_location',
36     default='/home/scott/bin/network_mac_addresses.txt',
37     metavar='FILENAME',
38     help='The location of network_mac_addresses.txt',
39     type=argparse_utils.valid_filename,
40 )
41
42
43 @timeout(
44     5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
45 )
46 def tplink_light_command(command: str) -> bool:
47     result = os.system(command)
48     signal = result & 0xFF
49     if signal != 0:
50         logging_utils.hlog("%s died with signal %d" % (command, signal))
51         return False
52     else:
53         exit_value = result >> 8
54         if exit_value != 0:
55             logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
56             return False
57     return True
58
59
60 class Light(ABC):
61     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
62         self.name = name.strip()
63         self.mac = mac.strip()
64         self.keywords = keywords.strip()
65         self.kws = keywords.split()
66
67     def get_name(self) -> str:
68         return self.name
69
70     def get_mac(self) -> str:
71         return self.mac
72
73     @abstractmethod
74     def turn_on(self) -> bool:
75         pass
76
77     @abstractmethod
78     def turn_off(self) -> bool:
79         pass
80
81     @abstractmethod
82     def is_on(self) -> bool:
83         pass
84
85     @abstractmethod
86     def is_off(self) -> bool:
87         pass
88
89     @abstractmethod
90     def get_dimmer_level(self) -> Optional[int]:
91         pass
92
93     @abstractmethod
94     def set_dimmer_level(self, level: int) -> bool:
95         pass
96
97     @abstractmethod
98     def make_color(self, color: str) -> bool:
99         pass
100
101     def get_keywords(self) -> List[str]:
102         return self.kws
103
104     def has_keyword(self, keyword: str) -> bool:
105         for kw in self.kws:
106             if kw == keyword:
107                 return True
108         return False
109
110
111 class GoogleLight(Light):
112     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
113         super().__init__(name, mac, keywords)
114
115     def goog_name(self) -> str:
116         name = self.get_name()
117         return name.replace("_", " ")
118
119     @staticmethod
120     def parse_google_response(response: GoogleResponse) -> bool:
121         return response.success
122
123     def turn_on(self) -> bool:
124         return GoogleLight.parse_google_response(
125             ask_google(f"turn {self.goog_name()} on")
126         )
127
128     def turn_off(self) -> bool:
129         return GoogleLight.parse_google_response(
130             ask_google(f"turn {self.goog_name()} off")
131         )
132
133     def is_on(self) -> bool:
134         r = ask_google(f"is {self.goog_name()} on?")
135         if not r.success:
136             return False
137         return 'is on' in r.audio_transcription
138
139     def is_off(self) -> bool:
140         return not self.is_on()
141
142     def get_dimmer_level(self) -> Optional[int]:
143         if not self.has_keyword("dimmer"):
144             return False
145         r = ask_google(f'how bright is {self.goog_name()}?')
146         if not r.success:
147             return None
148
149         # the bookcase one is set to 40% bright
150         txt = r.audio_transcription
151         m = re.search(r"(\d+)% bright", txt)
152         if m is not None:
153             return int(m.group(1))
154         if "is off" in txt:
155             return 0
156         return None
157
158     def set_dimmer_level(self, level: int) -> bool:
159         if not self.has_keyword("dimmer"):
160             return False
161         if 0 <= level <= 100:
162             was_on = self.is_on()
163             r = ask_google(f"set {self.goog_name()} to {level} percent")
164             if not r.success:
165                 return False
166             if not was_on:
167                 self.turn_off()
168             return True
169         return False
170
171     def make_color(self, color: str) -> bool:
172         return GoogleLight.parse_google_response(
173             ask_google(f"make {self.goog_name()} {color}")
174         )
175
176
177 class TPLinkLight(Light):
178     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
179         super().__init__(name, mac, keywords)
180         self.children: List[str] = []
181         self.info: Optional[Dict] = None
182         self.info_ts: Optional[datetime.datetime] = None
183         if "children" in self.keywords:
184             self.info = self.get_info()
185             if self.info is not None:
186                 for child in self.info["children"]:
187                     self.children.append(child["id"])
188
189     @memoized
190     def get_tplink_name(self) -> Optional[str]:
191         self.info = self.get_info()
192         if self.info is not None:
193             return self.info["alias"]
194         return None
195
196     def get_cmdline(self, child: str = None) -> str:
197         cmd = (
198             f"{config.config['light_utils_tplink_location']} -m {self.mac} "
199             f"--no_logging_console "
200         )
201         if child is not None:
202             cmd += f"-x {child} "
203         return cmd
204
205     def get_children(self) -> List[str]:
206         return self.children
207
208     def command(
209         self, cmd: str, child: str = None, extra_args: str = None
210     ) -> bool:
211         cmd = self.get_cmdline(child) + f"-c {cmd}"
212         if extra_args is not None:
213             cmd += f" {extra_args}"
214         return tplink_light_command(cmd)
215
216     def turn_on(self, child: str = None) -> bool:
217         return self.command("on", child)
218
219     def turn_off(self, child: str = None) -> bool:
220         return self.command("off", child)
221
222     def is_on(self) -> bool:
223         return self.get_on_duration_seconds() > 0
224
225     def is_off(self) -> bool:
226         return not self.is_on()
227
228     def make_color(self, color: str) -> bool:
229         raise NotImplementedError
230
231     @timeout(
232         10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
233     )
234     def get_info(self) -> Optional[Dict]:
235         cmd = self.get_cmdline() + "-c info"
236         out = subprocess.getoutput(cmd)
237         out = re.sub("Sent:.*\n", "", out)
238         out = re.sub("Received: *", "", out)
239         try:
240             self.info = json.loads(out)["system"]["get_sysinfo"]
241             self.info_ts = datetime.datetime.now()
242             return self.info
243         except Exception as e:
244             logger.exception(e)
245             print(out, file=sys.stderr)
246             self.info = None
247             self.info_ts = None
248             return None
249
250     def get_on_duration_seconds(self, child: str = None) -> int:
251         self.info = self.get_info()
252         if child is None:
253             if self.info is None:
254                 return 0
255             return int(self.info.get("on_time", "0"))
256         else:
257             if self.info is None:
258                 return 0
259             for chi in self.info.get("children", {}):
260                 if chi["id"] == child:
261                     return int(chi.get("on_time", "0"))
262         return 0
263
264     def get_on_limit_seconds(self) -> Optional[int]:
265         for kw in self.kws:
266             m = re.search(r"timeout:(\d+)", kw)
267             if m is not None:
268                 return int(m.group(1)) * 60
269         return None
270
271     def get_dimmer_level(self) -> Optional[int]:
272         if not self.has_keyword("dimmer"):
273             return False
274         self.info = self.get_info()
275         if self.info is None:
276             return None
277         return int(self.info.get("brightness", "0"))
278
279     def set_dimmer_level(self, level: int) -> bool:
280         if not self.has_keyword("dimmer"):
281             return False
282         cmd = (
283             self.get_cmdline()
284             + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
285         )
286         return tplink_light_command(cmd)
287
288
289 class LightingConfig(object):
290     """Representation of the smart light device config."""
291
292     def __init__(
293             self,
294             config_file: str = None,
295     ) -> None:
296         import logical_search
297         if config_file is None:
298             config_file = config.config[
299                 'light_utils_network_mac_addresses_location'
300             ]
301         self.macs_by_name = {}
302         self._keywords_by_name = {}
303         self.keywords_by_mac = {}
304         self.names_by_mac = {}
305         self.corpus = logical_search.Corpus()
306         with open(config_file, "r") as f:
307             contents = f.readlines()
308         for line in contents:
309             line = line.rstrip("\n")
310             line = re.sub(r"#.*$", r"", line)
311             line = line.strip()
312             if line == "":
313                 continue
314             (mac, name, keywords) = line.split(",")
315             mac = mac.strip()
316             name = name.strip()
317             keywords = keywords.strip()
318             if "perm" not in keywords:
319                 continue
320             properties = [("name", name)]
321             tags = set()
322             for kw in keywords.split():
323                 if ":" in kw:
324                     key, value = kw.split(":")
325                     properties.append((key, value))
326                 else:
327                     tags.add(kw)
328             properties.append(("name", name))
329             self.macs_by_name[name] = mac
330             self._keywords_by_name[name] = keywords
331             self.keywords_by_mac[mac] = keywords
332             self.names_by_mac[mac] = name
333             self.corpus.add_doc(
334                 logical_search.Document(
335                     docid=mac,
336                     tags=tags,
337                     properties=properties,
338                     reference=None,
339                 )
340             )
341
342     def __repr__(self) -> str:
343         s = "Known devices:\n"
344         for name, keywords in self._keywords_by_name.items():
345             mac = self.macs_by_name[name]
346             s += f"  {name} ({mac}) => {keywords}\n"
347         return s
348
349     def get_keywords_by_name(self, name: str) -> Optional[str]:
350         return self._keywords_by_name.get(name, None)
351
352     def get_macs_by_name(self, name: str) -> Set[str]:
353         retval = set()
354         for (mac, lname) in self.names_by_mac.items():
355             if name in lname:
356                 retval.add(mac)
357         return retval
358
359     def get_macs_by_keyword(self, keyword: str) -> Set[str]:
360         retval = set()
361         for (mac, keywords) in self.keywords_by_mac.items():
362             if keyword in keywords:
363                 retval.add(mac)
364         return retval
365
366     def get_light_by_name(self, name: str) -> Optional[Light]:
367         if name in self.macs_by_name:
368             return self.get_light_by_mac(self.macs_by_name[name])
369         return None
370
371     def get_all_lights(self) -> List[Light]:
372         retval = []
373         for (mac, kws) in self.keywords_by_mac.items():
374             if mac is not None:
375                 light = self.get_light_by_mac(mac)
376                 if light is not None:
377                     retval.append(light)
378         return retval
379
380     def get_light_by_mac(self, mac: str) -> Optional[Light]:
381         if mac in self.keywords_by_mac:
382             name = self.names_by_mac[mac]
383             kws = self.keywords_by_mac[mac]
384             if "tplink" in kws.lower():
385                 return TPLinkLight(name, mac, kws)
386             else:
387                 return GoogleLight(name, mac, kws)
388         return None
389
390     def query(self, query: str) -> List[Light]:
391         """Evaluates a lighting query expression formed of keywords to search
392         for, logical operators (and, or, not), and parenthesis.
393         Returns a list of matching lights.
394         """
395         retval = []
396         results = self.corpus.query(query)
397         if results is not None:
398             for mac in results:
399                 if mac is not None:
400                     light = self.get_light_by_mac(mac)
401                     if light is not None:
402                         retval.append(light)
403         return retval