+#!/usr/bin/env python3
+
+"""Utilities for dealing with the smart outlets."""
+
+from abc import abstractmethod
+import datetime
+import json
+import logging
+import os
+import re
+import subprocess
+import sys
+from typing import Any, Dict, List, Optional, Set
+
+import argparse_utils
+import config
+import logging_utils
+import smart_home.device as dev
+from google_assistant import ask_google, GoogleResponse
+from decorator_utils import timeout, memoized
+
+logger = logging.getLogger(__name__)
+
+parser = config.add_commandline_args(
+ f"Outlet Utils ({__file__})",
+ "Args related to smart outlets.",
+)
+parser.add_argument(
+ '--smart_outlets_tplink_location',
+ default='/home/scott/bin/tplink.py',
+ metavar='FILENAME',
+ help='The location of the tplink.py helper',
+ type=argparse_utils.valid_filename,
+)
+
+
+@timeout(
+ 5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
+)
+def tplink_outlet_command(command: str) -> bool:
+ result = os.system(command)
+ signal = result & 0xFF
+ if signal != 0:
+ logger.warning(f'{command} died with signal {signal}')
+ logging_utils.hlog("%s died with signal %d" % (command, signal))
+ return False
+ else:
+ exit_value = result >> 8
+ if exit_value != 0:
+ logger.warning(f'{command} failed, exited {exit_value}')
+ logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
+ return False
+ logger.debug(f'{command} succeeded.')
+ return True
+
+
+class BaseOutlet(dev.Device):
+ def __init__(self, name: str, mac: str, keywords: str = "") -> None:
+ super().__init__(name.strip(), mac.strip(), keywords)
+ self.info = None
+
+ @abstractmethod
+ def turn_on(self) -> bool:
+ pass
+
+ @abstractmethod
+ def turn_off(self) -> bool:
+ pass
+
+ @abstractmethod
+ def is_on(self) -> bool:
+ pass
+
+ @abstractmethod
+ def is_off(self) -> bool:
+ pass
+
+
+class TPLinkOutlet(BaseOutlet):
+ def __init__(self, name: str, mac: str, keywords: str = '') -> None:
+ super().__init__(name, mac, keywords)
+ self.info: Optional[Dict] = None
+ self.info_ts: Optional[datetime.datetime] = None
+
+ @memoized
+ def get_tplink_name(self) -> Optional[str]:
+ self.info = self.get_info()
+ if self.info is not None:
+ return self.info["alias"]
+ return None
+
+ def get_cmdline(self) -> str:
+ cmd = (
+ f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
+ f"--no_logging_console "
+ )
+ return cmd
+
+ def command(self, cmd: str, extra_args: str = None) -> bool:
+ cmd = self.get_cmdline() + f"-c {cmd}"
+ if extra_args is not None:
+ cmd += f" {extra_args}"
+ return tplink_outlet_command(cmd)
+
+ def turn_on(self) -> bool:
+ return self.command('on')
+
+ def turn_off(self) -> bool:
+ return self.command('off')
+
+ def is_on(self) -> bool:
+ return self.get_on_duration_seconds() > 0
+
+ def is_off(self) -> bool:
+ return not self.is_on()
+
+ @timeout(
+ 10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
+ )
+ def get_info(self) -> Optional[Dict]:
+ cmd = self.get_cmdline() + "-c info"
+ out = subprocess.getoutput(cmd)
+ out = re.sub("Sent:.*\n", "", out)
+ out = re.sub("Received: *", "", out)
+ try:
+ self.info = json.loads(out)["system"]["get_sysinfo"]
+ self.info_ts = datetime.datetime.now()
+ return self.info
+ except Exception as e:
+ logger.exception(e)
+ print(out, file=sys.stderr)
+ self.info = None
+ self.info_ts = None
+ return None
+
+ def get_on_duration_seconds(self) -> int:
+ self.info = self.get_info()
+ if self.info is None:
+ return 0
+ return int(self.info.get("on_time", "0"))
+
+
+class TPLinkOutletWithChildren(TPLinkOutlet):
+ def __init__(self, name: str, mac: str, keywords: str = "") -> None:
+ super().__init__(name, mac, keywords)
+ self.children: List[str] = []
+ self.info: Optional[Dict] = None
+ self.info_ts: Optional[datetime.datetime] = None
+ assert "children" in self.keywords
+ self.info = self.get_info()
+ if self.info is not None:
+ for child in self.info["children"]:
+ self.children.append(child["id"])
+
+ # override
+ def get_cmdline(self, child: Optional[str] = None) -> str:
+ cmd = (
+ f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
+ f"--no_logging_console "
+ )
+ if child is not None:
+ cmd += f"-x {child} "
+ return cmd
+
+ # override
+ def command(
+ self, cmd: str, child: str = None, extra_args: str = None
+ ) -> bool:
+ cmd = self.get_cmdline(child) + f"-c {cmd}"
+ if extra_args is not None:
+ cmd += f" {extra_args}"
+ logger.debug(f'About to execute {cmd}')
+ return tplink_light_command(cmd)
+
+ def get_children(self) -> List[str]:
+ return self.children
+
+ def turn_on(self, child: str = None) -> bool:
+ return self.command("on", child)
+
+ def turn_off(self, child: str = None) -> bool:
+ return self.command("off", child)
+
+ def get_on_duration_seconds(self, child: str = None) -> int:
+ self.info = self.get_info()
+ if child is None:
+ if self.info is None:
+ return 0
+ return int(self.info.get("on_time", "0"))
+ else:
+ if self.info is None:
+ return 0
+ for chi in self.info.get("children", {}):
+ if chi["id"] == child:
+ return int(chi.get("on_time", "0"))
+ return 0
+
+
+class GoogleOutlet(BaseOutlet):
+ def __init__(self, name: str, mac: str, keywords: str = "") -> None:
+ super().__init__(name.strip(), mac.strip(), keywords)
+ self.info = None
+
+ def goog_name(self) -> str:
+ name = self.get_name()
+ return name.replace('_', ' ')
+
+ @staticmethod
+ def parse_google_response(response: GoogleResponse) -> bool:
+ return response.success
+
+ def turn_on(self) -> bool:
+ return GoogleOutlet.parse_google_response(
+ ask_google('turn {self.goog_name()} on')
+ )
+
+ def turn_off(self) -> bool:
+ return GoogleOutlet.parse_google_response(
+ ask_google('turn {self.goog_name()} off')
+ )
+
+ def is_on(self) -> bool:
+ r = ask_google(f'is {self.goog_name()} on?')
+ if not r.success:
+ return False
+ return 'is on' in r.audio_transcription
+
+ def is_off(self) -> bool:
+ return not self.is_on()