Minor editing.
[python_utils.git] / light_utils.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart lights."""
4
5 from abc import ABC, abstractmethod
6 import datetime
7 import json
8 import logging
9 import os
10 import re
11 import subprocess
12 import sys
13 from typing import Dict, List, Optional, Set
14
15 import argparse_utils
16 import config
17 import logical_search
18 import logging_utils
19 import google_assistant as goog
20 from decorator_utils import timeout, memoized
21
22 logger = logging.getLogger(__name__)
23
24 parser = config.add_commandline_args(
25     f"Light Utils ({__file__})",
26     "Args related to light utilities."
27 )
28 parser.add_argument(
29     '--light_utils_tplink_location',
30     default='/home/scott/bin/tplink.py',
31     metavar='FILENAME',
32     help='The location of the tplink.py helper',
33     type=argparse_utils.valid_filename,
34 )
35 parser.add_argument(
36     '--light_utils_network_mac_addresses_location',
37     default='/home/scott/bin/network_mac_addresses.txt',
38     metavar='FILENAME',
39     help='The location of network_mac_addresses.txt',
40     type=argparse_utils.valid_filename,
41 )
42
43
44 @timeout(
45     5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
46 )
47 def tplink_light_command(command: str) -> bool:
48     result = os.system(command)
49     signal = result & 0xFF
50     if signal != 0:
51         logging_utils.hlog("%s died with signal %d" % (command, signal))
52         return False
53     else:
54         exit_value = result >> 8
55         if exit_value != 0:
56             logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
57             return False
58     return True
59
60
61 class Light(ABC):
62     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
63         self.name = name.strip()
64         self.mac = mac.strip()
65         self.keywords = keywords.strip()
66         self.kws = keywords.split()
67
68     def get_name(self) -> str:
69         return self.name
70
71     def get_mac(self) -> str:
72         return self.mac
73
74     @abstractmethod
75     def turn_on(self) -> bool:
76         pass
77
78     @abstractmethod
79     def turn_off(self) -> bool:
80         pass
81
82     @abstractmethod
83     def set_dimmer_level(self, level: int) -> bool:
84         pass
85
86     @abstractmethod
87     def make_color(self, color: str) -> bool:
88         pass
89
90     def get_keywords(self) -> List[str]:
91         return self.kws
92
93     def has_keyword(self, keyword: str) -> bool:
94         for kw in self.kws:
95             if kw == keyword:
96                 return True
97         return False
98
99
100 class GoogleLight(Light):
101     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
102         super().__init__(name, mac, keywords)
103
104     def goog_name(self) -> str:
105         name = self.get_name()
106         return name.replace("_", " ")
107
108     @staticmethod
109     def parse_google_response(response: goog.GoogleResponse) -> bool:
110         return response.success
111
112     def turn_on(self) -> bool:
113         return GoogleLight.parse_google_response(
114             goog.ask_google(f"turn {self.goog_name()} on")
115         )
116
117     def turn_off(self) -> bool:
118         return GoogleLight.parse_google_response(
119             goog.ask_google(f"turn {self.goog_name()} off")
120         )
121
122     def set_dimmer_level(self, level: int) -> bool:
123         if 0 <= level <= 100:
124             return GoogleLight.parse_google_response(
125                 goog.ask_google(f"set {self.goog_name()} to {level} percent")
126             )
127         return False
128
129     def make_color(self, color: str) -> bool:
130         return GoogleLight.parse_google_response(
131             goog.ask_google(f"make {self.goog_name()} {color}")
132         )
133
134
135 class TPLinkLight(Light):
136     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
137         super().__init__(name, mac, keywords)
138         self.children: List[str] = []
139         self.info: Optional[Dict] = None
140         self.info_ts: Optional[datetime.datetime] = None
141         if "children" in self.keywords:
142             self.info = self.get_info()
143             if self.info is not None:
144                 for child in self.info["children"]:
145                     self.children.append(child["id"])
146
147     @memoized
148     def get_tplink_name(self) -> Optional[str]:
149         self.info = self.get_info()
150         if self.info is not None:
151             return self.info["alias"]
152         return None
153
154     def get_cmdline(self, child: str = None) -> str:
155         cmd = (
156             f"{config.config['light_utils_tplink_location']} -m {self.mac} "
157             f"--no_logging_console "
158         )
159         if child is not None:
160             cmd += f"-x {child} "
161         return cmd
162
163     def get_children(self) -> List[str]:
164         return self.children
165
166     def command(
167         self, cmd: str, child: str = None, extra_args: str = None
168     ) -> bool:
169         cmd = self.get_cmdline(child) + f"-c {cmd}"
170         if extra_args is not None:
171             cmd += f" {extra_args}"
172         return tplink_light_command(cmd)
173
174     def turn_on(self, child: str = None) -> bool:
175         return self.command("on", child)
176
177     def turn_off(self, child: str = None) -> bool:
178         return self.command("off", child)
179
180     def make_color(self, color: str) -> bool:
181         raise NotImplementedError
182
183     @timeout(
184         10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
185     )
186     def get_info(self) -> Optional[Dict]:
187         cmd = self.get_cmdline() + "-c info"
188         out = subprocess.getoutput(cmd)
189         out = re.sub("Sent:.*\n", "", out)
190         out = re.sub("Received: *", "", out)
191         try:
192             self.info = json.loads(out)["system"]["get_sysinfo"]
193             self.info_ts = datetime.datetime.now()
194             return self.info
195         except Exception as e:
196             logger.exception(e)
197             print(out, file=sys.stderr)
198             self.info = None
199             self.info_ts = None
200             return None
201
202     def get_on_duration_seconds(self, child: str = None) -> int:
203         self.info = self.get_info()
204         if child is None:
205             if self.info is None:
206                 return 0
207             return int(self.info.get("on_time", "0"))
208         else:
209             if self.info is None:
210                 return 0
211             for chi in self.info.get("children", {}):
212                 if chi["id"] == child:
213                     return int(chi.get("on_time", "0"))
214         return 0
215
216     def get_on_limit_seconds(self) -> Optional[int]:
217         for kw in self.kws:
218             m = re.search(r"timeout:(\d+)", kw)
219             if m is not None:
220                 return int(m.group(1)) * 60
221         return None
222
223     def set_dimmer_level(self, level: int) -> bool:
224         if not self.has_keyword("dimmer"):
225             return False
226         cmd = (
227             self.get_cmdline()
228             + f'-j \'{{"smartlife.iot.dimmer":{{"set_brightness":{{"brightness":{level} }} }} }}\''
229         )
230         return tplink_light_command(cmd)
231
232
233 class LightingConfig(object):
234     """Representation of the smart light device config."""
235
236     def __init__(
237             self,
238             config_file: str = None,
239     ) -> None:
240         if config_file is None:
241             config_file = config.config[
242                 'light_utils_network_mac_addresses_location'
243             ]
244         self.macs_by_name = {}
245         self._keywords_by_name = {}
246         self.keywords_by_mac = {}
247         self.names_by_mac = {}
248         self.corpus = logical_search.Corpus()
249         with open(config_file, "r") as f:
250             contents = f.readlines()
251         for line in contents:
252             line = line.rstrip("\n")
253             line = re.sub(r"#.*$", r"", line)
254             line = line.strip()
255             if line == "":
256                 continue
257             (mac, name, keywords) = line.split(",")
258             mac = mac.strip()
259             name = name.strip()
260             keywords = keywords.strip()
261             if "perm" not in keywords:
262                 continue
263             properties = [("name", name)]
264             tags = set()
265             for kw in keywords.split():
266                 if ":" in kw:
267                     key, value = kw.split(":")
268                     properties.append((key, value))
269                 else:
270                     tags.add(kw)
271             properties.append(("name", name))
272             self.macs_by_name[name] = mac
273             self._keywords_by_name[name] = keywords
274             self.keywords_by_mac[mac] = keywords
275             self.names_by_mac[mac] = name
276             self.corpus.add_doc(
277                 logical_search.Document(
278                     docid=mac,
279                     tags=tags,
280                     properties=properties,
281                     reference=None,
282                 )
283             )
284
285     def __repr__(self) -> str:
286         s = "Known devices:\n"
287         for name, keywords in self._keywords_by_name.items():
288             mac = self.macs_by_name[name]
289             s += f"  {name} ({mac}) => {keywords}\n"
290         return s
291
292     def get_keywords_by_name(self, name: str) -> Optional[str]:
293         return self._keywords_by_name.get(name, None)
294
295     def get_macs_by_name(self, name: str) -> Set[str]:
296         retval = set()
297         for (mac, lname) in self.names_by_mac.items():
298             if name in lname:
299                 retval.add(mac)
300         return retval
301
302     def get_macs_by_keyword(self, keyword: str) -> Set[str]:
303         retval = set()
304         for (mac, keywords) in self.keywords_by_mac.items():
305             if keyword in keywords:
306                 retval.add(mac)
307         return retval
308
309     def get_light_by_name(self, name: str) -> Optional[Light]:
310         if name in self.macs_by_name:
311             return self.get_light_by_mac(self.macs_by_name[name])
312         return None
313
314     def get_all_lights(self) -> List[Light]:
315         retval = []
316         for (mac, kws) in self.keywords_by_mac.items():
317             if mac is not None:
318                 light = self.get_light_by_mac(mac)
319                 if light is not None:
320                     retval.append(light)
321         return retval
322
323     def get_light_by_mac(self, mac: str) -> Optional[Light]:
324         if mac in self.keywords_by_mac:
325             name = self.names_by_mac[mac]
326             kws = self.keywords_by_mac[mac]
327             if "tplink" in kws.lower():
328                 return TPLinkLight(name, mac, kws)
329             else:
330                 return GoogleLight(name, mac, kws)
331         return None
332
333     def query(self, query: str) -> List[Light]:
334         """Evaluates a lighting query expression formed of keywords to search
335         for, logical operators (and, or, not), and parenthesis.
336         Returns a list of matching lights.
337         """
338         retval = []
339         results = self.corpus.query(query)
340         if results is not None:
341             for mac in results:
342                 if mac is not None:
343                     light = self.get_light_by_mac(mac)
344                     if light is not None:
345                         retval.append(light)
346         return retval