Scale back warnings.warn and add stacklevels= where appropriate.
[python_utils.git] / smart_home / outlets.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart outlets."""
4
5 from abc import abstractmethod
6 import asyncio
7 import atexit
8 import datetime
9 import json
10 import logging
11 import os
12 import re
13 import subprocess
14 import sys
15 from typing import Any, Dict, List, Optional
16
17 from meross_iot.http_api import MerossHttpClient
18 from meross_iot.manager import MerossManager
19
20 import argparse_utils
21 import config
22 import decorator_utils
23 import logging_utils
24 import scott_secrets
25 import smart_home.device as dev
26 from google_assistant import ask_google, GoogleResponse
27 from decorator_utils import timeout, memoized
28
29 logger = logging.getLogger(__name__)
30
31 parser = config.add_commandline_args(
32     f"Outlet Utils ({__file__})",
33     "Args related to smart outlets.",
34 )
35 parser.add_argument(
36     '--smart_outlets_tplink_location',
37     default='/home/scott/bin/tplink.py',
38     metavar='FILENAME',
39     help='The location of the tplink.py helper',
40     type=argparse_utils.valid_filename,
41 )
42
43
44 @timeout(
45     5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
46 )
47 def tplink_outlet_command(command: str) -> bool:
48     result = os.system(command)
49     signal = result & 0xFF
50     if signal != 0:
51         msg = f'{command} died with signal {signal}'
52         logger.warning(msg)
53         logging_utils.hlog(msg)
54         return False
55     else:
56         exit_value = result >> 8
57         if exit_value != 0:
58             msg = f'{command} failed, exited {exit_value}'
59             logger.warning(msg)
60             logging_utils.hlog(msg)
61             return False
62     logger.debug(f'{command} succeeded.')
63     return True
64
65
66 class BaseOutlet(dev.Device):
67     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
68         super().__init__(name.strip(), mac.strip(), keywords)
69         self.info = None
70
71     @abstractmethod
72     def turn_on(self) -> bool:
73         pass
74
75     @abstractmethod
76     def turn_off(self) -> bool:
77         pass
78
79     @abstractmethod
80     def is_on(self) -> bool:
81         pass
82
83     @abstractmethod
84     def is_off(self) -> bool:
85         pass
86
87
88 class TPLinkOutlet(BaseOutlet):
89     def __init__(self, name: str, mac: str, keywords: str = '') -> None:
90         super().__init__(name, mac, keywords)
91         self.info: Optional[Dict] = None
92         self.info_ts: Optional[datetime.datetime] = None
93
94     @memoized
95     def get_tplink_name(self) -> Optional[str]:
96         self.info = self.get_info()
97         if self.info is not None:
98             return self.info["alias"]
99         return None
100
101     def get_cmdline(self) -> str:
102         cmd = (
103             f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
104             f"--no_logging_console "
105         )
106         return cmd
107
108     def command(self, cmd: str, extra_args: str = None) -> bool:
109         cmd = self.get_cmdline() + f"-c {cmd}"
110         if extra_args is not None:
111             cmd += f" {extra_args}"
112         return tplink_outlet_command(cmd)
113
114     def turn_on(self) -> bool:
115         return self.command('on')
116
117     def turn_off(self) -> bool:
118         return self.command('off')
119
120     def is_on(self) -> bool:
121         return self.get_on_duration_seconds() > 0
122
123     def is_off(self) -> bool:
124         return not self.is_on()
125
126     @timeout(
127         10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
128     )
129     def get_info(self) -> Optional[Dict]:
130         cmd = self.get_cmdline() + "-c info"
131         out = subprocess.getoutput(cmd)
132         out = re.sub("Sent:.*\n", "", out)
133         out = re.sub("Received: *", "", out)
134         try:
135             self.info = json.loads(out)["system"]["get_sysinfo"]
136             self.info_ts = datetime.datetime.now()
137             return self.info
138         except Exception as e:
139             logger.exception(e)
140             print(out, file=sys.stderr)
141             self.info = None
142             self.info_ts = None
143             return None
144
145     def get_on_duration_seconds(self) -> int:
146         self.info = self.get_info()
147         if self.info is None:
148             return 0
149         return int(self.info.get("on_time", "0"))
150
151
152 class TPLinkOutletWithChildren(TPLinkOutlet):
153     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
154         super().__init__(name, mac, keywords)
155         self.children: List[str] = []
156         self.info: Optional[Dict] = None
157         self.info_ts: Optional[datetime.datetime] = None
158         assert "children" in self.keywords
159         self.info = self.get_info()
160         if self.info is not None:
161             for child in self.info["children"]:
162                 self.children.append(child["id"])
163
164     # override
165     def get_cmdline(self, child: Optional[str] = None) -> str:
166         cmd = (
167             f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
168             f"--no_logging_console "
169         )
170         if child is not None:
171             cmd += f"-x {child} "
172         return cmd
173
174     # override
175     def command(
176         self, cmd: str, child: str = None, extra_args: str = None
177     ) -> bool:
178         cmd = self.get_cmdline(child) + f"-c {cmd}"
179         if extra_args is not None:
180             cmd += f" {extra_args}"
181         logger.debug(f'About to execute {cmd}')
182         return tplink_outlet_command(cmd)
183
184     def get_children(self) -> List[str]:
185         return self.children
186
187     def turn_on(self, child: str = None) -> bool:
188         return self.command("on", child)
189
190     def turn_off(self, child: str = None) -> bool:
191         return self.command("off", child)
192
193     def get_on_duration_seconds(self, child: str = None) -> int:
194         self.info = self.get_info()
195         if child is None:
196             if self.info is None:
197                 return 0
198             return int(self.info.get("on_time", "0"))
199         else:
200             if self.info is None:
201                 return 0
202             for chi in self.info.get("children", {}):
203                 if chi["id"] == child:
204                     return int(chi.get("on_time", "0"))
205         return 0
206
207
208 class GoogleOutlet(BaseOutlet):
209     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
210         super().__init__(name.strip(), mac.strip(), keywords)
211         self.info = None
212
213     def goog_name(self) -> str:
214         name = self.get_name()
215         return name.replace('_', ' ')
216
217     @staticmethod
218     def parse_google_response(response: GoogleResponse) -> bool:
219         return response.success
220
221     def turn_on(self) -> bool:
222         return GoogleOutlet.parse_google_response(
223             ask_google('turn {self.goog_name()} on')
224         )
225
226     def turn_off(self) -> bool:
227         return GoogleOutlet.parse_google_response(
228             ask_google('turn {self.goog_name()} off')
229         )
230
231     def is_on(self) -> bool:
232         r = ask_google(f'is {self.goog_name()} on?')
233         if not r.success:
234             return False
235         return 'is on' in r.audio_transcription
236
237     def is_off(self) -> bool:
238         return not self.is_on()
239
240
241 @decorator_utils.singleton
242 class MerossWrapper(object):
243     """Global singleton helper class for MerossOutlets.  Note that
244     instantiating this class causes HTTP traffic with an external
245     Meross server.  Meross blocks customers who hit their servers too
246     aggressively so MerossOutlet is lazy about creating instances of
247     this class.
248
249     """
250     def __init__(self):
251         self.loop = asyncio.get_event_loop()
252         self.email = os.environ.get('MEROSS_EMAIL') or scott_secrets.MEROSS_EMAIL
253         self.password = os.environ.get('MEROSS_PASSWORD') or scott_secrets.MEROSS_PASSWORD
254         self.devices = self.loop.run_until_complete(self.find_meross_devices())
255         atexit.register(self.loop.close)
256
257     async def find_meross_devices(self) -> List[Any]:
258         http_api_client = await MerossHttpClient.async_from_user_password(
259             email=self.email, password=self.password
260         )
261
262         # Setup and start the device manager
263         manager = MerossManager(http_client=http_api_client)
264         await manager.async_init()
265
266         # Discover devices
267         await manager.async_device_discovery()
268         devices = manager.find_devices()
269         for device in devices:
270             await device.async_update()
271         return devices
272
273     def get_meross_device_by_name(self, name: str) -> Optional[Any]:
274         name = name.lower()
275         name = name.replace('_', ' ')
276         for device in self.devices:
277             if device.name.lower() == name:
278                 return device
279         return None
280
281
282 class MerossOutlet(BaseOutlet):
283     def __init__(self, name: str, mac: str, keywords: str = '') -> None:
284         super().__init__(name, mac, keywords)
285         self.meross_wrapper = None
286         self.device = None
287
288     def lazy_initialize_device(self):
289         """If we make too many calls to Meross they will block us; only talk
290         to them when someone actually wants to control a device."""
291         if self.meross_wrapper is None:
292             self.meross_wrapper = MerossWrapper()
293             self.device = self.meross_wrapper.get_meross_device_by_name(self.name)
294             if self.device is None:
295                 raise Exception(f'{self.name} is not a known Meross device?!')
296
297     def turn_on(self) -> bool:
298         self.lazy_initialize_device()
299         self.meross_wrapper.loop.run_until_complete(
300             self.device.async_turn_on()
301         )
302         return True
303
304     def turn_off(self) -> bool:
305         self.lazy_initialize_device()
306         self.meross_wrapper.loop.run_until_complete(
307             self.device.async_turn_off()
308         )
309         return True
310
311     def is_on(self) -> bool:
312         self.lazy_initialize_device()
313         return self.device.is_on()
314
315     def is_off(self) -> bool:
316         return not self.is_on()