More cleanup.
[python_utils.git] / smart_home / outlets.py
1 #!/usr/bin/env python3
2
3 """Utilities for dealing with the smart outlets."""
4
5 import asyncio
6 import atexit
7 import datetime
8 import json
9 import logging
10 import os
11 import re
12 import subprocess
13 import sys
14 from abc import abstractmethod
15 from typing import Any, Dict, List, Optional
16
17 from meross_iot.http_api import MerossHttpClient
18 from meross_iot.manager import MerossManager
19 from overrides import overrides
20
21 import argparse_utils
22 import config
23 import decorator_utils
24 import logging_utils
25 import scott_secrets
26 import smart_home.device as dev
27 from decorator_utils import memoized, timeout
28 from google_assistant import GoogleResponse, ask_google
29
30 logger = logging.getLogger(__name__)
31
32 parser = config.add_commandline_args(
33     f"Outlet Utils ({__file__})",
34     "Args related to smart outlets.",
35 )
36 parser.add_argument(
37     '--smart_outlets_tplink_location',
38     default='/home/scott/bin/tplink.py',
39     metavar='FILENAME',
40     help='The location of the tplink.py helper',
41     type=argparse_utils.valid_filename,
42 )
43
44
45 @timeout(5.0, use_signals=False, error_message="Timed out waiting for tplink.py")
46 def tplink_outlet_command(command: str) -> bool:
47     result = os.system(command)
48     signal = result & 0xFF
49     if signal != 0:
50         msg = f'{command} died with signal {signal}'
51         logger.warning(msg)
52         logging_utils.hlog(msg)
53         return False
54     else:
55         exit_value = result >> 8
56         if exit_value != 0:
57             msg = f'{command} failed, exited {exit_value}'
58             logger.warning(msg)
59             logging_utils.hlog(msg)
60             return False
61     logger.debug('%s succeeded.', command)
62     return True
63
64
65 class BaseOutlet(dev.Device):
66     """An abstract base class for smart outlets."""
67
68     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
69         super().__init__(name.strip(), mac.strip(), keywords)
70
71     @abstractmethod
72     def turn_on(self) -> bool:
73         pass
74
75     @abstractmethod
76     def turn_off(self) -> bool:
77         pass
78
79     @abstractmethod
80     def is_on(self) -> bool:
81         pass
82
83     @abstractmethod
84     def is_off(self) -> bool:
85         pass
86
87
88 class TPLinkOutlet(BaseOutlet):
89     """A TPLink smart outlet."""
90
91     def __init__(self, name: str, mac: str, keywords: str = '') -> None:
92         super().__init__(name, mac, keywords)
93         self.info: Optional[Dict] = None
94         self.info_ts: Optional[datetime.datetime] = None
95
96     @memoized
97     def get_tplink_name(self) -> Optional[str]:
98         self.info = self.get_info()
99         if self.info is not None:
100             return self.info["alias"]
101         return None
102
103     def get_cmdline(self) -> str:
104         cmd = (
105             f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
106             f"--no_logging_console "
107         )
108         return cmd
109
110     def command(self, cmd: str, extra_args: str = None, **kwargs) -> bool:
111         cmd = self.get_cmdline() + f"-c {cmd}"
112         if extra_args is not None:
113             cmd += f" {extra_args}"
114         return tplink_outlet_command(cmd)
115
116     @overrides
117     def turn_on(self) -> bool:
118         return self.command('on')
119
120     @overrides
121     def turn_off(self) -> bool:
122         return self.command('off')
123
124     @overrides
125     def is_on(self) -> bool:
126         return self.get_on_duration_seconds() > 0
127
128     @overrides
129     def is_off(self) -> bool:
130         return not self.is_on()
131
132     @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
133     def get_info(self) -> Optional[Dict]:
134         cmd = self.get_cmdline() + "-c info"
135         out = subprocess.getoutput(cmd)
136         out = re.sub("Sent:.*\n", "", out)
137         out = re.sub("Received: *", "", out)
138         try:
139             self.info = json.loads(out)["system"]["get_sysinfo"]
140             self.info_ts = datetime.datetime.now()
141             return self.info
142         except Exception as e:
143             logger.exception(e)
144             print(out, file=sys.stderr)
145             self.info = None
146             self.info_ts = None
147             return None
148
149     def get_on_duration_seconds(self) -> int:
150         self.info = self.get_info()
151         if self.info is None:
152             return 0
153         return int(self.info.get("on_time", "0"))
154
155
156 class TPLinkOutletWithChildren(TPLinkOutlet):
157     """A TPLink outlet where the top and bottom plus are individually
158     controllable."""
159
160     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
161         super().__init__(name, mac, keywords)
162         self.children: List[str] = []
163         self.info: Optional[Dict] = None
164         self.info_ts: Optional[datetime.datetime] = None
165         assert self.keywords is not None
166         assert "children" in self.keywords
167         self.info = self.get_info()
168         if self.info is not None:
169             for child in self.info["children"]:
170                 self.children.append(child["id"])
171
172     @overrides
173     def get_cmdline(self, child: Optional[str] = None) -> str:
174         cmd = (
175             f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
176             f"--no_logging_console "
177         )
178         if child is not None:
179             cmd += f"-x {child} "
180         return cmd
181
182     @overrides
183     def command(self, cmd: str, extra_args: str = None, **kwargs) -> bool:
184         child: Optional[str] = kwargs.get('child', None)
185         cmd = self.get_cmdline(child) + f"-c {cmd}"
186         if extra_args is not None:
187             cmd += f" {extra_args}"
188         logger.debug('About to execute: %s', cmd)
189         return tplink_outlet_command(cmd)
190
191     def get_children(self) -> List[str]:
192         return self.children
193
194     @overrides
195     def turn_on(self, child: str = None) -> bool:
196         return self.command("on", child)
197
198     @overrides
199     def turn_off(self, child: str = None) -> bool:
200         return self.command("off", child)
201
202     def get_on_duration_seconds(self, child: str = None) -> int:
203         self.info = self.get_info()
204         if child is None:
205             if self.info is None:
206                 return 0
207             return int(self.info.get("on_time", "0"))
208         else:
209             if self.info is None:
210                 return 0
211             for chi in self.info.get("children", {}):
212                 if chi["id"] == child:
213                     return int(chi.get("on_time", "0"))
214         return 0
215
216
217 class GoogleOutlet(BaseOutlet):
218     """A smart outlet controlled via Google Assistant."""
219
220     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
221         super().__init__(name.strip(), mac.strip(), keywords)
222         self.info = None
223
224     def goog_name(self) -> str:
225         name = self.get_name()
226         return name.replace('_', ' ')
227
228     @staticmethod
229     def parse_google_response(response: GoogleResponse) -> bool:
230         return response.success
231
232     @overrides
233     def turn_on(self) -> bool:
234         return GoogleOutlet.parse_google_response(ask_google(f'turn {self.goog_name()} on'))
235
236     @overrides
237     def turn_off(self) -> bool:
238         return GoogleOutlet.parse_google_response(ask_google(f'turn {self.goog_name()} off'))
239
240     @overrides
241     def is_on(self) -> bool:
242         r = ask_google(f'is {self.goog_name()} on?')
243         if not r.success:
244             return False
245         if r.audio_transcription is not None:
246             return 'is on' in r.audio_transcription
247         raise Exception('Can\'t talk to Google right now!?')
248
249     @overrides
250     def is_off(self) -> bool:
251         return not self.is_on()
252
253
254 @decorator_utils.singleton
255 class MerossWrapper(object):
256     """Global singleton helper class for MerossOutlets.  Note that
257     instantiating this class causes HTTP traffic with an external
258     Meross server.  Meross blocks customers who hit their servers too
259     aggressively so MerossOutlet is lazy about creating instances of
260     this class.
261
262     """
263
264     def __init__(self):
265         self.loop = asyncio.get_event_loop()
266         self.email = os.environ.get('MEROSS_EMAIL') or scott_secrets.MEROSS_EMAIL
267         self.password = os.environ.get('MEROSS_PASSWORD') or scott_secrets.MEROSS_PASSWORD
268         self.devices = self.loop.run_until_complete(self.find_meross_devices())
269         atexit.register(self.loop.close)
270
271     async def find_meross_devices(self) -> List[Any]:
272         http_api_client = await MerossHttpClient.async_from_user_password(
273             email=self.email, password=self.password
274         )
275
276         # Setup and start the device manager
277         manager = MerossManager(http_client=http_api_client)
278         await manager.async_init()
279
280         # Discover devices
281         await manager.async_device_discovery()
282         devices = manager.find_devices()
283         for device in devices:
284             await device.async_update()
285         return devices
286
287     def get_meross_device_by_name(self, name: str) -> Optional[Any]:
288         name = name.lower()
289         name = name.replace('_', ' ')
290         for device in self.devices:
291             if device.name.lower() == name:
292                 return device
293         return None
294
295
296 class MerossOutlet(BaseOutlet):
297     """A Meross smart outlet class."""
298
299     def __init__(self, name: str, mac: str, keywords: str = '') -> None:
300         super().__init__(name, mac, keywords)
301         self.meross_wrapper: Optional[MerossWrapper] = None
302         self.device: Optional[Any] = None
303
304     def lazy_initialize_device(self):
305         """If we make too many calls to Meross they will block us; only talk
306         to them when someone actually wants to control a device."""
307         if self.meross_wrapper is None:
308             self.meross_wrapper = MerossWrapper()
309             self.device = self.meross_wrapper.get_meross_device_by_name(self.name)
310             if self.device is None:
311                 raise Exception(f'{self.name} is not a known Meross device?!')
312
313     @overrides
314     def turn_on(self) -> bool:
315         self.lazy_initialize_device()
316         assert self.meross_wrapper is not None
317         assert self.device is not None
318         self.meross_wrapper.loop.run_until_complete(self.device.async_turn_on())
319         return True
320
321     @overrides
322     def turn_off(self) -> bool:
323         self.lazy_initialize_device()
324         assert self.meross_wrapper is not None
325         assert self.device is not None
326         self.meross_wrapper.loop.run_until_complete(self.device.async_turn_off())
327         return True
328
329     @overrides
330     def is_on(self) -> bool:
331         self.lazy_initialize_device()
332         assert self.device is not None
333         return self.device.is_on()
334
335     @overrides
336     def is_off(self) -> bool:
337         return not self.is_on()