More cleanup.
[python_utils.git] / smart_home / outlets.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart outlets."""
4
5 import asyncio
6 import atexit
7 import datetime
8 import json
9 import logging
10 import os
11 import re
12 import subprocess
13 import sys
14 from abc import abstractmethod
15 from typing import Any, Dict, List, Optional
16
17 from meross_iot.http_api import MerossHttpClient
18 from meross_iot.manager import MerossManager
19 from overrides import overrides
20
21 import argparse_utils
22 import config
23 import decorator_utils
24 import logging_utils
25 import scott_secrets
26 import smart_home.device as dev
27 from decorator_utils import memoized, timeout
28 from google_assistant import GoogleResponse, ask_google
29
30 logger = logging.getLogger(__name__)
31
32 parser = config.add_commandline_args(
33     f"Outlet Utils ({__file__})",
34     "Args related to smart outlets.",
35 )
36 parser.add_argument(
37     '--smart_outlets_tplink_location',
38     default='/home/scott/bin/tplink.py',
39     metavar='FILENAME',
40     help='The location of the tplink.py helper',
41     type=argparse_utils.valid_filename,
42 )
43
44
45 @timeout(5.0, use_signals=False, error_message="Timed out waiting for tplink.py")
46 def tplink_outlet_command(command: str) -> bool:
47     result = os.system(command)
48     signal = result & 0xFF
49     if signal != 0:
50         msg = f'{command} died with signal {signal}'
51         logger.warning(msg)
52         logging_utils.hlog(msg)
53         return False
54     else:
55         exit_value = result >> 8
56         if exit_value != 0:
57             msg = f'{command} failed, exited {exit_value}'
58             logger.warning(msg)
59             logging_utils.hlog(msg)
60             return False
61     logger.debug(f'{command} succeeded.')
62     return True
63
64
65 class BaseOutlet(dev.Device):
66     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
67         super().__init__(name.strip(), mac.strip(), keywords)
68
69     @abstractmethod
70     def turn_on(self) -> bool:
71         pass
72
73     @abstractmethod
74     def turn_off(self) -> bool:
75         pass
76
77     @abstractmethod
78     def is_on(self) -> bool:
79         pass
80
81     @abstractmethod
82     def is_off(self) -> bool:
83         pass
84
85
86 class TPLinkOutlet(BaseOutlet):
87     def __init__(self, name: str, mac: str, keywords: str = '') -> None:
88         super().__init__(name, mac, keywords)
89         self.info: Optional[Dict] = None
90         self.info_ts: Optional[datetime.datetime] = None
91
92     @memoized
93     def get_tplink_name(self) -> Optional[str]:
94         self.info = self.get_info()
95         if self.info is not None:
96             return self.info["alias"]
97         return None
98
99     def get_cmdline(self) -> str:
100         cmd = (
101             f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
102             f"--no_logging_console "
103         )
104         return cmd
105
106     def command(self, cmd: str, extra_args: str = None, **kwargs) -> bool:
107         cmd = self.get_cmdline() + f"-c {cmd}"
108         if extra_args is not None:
109             cmd += f" {extra_args}"
110         return tplink_outlet_command(cmd)
111
112     @overrides
113     def turn_on(self) -> bool:
114         return self.command('on')
115
116     @overrides
117     def turn_off(self) -> bool:
118         return self.command('off')
119
120     @overrides
121     def is_on(self) -> bool:
122         return self.get_on_duration_seconds() > 0
123
124     @overrides
125     def is_off(self) -> bool:
126         return not self.is_on()
127
128     @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
129     def get_info(self) -> Optional[Dict]:
130         cmd = self.get_cmdline() + "-c info"
131         out = subprocess.getoutput(cmd)
132         out = re.sub("Sent:.*\n", "", out)
133         out = re.sub("Received: *", "", out)
134         try:
135             self.info = json.loads(out)["system"]["get_sysinfo"]
136             self.info_ts = datetime.datetime.now()
137             return self.info
138         except Exception as e:
139             logger.exception(e)
140             print(out, file=sys.stderr)
141             self.info = None
142             self.info_ts = None
143             return None
144
145     def get_on_duration_seconds(self) -> int:
146         self.info = self.get_info()
147         if self.info is None:
148             return 0
149         return int(self.info.get("on_time", "0"))
150
151
152 class TPLinkOutletWithChildren(TPLinkOutlet):
153     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
154         super().__init__(name, mac, keywords)
155         self.children: List[str] = []
156         self.info: Optional[Dict] = None
157         self.info_ts: Optional[datetime.datetime] = None
158         assert self.keywords is not None
159         assert "children" in self.keywords
160         self.info = self.get_info()
161         if self.info is not None:
162             for child in self.info["children"]:
163                 self.children.append(child["id"])
164
165     @overrides
166     def get_cmdline(self, child: Optional[str] = None) -> str:
167         cmd = (
168             f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
169             f"--no_logging_console "
170         )
171         if child is not None:
172             cmd += f"-x {child} "
173         return cmd
174
175     @overrides
176     def command(self, cmd: str, extra_args: str = None, **kwargs) -> bool:
177         child: Optional[str] = kwargs.get('child', None)
178         cmd = self.get_cmdline(child) + f"-c {cmd}"
179         if extra_args is not None:
180             cmd += f" {extra_args}"
181         logger.debug(f'About to execute {cmd}')
182         return tplink_outlet_command(cmd)
183
184     def get_children(self) -> List[str]:
185         return self.children
186
187     @overrides
188     def turn_on(self, child: str = None) -> bool:
189         return self.command("on", child)
190
191     @overrides
192     def turn_off(self, child: str = None) -> bool:
193         return self.command("off", child)
194
195     def get_on_duration_seconds(self, child: str = None) -> int:
196         self.info = self.get_info()
197         if child is None:
198             if self.info is None:
199                 return 0
200             return int(self.info.get("on_time", "0"))
201         else:
202             if self.info is None:
203                 return 0
204             for chi in self.info.get("children", {}):
205                 if chi["id"] == child:
206                     return int(chi.get("on_time", "0"))
207         return 0
208
209
210 class GoogleOutlet(BaseOutlet):
211     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
212         super().__init__(name.strip(), mac.strip(), keywords)
213         self.info = None
214
215     def goog_name(self) -> str:
216         name = self.get_name()
217         return name.replace('_', ' ')
218
219     @staticmethod
220     def parse_google_response(response: GoogleResponse) -> bool:
221         return response.success
222
223     @overrides
224     def turn_on(self) -> bool:
225         return GoogleOutlet.parse_google_response(ask_google(f'turn {self.goog_name()} on'))
226
227     @overrides
228     def turn_off(self) -> bool:
229         return GoogleOutlet.parse_google_response(ask_google(f'turn {self.goog_name()} off'))
230
231     @overrides
232     def is_on(self) -> bool:
233         r = ask_google(f'is {self.goog_name()} on?')
234         if not r.success:
235             return False
236         if r.audio_transcription is not None:
237             return 'is on' in r.audio_transcription
238         raise Exception('Can\'t talk to Google right now!?')
239
240     @overrides
241     def is_off(self) -> bool:
242         return not self.is_on()
243
244
245 @decorator_utils.singleton
246 class MerossWrapper(object):
247     """Global singleton helper class for MerossOutlets.  Note that
248     instantiating this class causes HTTP traffic with an external
249     Meross server.  Meross blocks customers who hit their servers too
250     aggressively so MerossOutlet is lazy about creating instances of
251     this class.
252
253     """
254
255     def __init__(self):
256         self.loop = asyncio.get_event_loop()
257         self.email = os.environ.get('MEROSS_EMAIL') or scott_secrets.MEROSS_EMAIL
258         self.password = os.environ.get('MEROSS_PASSWORD') or scott_secrets.MEROSS_PASSWORD
259         self.devices = self.loop.run_until_complete(self.find_meross_devices())
260         atexit.register(self.loop.close)
261
262     async def find_meross_devices(self) -> List[Any]:
263         http_api_client = await MerossHttpClient.async_from_user_password(
264             email=self.email, password=self.password
265         )
266
267         # Setup and start the device manager
268         manager = MerossManager(http_client=http_api_client)
269         await manager.async_init()
270
271         # Discover devices
272         await manager.async_device_discovery()
273         devices = manager.find_devices()
274         for device in devices:
275             await device.async_update()
276         return devices
277
278     def get_meross_device_by_name(self, name: str) -> Optional[Any]:
279         name = name.lower()
280         name = name.replace('_', ' ')
281         for device in self.devices:
282             if device.name.lower() == name:
283                 return device
284         return None
285
286
287 class MerossOutlet(BaseOutlet):
288     def __init__(self, name: str, mac: str, keywords: str = '') -> None:
289         super().__init__(name, mac, keywords)
290         self.meross_wrapper: Optional[MerossWrapper] = None
291         self.device: Optional[Any] = None
292
293     def lazy_initialize_device(self):
294         """If we make too many calls to Meross they will block us; only talk
295         to them when someone actually wants to control a device."""
296         if self.meross_wrapper is None:
297             self.meross_wrapper = MerossWrapper()
298             self.device = self.meross_wrapper.get_meross_device_by_name(self.name)
299             if self.device is None:
300                 raise Exception(f'{self.name} is not a known Meross device?!')
301
302     @overrides
303     def turn_on(self) -> bool:
304         self.lazy_initialize_device()
305         assert self.meross_wrapper is not None
306         assert self.device is not None
307         self.meross_wrapper.loop.run_until_complete(self.device.async_turn_on())
308         return True
309
310     @overrides
311     def turn_off(self) -> bool:
312         self.lazy_initialize_device()
313         assert self.meross_wrapper is not None
314         assert self.device is not None
315         self.meross_wrapper.loop.run_until_complete(self.device.async_turn_off())
316         return True
317
318     @overrides
319     def is_on(self) -> bool:
320         self.lazy_initialize_device()
321         assert self.device is not None
322         return self.device.is_on()
323
324     @overrides
325     def is_off(self) -> bool:
326         return not self.is_on()