#!/usr/bin/env python3 """Utilities for dealing with the smart outlets.""" from abc import abstractmethod import datetime import json import logging import os import re import subprocess import sys from typing import Any, Dict, List, Optional, Set import argparse_utils import config import logging_utils import smart_home.device as dev from google_assistant import ask_google, GoogleResponse from decorator_utils import timeout, memoized logger = logging.getLogger(__name__) parser = config.add_commandline_args( f"Outlet Utils ({__file__})", "Args related to smart outlets.", ) parser.add_argument( '--smart_outlets_tplink_location', default='/home/scott/bin/tplink.py', metavar='FILENAME', help='The location of the tplink.py helper', type=argparse_utils.valid_filename, ) @timeout( 5.0, use_signals=False, error_message="Timed out waiting for tplink.py" ) def tplink_outlet_command(command: str) -> bool: result = os.system(command) signal = result & 0xFF if signal != 0: logger.warning(f'{command} died with signal {signal}') logging_utils.hlog("%s died with signal %d" % (command, signal)) return False else: exit_value = result >> 8 if exit_value != 0: logger.warning(f'{command} failed, exited {exit_value}') logging_utils.hlog("%s failed, exit %d" % (command, exit_value)) return False logger.debug(f'{command} succeeded.') return True class BaseOutlet(dev.Device): def __init__(self, name: str, mac: str, keywords: str = "") -> None: super().__init__(name.strip(), mac.strip(), keywords) self.info = None @abstractmethod def turn_on(self) -> bool: pass @abstractmethod def turn_off(self) -> bool: pass @abstractmethod def is_on(self) -> bool: pass @abstractmethod def is_off(self) -> bool: pass class TPLinkOutlet(BaseOutlet): def __init__(self, name: str, mac: str, keywords: str = '') -> None: super().__init__(name, mac, keywords) self.info: Optional[Dict] = None self.info_ts: Optional[datetime.datetime] = None @memoized def get_tplink_name(self) -> Optional[str]: self.info = self.get_info() if self.info is not None: return self.info["alias"] return None def get_cmdline(self) -> str: cmd = ( f"{config.config['smart_outlets_tplink_location']} -m {self.mac} " f"--no_logging_console " ) return cmd def command(self, cmd: str, extra_args: str = None) -> bool: cmd = self.get_cmdline() + f"-c {cmd}" if extra_args is not None: cmd += f" {extra_args}" return tplink_outlet_command(cmd) def turn_on(self) -> bool: return self.command('on') def turn_off(self) -> bool: return self.command('off') def is_on(self) -> bool: return self.get_on_duration_seconds() > 0 def is_off(self) -> bool: return not self.is_on() @timeout( 10.0, use_signals=False, error_message="Timed out waiting for tplink.py" ) def get_info(self) -> Optional[Dict]: cmd = self.get_cmdline() + "-c info" out = subprocess.getoutput(cmd) out = re.sub("Sent:.*\n", "", out) out = re.sub("Received: *", "", out) try: self.info = json.loads(out)["system"]["get_sysinfo"] self.info_ts = datetime.datetime.now() return self.info except Exception as e: logger.exception(e) print(out, file=sys.stderr) self.info = None self.info_ts = None return None def get_on_duration_seconds(self) -> int: self.info = self.get_info() if self.info is None: return 0 return int(self.info.get("on_time", "0")) class TPLinkOutletWithChildren(TPLinkOutlet): def __init__(self, name: str, mac: str, keywords: str = "") -> None: super().__init__(name, mac, keywords) self.children: List[str] = [] self.info: Optional[Dict] = None self.info_ts: Optional[datetime.datetime] = None assert "children" in self.keywords self.info = self.get_info() if self.info is not None: for child in self.info["children"]: self.children.append(child["id"]) # override def get_cmdline(self, child: Optional[str] = None) -> str: cmd = ( f"{config.config['smart_outlets_tplink_location']} -m {self.mac} " f"--no_logging_console " ) if child is not None: cmd += f"-x {child} " return cmd # override def command( self, cmd: str, child: str = None, extra_args: str = None ) -> bool: cmd = self.get_cmdline(child) + f"-c {cmd}" if extra_args is not None: cmd += f" {extra_args}" logger.debug(f'About to execute {cmd}') return tplink_outlet_command(cmd) def get_children(self) -> List[str]: return self.children def turn_on(self, child: str = None) -> bool: return self.command("on", child) def turn_off(self, child: str = None) -> bool: return self.command("off", child) def get_on_duration_seconds(self, child: str = None) -> int: self.info = self.get_info() if child is None: if self.info is None: return 0 return int(self.info.get("on_time", "0")) else: if self.info is None: return 0 for chi in self.info.get("children", {}): if chi["id"] == child: return int(chi.get("on_time", "0")) return 0 class GoogleOutlet(BaseOutlet): def __init__(self, name: str, mac: str, keywords: str = "") -> None: super().__init__(name.strip(), mac.strip(), keywords) self.info = None def goog_name(self) -> str: name = self.get_name() return name.replace('_', ' ') @staticmethod def parse_google_response(response: GoogleResponse) -> bool: return response.success def turn_on(self) -> bool: return GoogleOutlet.parse_google_response( ask_google('turn {self.goog_name()} on') ) def turn_off(self) -> bool: return GoogleOutlet.parse_google_response( ask_google('turn {self.goog_name()} off') ) def is_on(self) -> bool: r = ask_google(f'is {self.goog_name()} on?') if not r.success: return False return 'is on' in r.audio_transcription def is_off(self) -> bool: return not self.is_on()