Change settings in flake8 and black.
[python_utils.git] / smart_home / outlets.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart outlets."""
4
5 import asyncio
6 import atexit
7 import datetime
8 import json
9 import logging
10 import os
11 import re
12 import subprocess
13 import sys
14 from abc import abstractmethod
15 from typing import Any, Dict, List, Optional
16
17 from meross_iot.http_api import MerossHttpClient
18 from meross_iot.manager import MerossManager
19 from overrides import overrides
20
21 import argparse_utils
22 import config
23 import decorator_utils
24 import logging_utils
25 import scott_secrets
26 import smart_home.device as dev
27 from decorator_utils import memoized, timeout
28 from google_assistant import GoogleResponse, ask_google
29
30 logger = logging.getLogger(__name__)
31
32 parser = config.add_commandline_args(
33     f"Outlet Utils ({__file__})",
34     "Args related to smart outlets.",
35 )
36 parser.add_argument(
37     '--smart_outlets_tplink_location',
38     default='/home/scott/bin/tplink.py',
39     metavar='FILENAME',
40     help='The location of the tplink.py helper',
41     type=argparse_utils.valid_filename,
42 )
43
44
45 @timeout(5.0, use_signals=False, error_message="Timed out waiting for tplink.py")
46 def tplink_outlet_command(command: str) -> bool:
47     result = os.system(command)
48     signal = result & 0xFF
49     if signal != 0:
50         msg = f'{command} died with signal {signal}'
51         logger.warning(msg)
52         logging_utils.hlog(msg)
53         return False
54     else:
55         exit_value = result >> 8
56         if exit_value != 0:
57             msg = f'{command} failed, exited {exit_value}'
58             logger.warning(msg)
59             logging_utils.hlog(msg)
60             return False
61     logger.debug(f'{command} succeeded.')
62     return True
63
64
65 class BaseOutlet(dev.Device):
66     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
67         super().__init__(name.strip(), mac.strip(), keywords)
68
69     @abstractmethod
70     def turn_on(self) -> bool:
71         pass
72
73     @abstractmethod
74     def turn_off(self) -> bool:
75         pass
76
77     @abstractmethod
78     def is_on(self) -> bool:
79         pass
80
81     @abstractmethod
82     def is_off(self) -> bool:
83         pass
84
85
86 class TPLinkOutlet(BaseOutlet):
87     def __init__(self, name: str, mac: str, keywords: str = '') -> None:
88         super().__init__(name, mac, keywords)
89         self.info: Optional[Dict] = None
90         self.info_ts: Optional[datetime.datetime] = None
91
92     @memoized
93     def get_tplink_name(self) -> Optional[str]:
94         self.info = self.get_info()
95         if self.info is not None:
96             return self.info["alias"]
97         return None
98
99     def get_cmdline(self) -> str:
100         cmd = (
101             f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
102             f"--no_logging_console "
103         )
104         return cmd
105
106     def command(self, cmd: str, extra_args: str = None, **kwargs) -> bool:
107         cmd = self.get_cmdline() + f"-c {cmd}"
108         if extra_args is not None:
109             cmd += f" {extra_args}"
110         return tplink_outlet_command(cmd)
111
112     @overrides
113     def turn_on(self) -> bool:
114         return self.command('on')
115
116     @overrides
117     def turn_off(self) -> bool:
118         return self.command('off')
119
120     @overrides
121     def is_on(self) -> bool:
122         return self.get_on_duration_seconds() > 0
123
124     @overrides
125     def is_off(self) -> bool:
126         return not self.is_on()
127
128     @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
129     def get_info(self) -> Optional[Dict]:
130         cmd = self.get_cmdline() + "-c info"
131         out = subprocess.getoutput(cmd)
132         out = re.sub("Sent:.*\n", "", out)
133         out = re.sub("Received: *", "", out)
134         try:
135             self.info = json.loads(out)["system"]["get_sysinfo"]
136             self.info_ts = datetime.datetime.now()
137             return self.info
138         except Exception as e:
139             logger.exception(e)
140             print(out, file=sys.stderr)
141             self.info = None
142             self.info_ts = None
143             return None
144
145     def get_on_duration_seconds(self) -> int:
146         self.info = self.get_info()
147         if self.info is None:
148             return 0
149         return int(self.info.get("on_time", "0"))
150
151
152 class TPLinkOutletWithChildren(TPLinkOutlet):
153     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
154         super().__init__(name, mac, keywords)
155         self.children: List[str] = []
156         self.info: Optional[Dict] = None
157         self.info_ts: Optional[datetime.datetime] = None
158         assert "children" in self.keywords
159         self.info = self.get_info()
160         if self.info is not None:
161             for child in self.info["children"]:
162                 self.children.append(child["id"])
163
164     @overrides
165     def get_cmdline(self, child: Optional[str] = None) -> str:
166         cmd = (
167             f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
168             f"--no_logging_console "
169         )
170         if child is not None:
171             cmd += f"-x {child} "
172         return cmd
173
174     @overrides
175     def command(self, cmd: str, extra_args: str = None, **kwargs) -> bool:
176         child: Optional[str] = kwargs.get('child', None)
177         cmd = self.get_cmdline(child) + f"-c {cmd}"
178         if extra_args is not None:
179             cmd += f" {extra_args}"
180         logger.debug(f'About to execute {cmd}')
181         return tplink_outlet_command(cmd)
182
183     def get_children(self) -> List[str]:
184         return self.children
185
186     @overrides
187     def turn_on(self, child: str = None) -> bool:
188         return self.command("on", child)
189
190     @overrides
191     def turn_off(self, child: str = None) -> bool:
192         return self.command("off", child)
193
194     def get_on_duration_seconds(self, child: str = None) -> int:
195         self.info = self.get_info()
196         if child is None:
197             if self.info is None:
198                 return 0
199             return int(self.info.get("on_time", "0"))
200         else:
201             if self.info is None:
202                 return 0
203             for chi in self.info.get("children", {}):
204                 if chi["id"] == child:
205                     return int(chi.get("on_time", "0"))
206         return 0
207
208
209 class GoogleOutlet(BaseOutlet):
210     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
211         super().__init__(name.strip(), mac.strip(), keywords)
212         self.info = None
213
214     def goog_name(self) -> str:
215         name = self.get_name()
216         return name.replace('_', ' ')
217
218     @staticmethod
219     def parse_google_response(response: GoogleResponse) -> bool:
220         return response.success
221
222     @overrides
223     def turn_on(self) -> bool:
224         return GoogleOutlet.parse_google_response(ask_google(f'turn {self.goog_name()} on'))
225
226     @overrides
227     def turn_off(self) -> bool:
228         return GoogleOutlet.parse_google_response(ask_google(f'turn {self.goog_name()} off'))
229
230     @overrides
231     def is_on(self) -> bool:
232         r = ask_google(f'is {self.goog_name()} on?')
233         if not r.success:
234             return False
235         if r.audio_transcription is not None:
236             return 'is on' in r.audio_transcription
237         raise Exception('Can\'t talk to Google right now!?')
238
239     @overrides
240     def is_off(self) -> bool:
241         return not self.is_on()
242
243
244 @decorator_utils.singleton
245 class MerossWrapper(object):
246     """Global singleton helper class for MerossOutlets.  Note that
247     instantiating this class causes HTTP traffic with an external
248     Meross server.  Meross blocks customers who hit their servers too
249     aggressively so MerossOutlet is lazy about creating instances of
250     this class.
251
252     """
253
254     def __init__(self):
255         self.loop = asyncio.get_event_loop()
256         self.email = os.environ.get('MEROSS_EMAIL') or scott_secrets.MEROSS_EMAIL
257         self.password = os.environ.get('MEROSS_PASSWORD') or scott_secrets.MEROSS_PASSWORD
258         self.devices = self.loop.run_until_complete(self.find_meross_devices())
259         atexit.register(self.loop.close)
260
261     async def find_meross_devices(self) -> List[Any]:
262         http_api_client = await MerossHttpClient.async_from_user_password(
263             email=self.email, password=self.password
264         )
265
266         # Setup and start the device manager
267         manager = MerossManager(http_client=http_api_client)
268         await manager.async_init()
269
270         # Discover devices
271         await manager.async_device_discovery()
272         devices = manager.find_devices()
273         for device in devices:
274             await device.async_update()
275         return devices
276
277     def get_meross_device_by_name(self, name: str) -> Optional[Any]:
278         name = name.lower()
279         name = name.replace('_', ' ')
280         for device in self.devices:
281             if device.name.lower() == name:
282                 return device
283         return None
284
285
286 class MerossOutlet(BaseOutlet):
287     def __init__(self, name: str, mac: str, keywords: str = '') -> None:
288         super().__init__(name, mac, keywords)
289         self.meross_wrapper: Optional[MerossWrapper] = None
290         self.device: Optional[Any] = None
291
292     def lazy_initialize_device(self):
293         """If we make too many calls to Meross they will block us; only talk
294         to them when someone actually wants to control a device."""
295         if self.meross_wrapper is None:
296             self.meross_wrapper = MerossWrapper()
297             self.device = self.meross_wrapper.get_meross_device_by_name(self.name)
298             if self.device is None:
299                 raise Exception(f'{self.name} is not a known Meross device?!')
300
301     @overrides
302     def turn_on(self) -> bool:
303         self.lazy_initialize_device()
304         assert self.meross_wrapper is not None
305         assert self.device is not None
306         self.meross_wrapper.loop.run_until_complete(self.device.async_turn_on())
307         return True
308
309     @overrides
310     def turn_off(self) -> bool:
311         self.lazy_initialize_device()
312         assert self.meross_wrapper is not None
313         assert self.device is not None
314         self.meross_wrapper.loop.run_until_complete(self.device.async_turn_off())
315         return True
316
317     @overrides
318     def is_on(self) -> bool:
319         self.lazy_initialize_device()
320         assert self.device is not None
321         return self.device.is_on()
322
323     @overrides
324     def is_off(self) -> bool:
325         return not self.is_on()