Start using warnings from stdlib.
[python_utils.git] / smart_home / outlets.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart outlets."""
4
5 from abc import abstractmethod
6 import asyncio
7 import atexit
8 import datetime
9 import json
10 import logging
11 import os
12 import re
13 import subprocess
14 import sys
15 from typing import Any, Dict, List, Optional
16 import warnings
17
18 from meross_iot.http_api import MerossHttpClient
19 from meross_iot.manager import MerossManager
20
21 import argparse_utils
22 import config
23 import decorator_utils
24 import logging_utils
25 import scott_secrets
26 import smart_home.device as dev
27 from google_assistant import ask_google, GoogleResponse
28 from decorator_utils import timeout, memoized
29
30 logger = logging.getLogger(__name__)
31
32 parser = config.add_commandline_args(
33     f"Outlet Utils ({__file__})",
34     "Args related to smart outlets.",
35 )
36 parser.add_argument(
37     '--smart_outlets_tplink_location',
38     default='/home/scott/bin/tplink.py',
39     metavar='FILENAME',
40     help='The location of the tplink.py helper',
41     type=argparse_utils.valid_filename,
42 )
43
44
45 @timeout(
46     5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
47 )
48 def tplink_outlet_command(command: str) -> bool:
49     result = os.system(command)
50     signal = result & 0xFF
51     if signal != 0:
52         msg = f'{command} died with signal {signal}'
53         logger.warning(msg)
54         warnings.warn(msg)
55         logging_utils.hlog(msg)
56         return False
57     else:
58         exit_value = result >> 8
59         if exit_value != 0:
60             msg = f'{command} failed, exited {exit_value}'
61             logger.warning(msg)
62             warnings.warn(msg)
63             logging_utils.hlog(msg)
64             return False
65     logger.debug(f'{command} succeeded.')
66     return True
67
68
69 class BaseOutlet(dev.Device):
70     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
71         super().__init__(name.strip(), mac.strip(), keywords)
72         self.info = None
73
74     @abstractmethod
75     def turn_on(self) -> bool:
76         pass
77
78     @abstractmethod
79     def turn_off(self) -> bool:
80         pass
81
82     @abstractmethod
83     def is_on(self) -> bool:
84         pass
85
86     @abstractmethod
87     def is_off(self) -> bool:
88         pass
89
90
91 class TPLinkOutlet(BaseOutlet):
92     def __init__(self, name: str, mac: str, keywords: str = '') -> None:
93         super().__init__(name, mac, keywords)
94         self.info: Optional[Dict] = None
95         self.info_ts: Optional[datetime.datetime] = None
96
97     @memoized
98     def get_tplink_name(self) -> Optional[str]:
99         self.info = self.get_info()
100         if self.info is not None:
101             return self.info["alias"]
102         return None
103
104     def get_cmdline(self) -> str:
105         cmd = (
106             f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
107             f"--no_logging_console "
108         )
109         return cmd
110
111     def command(self, cmd: str, extra_args: str = None) -> bool:
112         cmd = self.get_cmdline() + f"-c {cmd}"
113         if extra_args is not None:
114             cmd += f" {extra_args}"
115         return tplink_outlet_command(cmd)
116
117     def turn_on(self) -> bool:
118         return self.command('on')
119
120     def turn_off(self) -> bool:
121         return self.command('off')
122
123     def is_on(self) -> bool:
124         return self.get_on_duration_seconds() > 0
125
126     def is_off(self) -> bool:
127         return not self.is_on()
128
129     @timeout(
130         10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
131     )
132     def get_info(self) -> Optional[Dict]:
133         cmd = self.get_cmdline() + "-c info"
134         out = subprocess.getoutput(cmd)
135         out = re.sub("Sent:.*\n", "", out)
136         out = re.sub("Received: *", "", out)
137         try:
138             self.info = json.loads(out)["system"]["get_sysinfo"]
139             self.info_ts = datetime.datetime.now()
140             return self.info
141         except Exception as e:
142             logger.exception(e)
143             print(out, file=sys.stderr)
144             self.info = None
145             self.info_ts = None
146             return None
147
148     def get_on_duration_seconds(self) -> int:
149         self.info = self.get_info()
150         if self.info is None:
151             return 0
152         return int(self.info.get("on_time", "0"))
153
154
155 class TPLinkOutletWithChildren(TPLinkOutlet):
156     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
157         super().__init__(name, mac, keywords)
158         self.children: List[str] = []
159         self.info: Optional[Dict] = None
160         self.info_ts: Optional[datetime.datetime] = None
161         assert "children" in self.keywords
162         self.info = self.get_info()
163         if self.info is not None:
164             for child in self.info["children"]:
165                 self.children.append(child["id"])
166
167     # override
168     def get_cmdline(self, child: Optional[str] = None) -> str:
169         cmd = (
170             f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
171             f"--no_logging_console "
172         )
173         if child is not None:
174             cmd += f"-x {child} "
175         return cmd
176
177     # override
178     def command(
179         self, cmd: str, child: str = None, extra_args: str = None
180     ) -> bool:
181         cmd = self.get_cmdline(child) + f"-c {cmd}"
182         if extra_args is not None:
183             cmd += f" {extra_args}"
184         logger.debug(f'About to execute {cmd}')
185         return tplink_outlet_command(cmd)
186
187     def get_children(self) -> List[str]:
188         return self.children
189
190     def turn_on(self, child: str = None) -> bool:
191         return self.command("on", child)
192
193     def turn_off(self, child: str = None) -> bool:
194         return self.command("off", child)
195
196     def get_on_duration_seconds(self, child: str = None) -> int:
197         self.info = self.get_info()
198         if child is None:
199             if self.info is None:
200                 return 0
201             return int(self.info.get("on_time", "0"))
202         else:
203             if self.info is None:
204                 return 0
205             for chi in self.info.get("children", {}):
206                 if chi["id"] == child:
207                     return int(chi.get("on_time", "0"))
208         return 0
209
210
211 class GoogleOutlet(BaseOutlet):
212     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
213         super().__init__(name.strip(), mac.strip(), keywords)
214         self.info = None
215
216     def goog_name(self) -> str:
217         name = self.get_name()
218         return name.replace('_', ' ')
219
220     @staticmethod
221     def parse_google_response(response: GoogleResponse) -> bool:
222         return response.success
223
224     def turn_on(self) -> bool:
225         return GoogleOutlet.parse_google_response(
226             ask_google('turn {self.goog_name()} on')
227         )
228
229     def turn_off(self) -> bool:
230         return GoogleOutlet.parse_google_response(
231             ask_google('turn {self.goog_name()} off')
232         )
233
234     def is_on(self) -> bool:
235         r = ask_google(f'is {self.goog_name()} on?')
236         if not r.success:
237             return False
238         return 'is on' in r.audio_transcription
239
240     def is_off(self) -> bool:
241         return not self.is_on()
242
243
244 @decorator_utils.singleton
245 class MerossWrapper(object):
246     """Global singleton helper class for MerossOutlets.  Note that
247     instantiating this class causes HTTP traffic with an external
248     Meross server.  Meross blocks customers who hit their servers too
249     aggressively so MerossOutlet is lazy about creating instances of
250     this class.
251
252     """
253     def __init__(self):
254         self.loop = asyncio.get_event_loop()
255         self.email = os.environ.get('MEROSS_EMAIL') or scott_secrets.MEROSS_EMAIL
256         self.password = os.environ.get('MEROSS_PASSWORD') or scott_secrets.MEROSS_PASSWORD
257         self.devices = self.loop.run_until_complete(self.find_meross_devices())
258         atexit.register(self.loop.close)
259
260     async def find_meross_devices(self) -> List[Any]:
261         http_api_client = await MerossHttpClient.async_from_user_password(
262             email=self.email, password=self.password
263         )
264
265         # Setup and start the device manager
266         manager = MerossManager(http_client=http_api_client)
267         await manager.async_init()
268
269         # Discover devices
270         await manager.async_device_discovery()
271         devices = manager.find_devices()
272         for device in devices:
273             await device.async_update()
274         return devices
275
276     def get_meross_device_by_name(self, name: str) -> Optional[Any]:
277         name = name.lower()
278         name = name.replace('_', ' ')
279         for device in self.devices:
280             if device.name.lower() == name:
281                 return device
282         return None
283
284
285 class MerossOutlet(BaseOutlet):
286     def __init__(self, name: str, mac: str, keywords: str = '') -> None:
287         super().__init__(name, mac, keywords)
288         self.meross_wrapper = None
289         self.device = None
290
291     def lazy_initialize_device(self):
292         """If we make too many calls to Meross they will block us; only talk
293         to them when someone actually wants to control a device."""
294         if self.meross_wrapper is None:
295             self.meross_wrapper = MerossWrapper()
296             self.device = self.meross_wrapper.get_meross_device_by_name(self.name)
297             if self.device is None:
298                 raise Exception(f'{self.name} is not a known Meross device?!')
299
300     def turn_on(self) -> bool:
301         self.lazy_initialize_device()
302         self.meross_wrapper.loop.run_until_complete(
303             self.device.async_turn_on()
304         )
305         return True
306
307     def turn_off(self) -> bool:
308         self.lazy_initialize_device()
309         self.meross_wrapper.loop.run_until_complete(
310             self.device.async_turn_off()
311         )
312         return True
313
314     def is_on(self) -> bool:
315         self.lazy_initialize_device()
316         return self.device.is_on()
317
318     def is_off(self) -> bool:
319         return not self.is_on()