A bunch of changes...
[python_utils.git] / smart_home / outlets.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart outlets."""
4
5 from abc import abstractmethod
6 import asyncio
7 import atexit
8 import datetime
9 import json
10 import logging
11 import os
12 import re
13 import subprocess
14 import sys
15 from typing import Any, Dict, List, Optional
16
17 from meross_iot.http_api import MerossHttpClient
18 from meross_iot.manager import MerossManager
19
20 import argparse_utils
21 import config
22 import decorator_utils
23 import logging_utils
24 import scott_secrets
25 import smart_home.device as dev
26 from google_assistant import ask_google, GoogleResponse
27 from decorator_utils import timeout, memoized
28
29 logger = logging.getLogger(__name__)
30
31 parser = config.add_commandline_args(
32     f"Outlet Utils ({__file__})",
33     "Args related to smart outlets.",
34 )
35 parser.add_argument(
36     '--smart_outlets_tplink_location',
37     default='/home/scott/bin/tplink.py',
38     metavar='FILENAME',
39     help='The location of the tplink.py helper',
40     type=argparse_utils.valid_filename,
41 )
42
43
44 @timeout(
45     5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
46 )
47 def tplink_outlet_command(command: str) -> bool:
48     result = os.system(command)
49     signal = result & 0xFF
50     if signal != 0:
51         logger.warning(f'{command} died with signal {signal}')
52         logging_utils.hlog("%s died with signal %d" % (command, signal))
53         return False
54     else:
55         exit_value = result >> 8
56         if exit_value != 0:
57             logger.warning(f'{command} failed, exited {exit_value}')
58             logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
59             return False
60     logger.debug(f'{command} succeeded.')
61     return True
62
63
64 class BaseOutlet(dev.Device):
65     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
66         super().__init__(name.strip(), mac.strip(), keywords)
67         self.info = None
68
69     @abstractmethod
70     def turn_on(self) -> bool:
71         pass
72
73     @abstractmethod
74     def turn_off(self) -> bool:
75         pass
76
77     @abstractmethod
78     def is_on(self) -> bool:
79         pass
80
81     @abstractmethod
82     def is_off(self) -> bool:
83         pass
84
85
86 class TPLinkOutlet(BaseOutlet):
87     def __init__(self, name: str, mac: str, keywords: str = '') -> None:
88         super().__init__(name, mac, keywords)
89         self.info: Optional[Dict] = None
90         self.info_ts: Optional[datetime.datetime] = None
91
92     @memoized
93     def get_tplink_name(self) -> Optional[str]:
94         self.info = self.get_info()
95         if self.info is not None:
96             return self.info["alias"]
97         return None
98
99     def get_cmdline(self) -> str:
100         cmd = (
101             f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
102             f"--no_logging_console "
103         )
104         return cmd
105
106     def command(self, cmd: str, extra_args: str = None) -> bool:
107         cmd = self.get_cmdline() + f"-c {cmd}"
108         if extra_args is not None:
109             cmd += f" {extra_args}"
110         return tplink_outlet_command(cmd)
111
112     def turn_on(self) -> bool:
113         return self.command('on')
114
115     def turn_off(self) -> bool:
116         return self.command('off')
117
118     def is_on(self) -> bool:
119         return self.get_on_duration_seconds() > 0
120
121     def is_off(self) -> bool:
122         return not self.is_on()
123
124     @timeout(
125         10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
126     )
127     def get_info(self) -> Optional[Dict]:
128         cmd = self.get_cmdline() + "-c info"
129         out = subprocess.getoutput(cmd)
130         out = re.sub("Sent:.*\n", "", out)
131         out = re.sub("Received: *", "", out)
132         try:
133             self.info = json.loads(out)["system"]["get_sysinfo"]
134             self.info_ts = datetime.datetime.now()
135             return self.info
136         except Exception as e:
137             logger.exception(e)
138             print(out, file=sys.stderr)
139             self.info = None
140             self.info_ts = None
141             return None
142
143     def get_on_duration_seconds(self) -> int:
144         self.info = self.get_info()
145         if self.info is None:
146             return 0
147         return int(self.info.get("on_time", "0"))
148
149
150 class TPLinkOutletWithChildren(TPLinkOutlet):
151     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
152         super().__init__(name, mac, keywords)
153         self.children: List[str] = []
154         self.info: Optional[Dict] = None
155         self.info_ts: Optional[datetime.datetime] = None
156         assert "children" in self.keywords
157         self.info = self.get_info()
158         if self.info is not None:
159             for child in self.info["children"]:
160                 self.children.append(child["id"])
161
162     # override
163     def get_cmdline(self, child: Optional[str] = None) -> str:
164         cmd = (
165             f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
166             f"--no_logging_console "
167         )
168         if child is not None:
169             cmd += f"-x {child} "
170         return cmd
171
172     # override
173     def command(
174         self, cmd: str, child: str = None, extra_args: str = None
175     ) -> bool:
176         cmd = self.get_cmdline(child) + f"-c {cmd}"
177         if extra_args is not None:
178             cmd += f" {extra_args}"
179         logger.debug(f'About to execute {cmd}')
180         return tplink_outlet_command(cmd)
181
182     def get_children(self) -> List[str]:
183         return self.children
184
185     def turn_on(self, child: str = None) -> bool:
186         return self.command("on", child)
187
188     def turn_off(self, child: str = None) -> bool:
189         return self.command("off", child)
190
191     def get_on_duration_seconds(self, child: str = None) -> int:
192         self.info = self.get_info()
193         if child is None:
194             if self.info is None:
195                 return 0
196             return int(self.info.get("on_time", "0"))
197         else:
198             if self.info is None:
199                 return 0
200             for chi in self.info.get("children", {}):
201                 if chi["id"] == child:
202                     return int(chi.get("on_time", "0"))
203         return 0
204
205
206 class GoogleOutlet(BaseOutlet):
207     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
208         super().__init__(name.strip(), mac.strip(), keywords)
209         self.info = None
210
211     def goog_name(self) -> str:
212         name = self.get_name()
213         return name.replace('_', ' ')
214
215     @staticmethod
216     def parse_google_response(response: GoogleResponse) -> bool:
217         return response.success
218
219     def turn_on(self) -> bool:
220         return GoogleOutlet.parse_google_response(
221             ask_google('turn {self.goog_name()} on')
222         )
223
224     def turn_off(self) -> bool:
225         return GoogleOutlet.parse_google_response(
226             ask_google('turn {self.goog_name()} off')
227         )
228
229     def is_on(self) -> bool:
230         r = ask_google(f'is {self.goog_name()} on?')
231         if not r.success:
232             return False
233         return 'is on' in r.audio_transcription
234
235     def is_off(self) -> bool:
236         return not self.is_on()
237
238
239 @decorator_utils.singleton
240 class MerossWrapper(object):
241     """Note that instantiating this class causes HTTP traffic with an
242     external Meross server.  Meross blocks customers who hit their
243     servers too aggressively so MerossOutlet is lazy about creating
244     instances of this class.
245
246     """
247
248     def __init__(self):
249         self.loop = asyncio.get_event_loop()
250         self.email = os.environ.get('MEROSS_EMAIL') or scott_secrets.MEROSS_EMAIL
251         self.password = os.environ.get('MEROSS_PASSWORD') or scott_secrets.MEROSS_PASSWORD
252         self.devices = self.loop.run_until_complete(self.find_meross_devices())
253         atexit.register(self.loop.close)
254
255     async def find_meross_devices(self) -> List[Any]:
256         http_api_client = await MerossHttpClient.async_from_user_password(
257             email=self.email, password=self.password
258         )
259
260         # Setup and start the device manager
261         manager = MerossManager(http_client=http_api_client)
262         await manager.async_init()
263
264         # Discover devices
265         await manager.async_device_discovery()
266         devices = manager.find_devices()
267         for device in devices:
268             await device.async_update()
269         return devices
270
271     def get_meross_device_by_name(self, name: str) -> Optional[Any]:
272         name = name.lower()
273         name = name.replace('_', ' ')
274         for device in self.devices:
275             if device.name.lower() == name:
276                 return device
277         return None
278
279
280 class MerossOutlet(BaseOutlet):
281     def __init__(self, name: str, mac: str, keywords: str = '') -> None:
282         super().__init__(name, mac, keywords)
283         self.meross_wrapper = None
284         self.device = None
285
286     def lazy_initialize_device(self):
287         """If we make too many calls to Meross they will block us; only talk
288         to them when someone actually wants to control a device."""
289         if self.meross_wrapper is None:
290             self.meross_wrapper = MerossWrapper()
291             self.device = self.meross_wrapper.get_meross_device_by_name(self.name)
292             if self.device is None:
293                 raise Exception(f'{self.name} is not a known Meross device?!')
294
295     def turn_on(self) -> bool:
296         self.lazy_initialize_device()
297         self.meross_wrapper.loop.run_until_complete(
298             self.device.async_turn_on()
299         )
300         return True
301
302     def turn_off(self) -> bool:
303         self.lazy_initialize_device()
304         self.meross_wrapper.loop.run_until_complete(
305             self.device.async_turn_off()
306         )
307         return True
308
309     def is_on(self) -> bool:
310         self.lazy_initialize_device()
311         return self.device.is_on()
312
313     def is_off(self) -> bool:
314         return not self.is_on()