Clean up ACLs
[python_utils.git] / light_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart lights."""
4
5 from abc import ABC, abstractmethod
6 import datetime
7 import json
8 import logging
9 import os
10 import re
11 import subprocess
12 import sys
13 from typing import Dict, List, Optional, Set
14
15 import argparse_utils
16 import config
17 import logical_search
18 import logging_utils
19 import google_assistant as goog
20 from decorator_utils import timeout, memoized
21
22 logger = logging.getLogger(__name__)
23
24 parser = config.add_commandline_args(
25     f"Light Utils ({__file__})",
26     "Args related to light utilities."
27 )
28 parser.add_argument(
29     '--light_utils_tplink_location',
30     default='/home/scott/bin/tplink.py',
31     metavar='FILENAME',
32     help='The location of the tplink.py helper',
33     type=argparse_utils.valid_filename,
34 )
35 parser.add_argument(
36     '--light_utils_network_mac_addresses_location',
37     default='/home/scott/bin/network_mac_addresses.txt',
38     metavar='FILENAME',
39     help='The location of network_mac_addresses.txt',
40     type=argparse_utils.valid_filename,
41 )
42
43
44 @timeout(
45     5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
46 )
47 def tplink_light_command(command: str) -> bool:
48     result = os.system(command)
49     signal = result & 0xFF
50     if signal != 0:
51         logging_utils.hlog("%s died with signal %d" % (command, signal))
52         return False
53     else:
54         exit_value = result >> 8
55         if exit_value != 0:
56             logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
57             return False
58     return True
59
60
61 class Light(ABC):
62     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
63         self.name = name.strip()
64         self.mac = mac.strip()
65         self.keywords = keywords.strip()
66         self.kws = keywords.split()
67
68     def get_name(self) -> str:
69         return self.name
70
71     def get_mac(self) -> str:
72         return self.mac
73
74     @abstractmethod
75     def turn_on(self) -> bool:
76         pass
77
78     @abstractmethod
79     def turn_off(self) -> bool:
80         pass
81
82     @abstractmethod
83     def is_on(self) -> bool:
84         pass
85
86     @abstractmethod
87     def is_off(self) -> bool:
88         pass
89
90     @abstractmethod
91     def get_dimmer_level(self) -> Optional[int]:
92         pass
93
94     @abstractmethod
95     def set_dimmer_level(self, level: int) -> bool:
96         pass
97
98     @abstractmethod
99     def make_color(self, color: str) -> bool:
100         pass
101
102     def get_keywords(self) -> List[str]:
103         return self.kws
104
105     def has_keyword(self, keyword: str) -> bool:
106         for kw in self.kws:
107             if kw == keyword:
108                 return True
109         return False
110
111
112 class GoogleLight(Light):
113     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
114         super().__init__(name, mac, keywords)
115
116     def goog_name(self) -> str:
117         name = self.get_name()
118         return name.replace("_", " ")
119
120     @staticmethod
121     def parse_google_response(response: goog.GoogleResponse) -> bool:
122         return response.success
123
124     def turn_on(self) -> bool:
125         return GoogleLight.parse_google_response(
126             goog.ask_google(f"turn {self.goog_name()} on")
127         )
128
129     def turn_off(self) -> bool:
130         return GoogleLight.parse_google_response(
131             goog.ask_google(f"turn {self.goog_name()} off")
132         )
133
134     def is_on(self) -> bool:
135         r = goog.ask_google(f"is {self.goog_name()} on?")
136         if not r.success:
137             return False
138         return 'is on' in r.audio_transcription
139
140     def is_off(self) -> bool:
141         return not self.is_on()
142
143     def get_dimmer_level(self) -> Optional[int]:
144         if not self.has_keyword("dimmer"):
145             return False
146         r = goog.ask_google(f'how bright is {self.goog_name()}?')
147         if not r.success:
148             return None
149
150         # the bookcase one is set to 40% bright
151         txt = r.audio_transcription
152         m = re.search(r"(\d+)% bright", txt)
153         if m is not None:
154             return int(m.group(1))
155         if "is off" in txt:
156             return 0
157         return None
158
159     def set_dimmer_level(self, level: int) -> bool:
160         if not self.has_keyword("dimmer"):
161             return False
162         if 0 <= level <= 100:
163             was_on = self.is_on()
164             r = goog.ask_google(f"set {self.goog_name()} to {level} percent")
165             if not r.success:
166                 return False
167             if not was_on:
168                 self.turn_off()
169             return True
170         return False
171
172     def make_color(self, color: str) -> bool:
173         return GoogleLight.parse_google_response(
174             goog.ask_google(f"make {self.goog_name()} {color}")
175         )
176
177
178 class TPLinkLight(Light):
179     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
180         super().__init__(name, mac, keywords)
181         self.children: List[str] = []
182         self.info: Optional[Dict] = None
183         self.info_ts: Optional[datetime.datetime] = None
184         if "children" in self.keywords:
185             self.info = self.get_info()
186             if self.info is not None:
187                 for child in self.info["children"]:
188                     self.children.append(child["id"])
189
190     @memoized
191     def get_tplink_name(self) -> Optional[str]:
192         self.info = self.get_info()
193         if self.info is not None:
194             return self.info["alias"]
195         return None
196
197     def get_cmdline(self, child: str = None) -> str:
198         cmd = (
199             f"{config.config['light_utils_tplink_location']} -m {self.mac} "
200             f"--no_logging_console "
201         )
202         if child is not None:
203             cmd += f"-x {child} "
204         return cmd
205
206     def get_children(self) -> List[str]:
207         return self.children
208
209     def command(
210         self, cmd: str, child: str = None, extra_args: str = None
211     ) -> bool:
212         cmd = self.get_cmdline(child) + f"-c {cmd}"
213         if extra_args is not None:
214             cmd += f" {extra_args}"
215         return tplink_light_command(cmd)
216
217     def turn_on(self, child: str = None) -> bool:
218         return self.command("on", child)
219
220     def turn_off(self, child: str = None) -> bool:
221         return self.command("off", child)
222
223     def is_on(self) -> bool:
224         return self.get_on_duration_seconds() > 0
225
226     def is_off(self) -> bool:
227         return not self.is_on()
228
229     def make_color(self, color: str) -> bool:
230         raise NotImplementedError
231
232     @timeout(
233         10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
234     )
235     def get_info(self) -> Optional[Dict]:
236         cmd = self.get_cmdline() + "-c info"
237         out = subprocess.getoutput(cmd)
238         out = re.sub("Sent:.*\n", "", out)
239         out = re.sub("Received: *", "", out)
240         try:
241             self.info = json.loads(out)["system"]["get_sysinfo"]
242             self.info_ts = datetime.datetime.now()
243             return self.info
244         except Exception as e:
245             logger.exception(e)
246             print(out, file=sys.stderr)
247             self.info = None
248             self.info_ts = None
249             return None
250
251     def get_on_duration_seconds(self, child: str = None) -> int:
252         self.info = self.get_info()
253         if child is None:
254             if self.info is None:
255                 return 0
256             return int(self.info.get("on_time", "0"))
257         else:
258             if self.info is None:
259                 return 0
260             for chi in self.info.get("children", {}):
261                 if chi["id"] == child:
262                     return int(chi.get("on_time", "0"))
263         return 0
264
265     def get_on_limit_seconds(self) -> Optional[int]:
266         for kw in self.kws:
267             m = re.search(r"timeout:(\d+)", kw)
268             if m is not None:
269                 return int(m.group(1)) * 60
270         return None
271
272     def get_dimmer_level(self) -> Optional[int]:
273         if not self.has_keyword("dimmer"):
274             return False
275         self.info = self.get_info()
276         if self.info is None:
277             return None
278         return int(self.info.get("brightness", "0"))
279
280     def set_dimmer_level(self, level: int) -> bool:
281         if not self.has_keyword("dimmer"):
282             return False
283         cmd = (
284             self.get_cmdline()
285             + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
286         )
287         return tplink_light_command(cmd)
288
289
290 class LightingConfig(object):
291     """Representation of the smart light device config."""
292
293     def __init__(
294             self,
295             config_file: str = None,
296     ) -> None:
297         if config_file is None:
298             config_file = config.config[
299                 'light_utils_network_mac_addresses_location'
300             ]
301         self.macs_by_name = {}
302         self._keywords_by_name = {}
303         self.keywords_by_mac = {}
304         self.names_by_mac = {}
305         self.corpus = logical_search.Corpus()
306         with open(config_file, "r") as f:
307             contents = f.readlines()
308         for line in contents:
309             line = line.rstrip("\n")
310             line = re.sub(r"#.*$", r"", line)
311             line = line.strip()
312             if line == "":
313                 continue
314             (mac, name, keywords) = line.split(",")
315             mac = mac.strip()
316             name = name.strip()
317             keywords = keywords.strip()
318             if "perm" not in keywords:
319                 continue
320             properties = [("name", name)]
321             tags = set()
322             for kw in keywords.split():
323                 if ":" in kw:
324                     key, value = kw.split(":")
325                     properties.append((key, value))
326                 else:
327                     tags.add(kw)
328             properties.append(("name", name))
329             self.macs_by_name[name] = mac
330             self._keywords_by_name[name] = keywords
331             self.keywords_by_mac[mac] = keywords
332             self.names_by_mac[mac] = name
333             self.corpus.add_doc(
334                 logical_search.Document(
335                     docid=mac,
336                     tags=tags,
337                     properties=properties,
338                     reference=None,
339                 )
340             )
341
342     def __repr__(self) -> str:
343         s = "Known devices:\n"
344         for name, keywords in self._keywords_by_name.items():
345             mac = self.macs_by_name[name]
346             s += f"  {name} ({mac}) => {keywords}\n"
347         return s
348
349     def get_keywords_by_name(self, name: str) -> Optional[str]:
350         return self._keywords_by_name.get(name, None)
351
352     def get_macs_by_name(self, name: str) -> Set[str]:
353         retval = set()
354         for (mac, lname) in self.names_by_mac.items():
355             if name in lname:
356                 retval.add(mac)
357         return retval
358
359     def get_macs_by_keyword(self, keyword: str) -> Set[str]:
360         retval = set()
361         for (mac, keywords) in self.keywords_by_mac.items():
362             if keyword in keywords:
363                 retval.add(mac)
364         return retval
365
366     def get_light_by_name(self, name: str) -> Optional[Light]:
367         if name in self.macs_by_name:
368             return self.get_light_by_mac(self.macs_by_name[name])
369         return None
370
371     def get_all_lights(self) -> List[Light]:
372         retval = []
373         for (mac, kws) in self.keywords_by_mac.items():
374             if mac is not None:
375                 light = self.get_light_by_mac(mac)
376                 if light is not None:
377                     retval.append(light)
378         return retval
379
380     def get_light_by_mac(self, mac: str) -> Optional[Light]:
381         if mac in self.keywords_by_mac:
382             name = self.names_by_mac[mac]
383             kws = self.keywords_by_mac[mac]
384             if "tplink" in kws.lower():
385                 return TPLinkLight(name, mac, kws)
386             else:
387                 return GoogleLight(name, mac, kws)
388         return None
389
390     def query(self, query: str) -> List[Light]:
391         """Evaluates a lighting query expression formed of keywords to search
392         for, logical operators (and, or, not), and parenthesis.
393         Returns a list of matching lights.
394         """
395         retval = []
396         results = self.corpus.query(query)
397         if results is not None:
398             for mac in results:
399                 if mac is not None:
400                     light = self.get_light_by_mac(mac)
401                     if light is not None:
402                         retval.append(light)
403         return retval