X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=smart_home%2Foutlets.py;h=fcc3c4eb8f7fa74d428d31c7ebe4814c5cb11cbd;hb=532df2c5b57c7517dfb3dddd8c1358fbadf8baf3;hp=81e10a71fa0cd4960c2e5bf1a03789c0663375e3;hpb=8bc64f43dac0b56e2ef734e183490e840d7382d6;p=python_utils.git diff --git a/smart_home/outlets.py b/smart_home/outlets.py index 81e10a7..fcc3c4e 100644 --- a/smart_home/outlets.py +++ b/smart_home/outlets.py @@ -1,23 +1,29 @@ #!/usr/bin/env python3 +# © Copyright 2021-2022, Scott Gasch + """Utilities for dealing with the smart outlets.""" -from abc import abstractmethod +import asyncio +import atexit import datetime -import json import logging import os -import re -import subprocess -import sys -from typing import Any, Dict, List, Optional, Set +from abc import abstractmethod +from typing import Any, Dict, List, Optional + +from meross_iot.http_api import MerossHttpClient +from meross_iot.manager import MerossManager +from overrides import overrides import argparse_utils import config -import logging_utils +import decorator_utils +import scott_secrets import smart_home.device as dev -from google_assistant import ask_google, GoogleResponse -from decorator_utils import timeout, memoized +import smart_home.tplink_utils as tplink +from decorator_utils import memoized +from google_assistant import GoogleResponse, ask_google logger = logging.getLogger(__name__) @@ -34,30 +40,11 @@ parser.add_argument( ) -@timeout( - 5.0, use_signals=False, error_message="Timed out waiting for tplink.py" -) -def tplink_outlet_command(command: str) -> bool: - result = os.system(command) - signal = result & 0xFF - if signal != 0: - logger.warning(f'{command} died with signal {signal}') - logging_utils.hlog("%s died with signal %d" % (command, signal)) - return False - else: - exit_value = result >> 8 - if exit_value != 0: - logger.warning(f'{command} failed, exited {exit_value}') - logging_utils.hlog("%s failed, exit %d" % (command, exit_value)) - return False - logger.debug(f'{command} succeeded.') - return True - - class BaseOutlet(dev.Device): + """An abstract base class for smart outlets.""" + def __init__(self, name: str, mac: str, keywords: str = "") -> None: super().__init__(name.strip(), mac.strip(), keywords) - self.info = None @abstractmethod def turn_on(self) -> bool: @@ -77,6 +64,8 @@ class BaseOutlet(dev.Device): class TPLinkOutlet(BaseOutlet): + """A TPLink smart outlet.""" + def __init__(self, name: str, mac: str, keywords: str = '') -> None: super().__init__(name, mac, keywords) self.info: Optional[Dict] = None @@ -96,42 +85,38 @@ class TPLinkOutlet(BaseOutlet): ) return cmd - def command(self, cmd: str, extra_args: str = None) -> bool: + def command(self, cmd: str, extra_args: str = None, **kwargs) -> bool: cmd = self.get_cmdline() + f"-c {cmd}" if extra_args is not None: cmd += f" {extra_args}" - return tplink_outlet_command(cmd) + return tplink.tplink_command_wrapper(cmd) + @overrides def turn_on(self) -> bool: return self.command('on') + @overrides def turn_off(self) -> bool: return self.command('off') + @overrides def is_on(self) -> bool: return self.get_on_duration_seconds() > 0 + @overrides def is_off(self) -> bool: return not self.is_on() - @timeout( - 10.0, use_signals=False, error_message="Timed out waiting for tplink.py" - ) def get_info(self) -> Optional[Dict]: - cmd = self.get_cmdline() + "-c info" - out = subprocess.getoutput(cmd) - out = re.sub("Sent:.*\n", "", out) - out = re.sub("Received: *", "", out) - try: - self.info = json.loads(out)["system"]["get_sysinfo"] - self.info_ts = datetime.datetime.now() + ip = self.get_ip() + if ip is not None: + self.info = tplink.tplink_get_info(ip) + if self.info is not None: + self.info_ts = datetime.datetime.now() + else: + self.info_ts = None return self.info - except Exception as e: - logger.exception(e) - print(out, file=sys.stderr) - self.info = None - self.info_ts = None - return None + return None def get_on_duration_seconds(self) -> int: self.info = self.get_info() @@ -141,53 +126,58 @@ class TPLinkOutlet(BaseOutlet): class TPLinkOutletWithChildren(TPLinkOutlet): + """A TPLink outlet where the top and bottom plus are individually + controllable.""" + def __init__(self, name: str, mac: str, keywords: str = "") -> None: super().__init__(name, mac, keywords) self.children: List[str] = [] self.info: Optional[Dict] = None self.info_ts: Optional[datetime.datetime] = None + assert self.keywords is not None assert "children" in self.keywords self.info = self.get_info() if self.info is not None: for child in self.info["children"]: self.children.append(child["id"]) - # override - def get_cmdline(self, child: Optional[str] = None) -> str: - cmd = ( - f"{config.config['smart_outlets_tplink_location']} -m {self.mac} " - f"--no_logging_console " - ) + def get_cmdline_with_child(self, child: Optional[str] = None) -> str: + cmd = super().get_cmdline() if child is not None: cmd += f"-x {child} " return cmd - # override - def command( - self, cmd: str, child: str = None, extra_args: str = None - ) -> bool: - cmd = self.get_cmdline(child) + f"-c {cmd}" + @overrides + def command(self, cmd: str, extra_args: str = None, **kwargs) -> bool: + child: Optional[str] = kwargs.get('child', None) + cmd = self.get_cmdline_with_child(child) + f"-c {cmd}" if extra_args is not None: cmd += f" {extra_args}" - logger.debug(f'About to execute {cmd}') - return tplink_outlet_command(cmd) + logger.debug('About to execute: %s', cmd) + return tplink.tplink_command_wrapper(cmd) def get_children(self) -> List[str]: return self.children - def turn_on(self, child: str = None) -> bool: + @overrides + def turn_on(self) -> bool: + return self.command("on", None) + + @overrides + def turn_off(self) -> bool: + return self.command("off", None) + + def turn_on_child(self, child: str = None) -> bool: return self.command("on", child) - def turn_off(self, child: str = None) -> bool: + def turn_off_child(self, child: str = None) -> bool: return self.command("off", child) - def get_on_duration_seconds(self, child: str = None) -> int: - self.info = self.get_info() + def get_child_on_duration_seconds(self, child: str = None) -> int: if child is None: - if self.info is None: - return 0 - return int(self.info.get("on_time", "0")) + return super().get_on_duration_seconds() else: + self.info = self.get_info() if self.info is None: return 0 for chi in self.info.get("children", {}): @@ -197,6 +187,8 @@ class TPLinkOutletWithChildren(TPLinkOutlet): class GoogleOutlet(BaseOutlet): + """A smart outlet controlled via Google Assistant.""" + def __init__(self, name: str, mac: str, keywords: str = "") -> None: super().__init__(name.strip(), mac.strip(), keywords) self.info = None @@ -209,21 +201,109 @@ class GoogleOutlet(BaseOutlet): def parse_google_response(response: GoogleResponse) -> bool: return response.success + @overrides def turn_on(self) -> bool: - return GoogleOutlet.parse_google_response( - ask_google('turn {self.goog_name()} on') - ) + return GoogleOutlet.parse_google_response(ask_google(f'turn {self.goog_name()} on')) + @overrides def turn_off(self) -> bool: - return GoogleOutlet.parse_google_response( - ask_google('turn {self.goog_name()} off') - ) + return GoogleOutlet.parse_google_response(ask_google(f'turn {self.goog_name()} off')) + @overrides def is_on(self) -> bool: r = ask_google(f'is {self.goog_name()} on?') if not r.success: return False - return 'is on' in r.audio_transcription + if r.audio_transcription is not None: + return 'is on' in r.audio_transcription + raise Exception('Can\'t talk to Google right now!?') + + @overrides + def is_off(self) -> bool: + return not self.is_on() + + +@decorator_utils.singleton +class MerossWrapper(object): + """Global singleton helper class for MerossOutlets. Note that + instantiating this class causes HTTP traffic with an external + Meross server. Meross blocks customers who hit their servers too + aggressively so MerossOutlet is lazy about creating instances of + this class. + + """ + + def __init__(self): + self.loop = asyncio.get_event_loop() + self.email = os.environ.get('MEROSS_EMAIL') or scott_secrets.MEROSS_EMAIL + self.password = os.environ.get('MEROSS_PASSWORD') or scott_secrets.MEROSS_PASSWORD + self.devices = self.loop.run_until_complete(self.find_meross_devices()) + atexit.register(self.loop.close) + + async def find_meross_devices(self) -> List[Any]: + http_api_client = await MerossHttpClient.async_from_user_password( + email=self.email, password=self.password + ) + + # Setup and start the device manager + manager = MerossManager(http_client=http_api_client) + await manager.async_init() + + # Discover devices + await manager.async_device_discovery() + devices = manager.find_devices() + for device in devices: + await device.async_update() + return devices + + def get_meross_device_by_name(self, name: str) -> Optional[Any]: + name = name.lower() + name = name.replace('_', ' ') + for device in self.devices: + if device.name.lower() == name: + return device + return None + + +class MerossOutlet(BaseOutlet): + """A Meross smart outlet class.""" + + def __init__(self, name: str, mac: str, keywords: str = '') -> None: + super().__init__(name, mac, keywords) + self.meross_wrapper: Optional[MerossWrapper] = None + self.device: Optional[Any] = None + + def lazy_initialize_device(self): + """If we make too many calls to Meross they will block us; only talk + to them when someone actually wants to control a device.""" + if self.meross_wrapper is None: + self.meross_wrapper = MerossWrapper() + self.device = self.meross_wrapper.get_meross_device_by_name(self.name) + if self.device is None: + raise Exception(f'{self.name} is not a known Meross device?!') + + @overrides + def turn_on(self) -> bool: + self.lazy_initialize_device() + assert self.meross_wrapper is not None + assert self.device is not None + self.meross_wrapper.loop.run_until_complete(self.device.async_turn_on()) + return True + + @overrides + def turn_off(self) -> bool: + self.lazy_initialize_device() + assert self.meross_wrapper is not None + assert self.device is not None + self.meross_wrapper.loop.run_until_complete(self.device.async_turn_off()) + return True + + @overrides + def is_on(self) -> bool: + self.lazy_initialize_device() + assert self.device is not None + return self.device.is_on() + @overrides def is_off(self) -> bool: return not self.is_on()