#!/usr/bin/env python3

"""Utilities for dealing with the smart outlets."""

from abc import abstractmethod
import asyncio
import atexit
import datetime
import json
import logging
import os
import re
import subprocess
import sys
from typing import Any, Dict, List, Optional

from meross_iot.http_api import MerossHttpClient
from meross_iot.manager import MerossManager

import argparse_utils
import config
import decorator_utils
import logging_utils
import scott_secrets
import smart_home.device as dev
from google_assistant import ask_google, GoogleResponse
from decorator_utils import timeout, memoized

logger = logging.getLogger(__name__)

parser = config.add_commandline_args(
    f"Outlet Utils ({__file__})",
    "Args related to smart outlets.",
)
parser.add_argument(
    '--smart_outlets_tplink_location',
    default='/home/scott/bin/tplink.py',
    metavar='FILENAME',
    help='The location of the tplink.py helper',
    type=argparse_utils.valid_filename,
)


@timeout(
    5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
)
def tplink_outlet_command(command: str) -> bool:
    """Run a tplink.py command line and report whether it succeeded.

    Args:
        command: the complete shell command line to execute.

    Returns:
        True if the command exited with status zero; False if it was
        killed by a signal or exited non-zero.  Failures are logged as
        warnings and also passed to logging_utils.hlog.
    """
    result = os.system(command)

    # The decoding below assumes the POSIX wait() status layout that
    # os.system returns on Unix: low byte = terminating signal, next
    # byte = exit code.
    signal = result & 0xFF
    if signal != 0:
        msg = f'{command} died with signal {signal}'
        logger.warning(msg)
        logging_utils.hlog(msg)
        return False

    exit_value = result >> 8
    if exit_value != 0:
        msg = f'{command} failed, exited {exit_value}'
        logger.warning(msg)
        logging_utils.hlog(msg)
        return False

    logger.debug(f'{command} succeeded.')
    return True


class BaseOutlet(dev.Device):
    """Interface shared by every outlet class in this module."""

    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
        """Initialize the underlying device.

        Args:
            name: the outlet's name; surrounding whitespace is stripped.
            mac: the outlet's MAC address; surrounding whitespace is
                stripped.
            keywords: passed through to dev.Device unchanged.
        """
        super().__init__(name.strip(), mac.strip(), keywords)
        self.info = None

    @abstractmethod
    def turn_on(self) -> bool:
        """Turn the outlet on."""
        pass

    @abstractmethod
    def turn_off(self) -> bool:
        """Turn the outlet off."""
        pass

    @abstractmethod
    def is_on(self) -> bool:
        """Report whether the outlet is on."""
        pass

    @abstractmethod
    def is_off(self) -> bool:
        """Report whether the outlet is off."""
        pass


class TPLinkOutlet(BaseOutlet):
    """An outlet controlled by shelling out to the tplink.py helper whose
    location is given by --smart_outlets_tplink_location."""

    def __init__(self, name: str, mac: str, keywords: str = '') -> None:
        super().__init__(name, mac, keywords)
        # Most recent sysinfo dict returned by get_info (None if the last
        # query failed or none has been made yet) and when it was fetched.
        self.info: Optional[Dict] = None
        self.info_ts: Optional[datetime.datetime] = None

    @memoized
    def get_tplink_name(self) -> Optional[str]:
        """Return the "alias" field of the outlet's sysinfo, or None if the
        sysinfo could not be fetched."""
        self.info = self.get_info()
        if self.info is not None:
            return self.info["alias"]
        return None

    def get_cmdline(self) -> str:
        """Return the tplink.py command prefix addressing this outlet.

        The result ends with a trailing space so callers can append
        further arguments directly.
        """
        cmd = (
            f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
            f"--no_logging_console "
        )
        return cmd

    def command(self, cmd: str, extra_args: Optional[str] = None) -> bool:
        """Run one tplink.py command against this outlet.

        Args:
            cmd: the value passed to the helper's -c flag.
            extra_args: optional text appended to the command line.

        Returns:
            The result of tplink_outlet_command for the assembled line.
        """
        full_cmd = self.get_cmdline() + f"-c {cmd}"
        if extra_args is not None:
            full_cmd += f" {extra_args}"
        return tplink_outlet_command(full_cmd)

    def turn_on(self) -> bool:
        """Send the 'on' command; True if the helper succeeded."""
        return self.command('on')

    def turn_off(self) -> bool:
        """Send the 'off' command; True if the helper succeeded."""
        return self.command('off')

    def is_on(self) -> bool:
        """Report the outlet as on when its on-duration is positive."""
        return self.get_on_duration_seconds() > 0

    def is_off(self) -> bool:
        """Logical negation of is_on."""
        return not self.is_on()

    @timeout(
        10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
    )
    def get_info(self) -> Optional[Dict]:
        """Query the outlet with "-c info" and parse the reply.

        Returns:
            The dict found at ["system"]["get_sysinfo"] in the helper's
            JSON output, or None if the output could not be parsed.  On
            success self.info and self.info_ts are refreshed; on failure
            both are reset to None, the exception is logged and the raw
            output is echoed to stderr.
        """
        cmd = self.get_cmdline() + "-c info"
        out = subprocess.getoutput(cmd)

        # Strip the helper's "Sent: ..." line and "Received: " prefix so
        # that what remains can be handed to the JSON parser.
        out = re.sub("Sent:.*\n", "", out)
        out = re.sub("Received: *", "", out)
        try:
            self.info = json.loads(out)["system"]["get_sysinfo"]
            self.info_ts = datetime.datetime.now()
            return self.info
        except Exception as e:
            logger.exception(e)
            print(out, file=sys.stderr)
            self.info = None
            self.info_ts = None
            return None

    def get_on_duration_seconds(self) -> int:
        """Return the sysinfo "on_time" field as an int.

        Returns 0 when the sysinfo cannot be fetched or has no "on_time".
        """
        self.info = self.get_info()
        if self.info is None:
            return 0
        return int(self.info.get("on_time", "0"))


