Let's be explicit with asserts; there was a bug in histogram
[python_utils.git] / smart_home / outlets.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart outlets."""
4
5 import asyncio
6 import atexit
7 import datetime
8 import json
9 import logging
10 import os
11 import re
12 import subprocess
13 import sys
14 from abc import abstractmethod
15 from typing import Any, Dict, List, Optional
16
17 from meross_iot.http_api import MerossHttpClient
18 from meross_iot.manager import MerossManager
19 from overrides import overrides
20
21 import argparse_utils
22 import config
23 import decorator_utils
24 import logging_utils
25 import scott_secrets
26 import smart_home.device as dev
27 from decorator_utils import memoized, timeout
28 from google_assistant import GoogleResponse, ask_google
29
30 logger = logging.getLogger(__name__)
31
32 parser = config.add_commandline_args(
33     f"Outlet Utils ({__file__})",
34     "Args related to smart outlets.",
35 )
36 parser.add_argument(
37     '--smart_outlets_tplink_location',
38     default='/home/scott/bin/tplink.py',
39     metavar='FILENAME',
40     help='The location of the tplink.py helper',
41     type=argparse_utils.valid_filename,
42 )
43
44
45 @timeout(5.0, use_signals=False, error_message="Timed out waiting for tplink.py")
46 def tplink_outlet_command(command: str) -> bool:
47     result = os.system(command)
48     signal = result & 0xFF
49     if signal != 0:
50         msg = f'{command} died with signal {signal}'
51         logger.warning(msg)
52         logging_utils.hlog(msg)
53         return False
54     else:
55         exit_value = result >> 8
56         if exit_value != 0:
57             msg = f'{command} failed, exited {exit_value}'
58             logger.warning(msg)
59             logging_utils.hlog(msg)
60             return False
61     logger.debug(f'{command} succeeded.')
62     return True
63
64
65 class BaseOutlet(dev.Device):
66     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
67         super().__init__(name.strip(), mac.strip(), keywords)
68
69     @abstractmethod
70     def turn_on(self) -> bool:
71         pass
72
73     @abstractmethod
74     def turn_off(self) -> bool:
75         pass
76
77     @abstractmethod
78     def is_on(self) -> bool:
79         pass
80
81     @abstractmethod
82     def is_off(self) -> bool:
83         pass
84
85
86 class TPLinkOutlet(BaseOutlet):
87     def __init__(self, name: str, mac: str, keywords: str = '') -> None:
88         super().__init__(name, mac, keywords)
89         self.info: Optional[Dict] = None
90         self.info_ts: Optional[datetime.datetime] = None
91
92     @memoized
93     def get_tplink_name(self) -> Optional[str]:
94         self.info = self.get_info()
95         if self.info is not None:
96             return self.info["alias"]
97         return None
98
99     def get_cmdline(self) -> str:
100         cmd = (
101             f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
102             f"--no_logging_console "
103         )
104         return cmd
105
106     def command(self, cmd: str, extra_args: str = None, **kwargs) -> bool:
107         cmd = self.get_cmdline() + f"-c {cmd}"
108         if extra_args is not None:
109             cmd += f" {extra_args}"
110         return tplink_outlet_command(cmd)
111
112     @overrides
113     def turn_on(self) -> bool:
114         return self.command('on')
115
116     @overrides
117     def turn_off(self) -> bool:
118         return self.command('off')
119
120     @overrides
121     def is_on(self) -> bool:
122         return self.get_on_duration_seconds() > 0
123
124     @overrides
125     def is_off(self) -> bool:
126         return not self.is_on()
127
128     @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
129     def get_info(self) -> Optional[Dict]:
130         cmd = self.get_cmdline() + "-c info"
131         out = subprocess.getoutput(cmd)
132         out = re.sub("Sent:.*\n", "", out)
133         out = re.sub("Received: *", "", out)
134         try:
135             self.info = json.loads(out)["system"]["get_sysinfo"]
136             self.info_ts = datetime.datetime.now()
137             return self.info
138         except Exception as e:
139             logger.exception(e)
140             print(out, file=sys.stderr)
141             self.info = None
142             self.info_ts = None
143             return None
144
145     def get_on_duration_seconds(self) -> int:
146         self.info = self.get_info()
147         if self.info is None:
148             return 0
149         return int(self.info.get("on_time", "0"))
150
151
152 class TPLinkOutletWithChildren(TPLinkOutlet):
153     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
154         super().__init__(name, mac, keywords)
155         self.children: List[str] = []
156         self.info: Optional[Dict] = None
157         self.info_ts: Optional[datetime.datetime] = None
158         assert "children" in self.keywords
159         self.info = self.get_info()
160         if self.info is not None:
161             for child in self.info["children"]:
162                 self.children.append(child["id"])
163
164     @overrides
165     def get_cmdline(self, child: Optional[str] = None) -> str:
166         cmd = (
167             f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
168             f"--no_logging_console "
169         )
170         if child is not None:
171             cmd += f"-x {child} "
172         return cmd
173
174     @overrides
175     def command(self, cmd: str, extra_args: str = None, **kwargs) -> bool:
176         child: Optional[str] = kwargs.get('child', None)
177         cmd = self.get_cmdline(child) + f"-c {cmd}"
178         if extra_args is not None:
179             cmd += f" {extra_args}"
180         logger.debug(f'About to execute {cmd}')
181         return tplink_outlet_command(cmd)
182
183     def get_children(self) -> List[str]:
184         return self.children
185
186     @overrides
187     def turn_on(self, child: str = None) -> bool:
188         return self.command("on", child)
189
190     @overrides
191     def turn_off(self, child: str = None) -> bool:
192         return self.command("off", child)
193
194     def get_on_duration_seconds(self, child: str = None) -> int:
195         self.info = self.get_info()
196         if child is None:
197             if self.info is None:
198                 return 0
199             return int(self.info.get("on_time", "0"))
200         else:
201             if self.info is None:
202                 return 0
203             for chi in self.info.get("children", {}):
204                 if chi["id"] == child:
205                     return int(chi.get("on_time", "0"))
206         return 0
207
208
209 class GoogleOutlet(BaseOutlet):
210     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
211         super().__init__(name.strip(), mac.strip(), keywords)
212         self.info = None
213
214     def goog_name(self) -> str:
215         name = self.get_name()
216         return name.replace('_', ' ')
217
218     @staticmethod
219     def parse_google_response(response: GoogleResponse) -> bool:
220         return response.success
221
222     @overrides
223     def turn_on(self) -> bool:
224         return GoogleOutlet.parse_google_response(
225             ask_google(f'turn {self.goog_name()} on')
226         )
227
228     @overrides
229     def turn_off(self) -> bool:
230         return GoogleOutlet.parse_google_response(
231             ask_google(f'turn {self.goog_name()} off')
232         )
233
234     @overrides
235     def is_on(self) -> bool:
236         r = ask_google(f'is {self.goog_name()} on?')
237         if not r.success:
238             return False
239         if r.audio_transcription is not None:
240             return 'is on' in r.audio_transcription
241         raise Exception('Can\'t talk to Google right now!?')
242
243     @overrides
244     def is_off(self) -> bool:
245         return not self.is_on()
246
247
248 @decorator_utils.singleton
249 class MerossWrapper(object):
250     """Global singleton helper class for MerossOutlets.  Note that
251     instantiating this class causes HTTP traffic with an external
252     Meross server.  Meross blocks customers who hit their servers too
253     aggressively so MerossOutlet is lazy about creating instances of
254     this class.
255
256     """
257
258     def __init__(self):
259         self.loop = asyncio.get_event_loop()
260         self.email = os.environ.get('MEROSS_EMAIL') or scott_secrets.MEROSS_EMAIL
261         self.password = (
262             os.environ.get('MEROSS_PASSWORD') or scott_secrets.MEROSS_PASSWORD
263         )
264         self.devices = self.loop.run_until_complete(self.find_meross_devices())
265         atexit.register(self.loop.close)
266
267     async def find_meross_devices(self) -> List[Any]:
268         http_api_client = await MerossHttpClient.async_from_user_password(
269             email=self.email, password=self.password
270         )
271
272         # Setup and start the device manager
273         manager = MerossManager(http_client=http_api_client)
274         await manager.async_init()
275
276         # Discover devices
277         await manager.async_device_discovery()
278         devices = manager.find_devices()
279         for device in devices:
280             await device.async_update()
281         return devices
282
283     def get_meross_device_by_name(self, name: str) -> Optional[Any]:
284         name = name.lower()
285         name = name.replace('_', ' ')
286         for device in self.devices:
287             if device.name.lower() == name:
288                 return device
289         return None
290
291
292 class MerossOutlet(BaseOutlet):
293     def __init__(self, name: str, mac: str, keywords: str = '') -> None:
294         super().__init__(name, mac, keywords)
295         self.meross_wrapper: Optional[MerossWrapper] = None
296         self.device: Optional[Any] = None
297
298     def lazy_initialize_device(self):
299         """If we make too many calls to Meross they will block us; only talk
300         to them when someone actually wants to control a device."""
301         if self.meross_wrapper is None:
302             self.meross_wrapper = MerossWrapper()
303             self.device = self.meross_wrapper.get_meross_device_by_name(self.name)
304             if self.device is None:
305                 raise Exception(f'{self.name} is not a known Meross device?!')
306
307     @overrides
308     def turn_on(self) -> bool:
309         self.lazy_initialize_device()
310         assert self.meross_wrapper is not None
311         assert self.device is not None
312         self.meross_wrapper.loop.run_until_complete(self.device.async_turn_on())
313         return True
314
315     @overrides
316     def turn_off(self) -> bool:
317         self.lazy_initialize_device()
318         assert self.meross_wrapper is not None
319         assert self.device is not None
320         self.meross_wrapper.loop.run_until_complete(self.device.async_turn_off())
321         return True
322
323     @overrides
324     def is_on(self) -> bool:
325         self.lazy_initialize_device()
326         assert self.device is not None
327         return self.device.is_on()
328
329     @overrides
330     def is_off(self) -> bool:
331         return not self.is_on()