"""Utilities for dealing with the smart outlets."""
-from abc import abstractmethod
+import asyncio
+import atexit
import datetime
import json
import logging
import re
import subprocess
import sys
-from typing import Any, Dict, List, Optional, Set
+from abc import abstractmethod
+from typing import Any, Dict, List, Optional
+
+from meross_iot.http_api import MerossHttpClient
+from meross_iot.manager import MerossManager
+from overrides import overrides
import argparse_utils
import config
+import decorator_utils
import logging_utils
+import scott_secrets
import smart_home.device as dev
-from google_assistant import ask_google, GoogleResponse
-from decorator_utils import timeout, memoized
+from decorator_utils import memoized, timeout
+from google_assistant import GoogleResponse, ask_google
logger = logging.getLogger(__name__)
)
-@timeout(
- 5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
-)
+@timeout(5.0, use_signals=False, error_message="Timed out waiting for tplink.py")
def tplink_outlet_command(command: str) -> bool:
result = os.system(command)
signal = result & 0xFF
if signal != 0:
- logger.warning(f'{command} died with signal {signal}')
- logging_utils.hlog("%s died with signal %d" % (command, signal))
+ msg = f'{command} died with signal {signal}'
+ logger.warning(msg)
+ logging_utils.hlog(msg)
return False
else:
exit_value = result >> 8
if exit_value != 0:
- logger.warning(f'{command} failed, exited {exit_value}')
- logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
+ msg = f'{command} failed, exited {exit_value}'
+ logger.warning(msg)
+ logging_utils.hlog(msg)
return False
logger.debug(f'{command} succeeded.')
return True
class BaseOutlet(dev.Device):
def __init__(self, name: str, mac: str, keywords: str = "") -> None:
super().__init__(name.strip(), mac.strip(), keywords)
- self.info = None
@abstractmethod
def turn_on(self) -> bool:
)
return cmd
- def command(self, cmd: str, extra_args: str = None) -> bool:
+ def command(self, cmd: str, extra_args: str = None, **kwargs) -> bool:
cmd = self.get_cmdline() + f"-c {cmd}"
if extra_args is not None:
cmd += f" {extra_args}"
return tplink_outlet_command(cmd)
+ @overrides
def turn_on(self) -> bool:
return self.command('on')
+ @overrides
def turn_off(self) -> bool:
return self.command('off')
+ @overrides
def is_on(self) -> bool:
return self.get_on_duration_seconds() > 0
+ @overrides
def is_off(self) -> bool:
return not self.is_on()
- @timeout(
- 10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
- )
+ @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
def get_info(self) -> Optional[Dict]:
cmd = self.get_cmdline() + "-c info"
out = subprocess.getoutput(cmd)
for child in self.info["children"]:
self.children.append(child["id"])
- # override
+ @overrides
def get_cmdline(self, child: Optional[str] = None) -> str:
cmd = (
f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
cmd += f"-x {child} "
return cmd
- # override
- def command(
- self, cmd: str, child: str = None, extra_args: str = None
- ) -> bool:
+ @overrides
+ def command(self, cmd: str, extra_args: str = None, **kwargs) -> bool:
+ child: Optional[str] = kwargs.get('child', None)
cmd = self.get_cmdline(child) + f"-c {cmd}"
if extra_args is not None:
cmd += f" {extra_args}"
logger.debug(f'About to execute {cmd}')
- return tplink_light_command(cmd)
+ return tplink_outlet_command(cmd)
def get_children(self) -> List[str]:
return self.children
+ @overrides
def turn_on(self, child: str = None) -> bool:
return self.command("on", child)
+ @overrides
def turn_off(self, child: str = None) -> bool:
return self.command("off", child)
def parse_google_response(response: GoogleResponse) -> bool:
return response.success
+ @overrides
def turn_on(self) -> bool:
return GoogleOutlet.parse_google_response(
- ask_google('turn {self.goog_name()} on')
+ ask_google(f'turn {self.goog_name()} on')
)
+ @overrides
def turn_off(self) -> bool:
return GoogleOutlet.parse_google_response(
- ask_google('turn {self.goog_name()} off')
+ ask_google(f'turn {self.goog_name()} off')
)
+ @overrides
def is_on(self) -> bool:
r = ask_google(f'is {self.goog_name()} on?')
if not r.success:
return False
- return 'is on' in r.audio_transcription
+ if r.audio_transcription is not None:
+ return 'is on' in r.audio_transcription
+ raise Exception('Can\'t talk to Google right now!?')
+
+ @overrides
+ def is_off(self) -> bool:
+ return not self.is_on()
+
+
+@decorator_utils.singleton
+class MerossWrapper(object):
+ """Global singleton helper class for MerossOutlets. Note that
+ instantiating this class causes HTTP traffic with an external
+ Meross server. Meross blocks customers who hit their servers too
+ aggressively so MerossOutlet is lazy about creating instances of
+ this class.
+
+ """
+
+ def __init__(self):
+ self.loop = asyncio.get_event_loop()
+ self.email = os.environ.get('MEROSS_EMAIL') or scott_secrets.MEROSS_EMAIL
+ self.password = (
+ os.environ.get('MEROSS_PASSWORD') or scott_secrets.MEROSS_PASSWORD
+ )
+ self.devices = self.loop.run_until_complete(self.find_meross_devices())
+ atexit.register(self.loop.close)
+
+ async def find_meross_devices(self) -> List[Any]:
+ http_api_client = await MerossHttpClient.async_from_user_password(
+ email=self.email, password=self.password
+ )
+
+ # Setup and start the device manager
+ manager = MerossManager(http_client=http_api_client)
+ await manager.async_init()
+
+ # Discover devices
+ await manager.async_device_discovery()
+ devices = manager.find_devices()
+ for device in devices:
+ await device.async_update()
+ return devices
+
+ def get_meross_device_by_name(self, name: str) -> Optional[Any]:
+ name = name.lower()
+ name = name.replace('_', ' ')
+ for device in self.devices:
+ if device.name.lower() == name:
+ return device
+ return None
+
+
+class MerossOutlet(BaseOutlet):
+ def __init__(self, name: str, mac: str, keywords: str = '') -> None:
+ super().__init__(name, mac, keywords)
+ self.meross_wrapper: Optional[MerossWrapper] = None
+ self.device: Optional[Any] = None
+
+ def lazy_initialize_device(self):
+ """If we make too many calls to Meross they will block us; only talk
+ to them when someone actually wants to control a device."""
+ if self.meross_wrapper is None:
+ self.meross_wrapper = MerossWrapper()
+ self.device = self.meross_wrapper.get_meross_device_by_name(self.name)
+ if self.device is None:
+ raise Exception(f'{self.name} is not a known Meross device?!')
+
+ @overrides
+ def turn_on(self) -> bool:
+ self.lazy_initialize_device()
+ assert self.meross_wrapper is not None
+ assert self.device is not None
+ self.meross_wrapper.loop.run_until_complete(self.device.async_turn_on())
+ return True
+
+ @overrides
+ def turn_off(self) -> bool:
+ self.lazy_initialize_device()
+ assert self.meross_wrapper is not None
+ assert self.device is not None
+ self.meross_wrapper.loop.run_until_complete(self.device.async_turn_off())
+ return True
+
+ @overrides
+ def is_on(self) -> bool:
+ self.lazy_initialize_device()
+ assert self.device is not None
+ return self.device.is_on()
+ @overrides
def is_off(self) -> bool:
return not self.is_on()