class TPLinkOutletWithChildren(TPLinkOutlet):
    """A TPLink device whose sysinfo lists "children" that can each be
    addressed individually via the helper's -x flag."""

    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
        super().__init__(name, mac, keywords)
        self.children: List[str] = []
        self.info: Optional[Dict] = None
        self.info_ts: Optional[datetime.datetime] = None
        # NOTE: assert statements are skipped when Python runs with -O.
        assert "children" in self.keywords
        self.info = self.get_info()
        if self.info is not None:
            # Tolerate a sysinfo payload without a "children" key, the same
            # way get_on_duration_seconds does, instead of raising KeyError.
            self.children.extend(
                child["id"] for child in self.info.get("children", [])
            )

    # override
    def get_cmdline(self, child: Optional[str] = None) -> str:
        """Return the tplink.py command prefix for this device.

        Args:
            child: optional child id; when given, "-x <child> " is
                appended so the command addresses that child.
        """
        cmd = super().get_cmdline()
        if child is not None:
            cmd += f"-x {child} "
        return cmd

    # override
    def command(
        self,
        cmd: str,
        child: Optional[str] = None,
        extra_args: Optional[str] = None,
    ) -> bool:
        """Run one tplink.py command against the device or one child.

        Args:
            cmd: the value passed to the helper's -c flag.
            child: optional child id to address.
            extra_args: optional text appended to the command line.

        Returns:
            The result of tplink_outlet_command for the assembled line.
        """
        full_cmd = self.get_cmdline(child) + f"-c {cmd}"
        if extra_args is not None:
            full_cmd += f" {extra_args}"
        logger.debug(f'About to execute {full_cmd}')
        return tplink_outlet_command(full_cmd)

    def get_children(self) -> List[str]:
        """Return the child ids collected when the object was created."""
        return self.children

    def turn_on(self, child: Optional[str] = None) -> bool:
        """Send 'on' to the device, or to one child if an id is given."""
        return self.command("on", child)

    def turn_off(self, child: Optional[str] = None) -> bool:
        """Send 'off' to the device, or to one child if an id is given."""
        return self.command("off", child)

    def get_on_duration_seconds(self, child: Optional[str] = None) -> int:
        """Return "on_time" for the device or for one of its children.

        Args:
            child: optional child id; when None the device-level value is
                returned.

        Returns:
            The "on_time" value as an int, or 0 when the sysinfo cannot be
            fetched, the field is absent, or no child has the given id.
        """
        self.info = self.get_info()
        if self.info is None:
            return 0
        if child is None:
            return int(self.info.get("on_time", "0"))
        for chi in self.info.get("children", []):
            if chi["id"] == child:
                return int(chi.get("on_time", "0"))
        return 0


class GoogleOutlet(BaseOutlet):
    """An outlet controlled by sending phrases through ask_google."""

    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
        super().__init__(name.strip(), mac.strip(), keywords)
        self.info = None

    def goog_name(self) -> str:
        """Return the device name with underscores replaced by spaces."""
        name = self.get_name()
        return name.replace('_', ' ')

    @staticmethod
    def parse_google_response(response: GoogleResponse) -> bool:
        """Reduce a GoogleResponse to its success flag."""
        return response.success

    def turn_on(self) -> bool:
        """Ask Google to turn this outlet on; True if the request succeeded."""
        # This must be an f-string: without the prefix the literal text
        # "{self.goog_name()}" is sent instead of the device's name.
        return GoogleOutlet.parse_google_response(
            ask_google(f'turn {self.goog_name()} on')
        )

    def turn_off(self) -> bool:
        """Ask Google to turn this outlet off; True if the request succeeded."""
        # f-string required here too; see turn_on.
        return GoogleOutlet.parse_google_response(
            ask_google(f'turn {self.goog_name()} off')
        )

    def is_on(self) -> bool:
        """Ask Google whether the outlet is on.

        Returns False if the request failed; otherwise True exactly when
        the response's audio transcription contains "is on".
        """
        r = ask_google(f'is {self.goog_name()} on?')
        if not r.success:
            return False
        return 'is on' in r.audio_transcription

    def is_off(self) -> bool:
        """Logical negation of is_on."""
        return not self.is_on()


@decorator_utils.singleton
class MerossWrapper(object):
    """Global singleton helper class for MerossOutlets.  Note that
    instantiating this class causes HTTP traffic with an external
    Meross server.  Meross blocks customers who hit their servers too
    aggressively so MerossOutlet is lazy about creating instances of
    this class.

    """

    def __init__(self):
        # NOTE(review): calling asyncio.get_event_loop() with no running
        # loop is deprecated in recent Python releases; confirm before
        # upgrading the interpreter.
        self.loop = asyncio.get_event_loop()
        # Credentials come from the environment when set, otherwise from
        # scott_secrets.
        self.email = os.environ.get('MEROSS_EMAIL') or scott_secrets.MEROSS_EMAIL
        self.password = os.environ.get('MEROSS_PASSWORD') or scott_secrets.MEROSS_PASSWORD
        self.devices = self.loop.run_until_complete(self.find_meross_devices())
        atexit.register(self.loop.close)

    async def find_meross_devices(self) -> List[Any]:
        """Log in to Meross, discover devices and return them after
        calling async_update on each one."""
        http_api_client = await MerossHttpClient.async_from_user_password(
            email=self.email, password=self.password
        )

        # Setup and start the device manager
        manager = MerossManager(http_client=http_api_client)
        await manager.async_init()

        # Discover devices
        await manager.async_device_discovery()
        devices = manager.find_devices()
        for device in devices:
            await device.async_update()
        return devices

    def get_meross_device_by_name(self, name: str) -> Optional[Any]:
        """Return the discovered device called name, or None.

        The lookup lower-cases name and replaces underscores with spaces
        before comparing it with each device's lower-cased name.
        """
        name = name.lower()
        name = name.replace('_', ' ')
        for device in self.devices:
            if device.name.lower() == name:
                return device
        return None


class MerossOutlet(BaseOutlet):
    """An outlet controlled through the Meross cloud via MerossWrapper."""

    def __init__(self, name: str, mac: str, keywords: str = '') -> None:
        super().__init__(name, mac, keywords)
        # Both are filled in on first use by lazy_initialize_device.
        self.meross_wrapper: Optional[Any] = None
        self.device: Optional[Any] = None

    def lazy_initialize_device(self):
        """If we make too many calls to Meross they will block us; only talk
        to them when someone actually wants to control a device.

        Raises:
            Exception: if no Meross device matches this outlet's name.
        """
        if self.meross_wrapper is None:
            self.meross_wrapper = MerossWrapper()
        # Check the device on every call, not just the first one: if an
        # earlier lookup failed, callers must see the same exception again
        # rather than an AttributeError on a None device.
        if self.device is None:
            self.device = self.meross_wrapper.get_meross_device_by_name(self.name)
            if self.device is None:
                raise Exception(f'{self.name} is not a known Meross device?!')

    def turn_on(self) -> bool:
        """Turn the device on, blocking until the request completes.

        Always returns True; failures surface as exceptions.
        """
        self.lazy_initialize_device()
        self.meross_wrapper.loop.run_until_complete(
            self.device.async_turn_on()
        )
        return True

    def turn_off(self) -> bool:
        """Turn the device off, blocking until the request completes.

        Always returns True; failures surface as exceptions.
        """
        self.lazy_initialize_device()
        self.meross_wrapper.loop.run_until_complete(
            self.device.async_turn_off()
        )
        return True

    def is_on(self) -> bool:
        """Return the device object's is_on() value."""
        self.lazy_initialize_device()
        return self.device.is_on()

    def is_off(self) -> bool:
        """Logical negation of is_on."""
        return not self.is_on()