Smart outlets
[python_utils.git] / smart_home / outlets.py
diff --git a/smart_home/outlets.py b/smart_home/outlets.py
new file mode 100644 (file)
index 0000000..527c52c
--- /dev/null
@@ -0,0 +1,229 @@
+#!/usr/bin/env python3
+
+"""Utilities for dealing with the smart outlets."""
+
+from abc import abstractmethod
+import datetime
+import json
+import logging
+import os
+import re
+import subprocess
+import sys
+from typing import Any, Dict, List, Optional, Set
+
+import argparse_utils
+import config
+import logging_utils
+import smart_home.device as dev
+from google_assistant import ask_google, GoogleResponse
+from decorator_utils import timeout, memoized
+
+logger = logging.getLogger(__name__)
+
+parser = config.add_commandline_args(
+    f"Outlet Utils ({__file__})",
+    "Args related to smart outlets.",
+)
+parser.add_argument(
+    '--smart_outlets_tplink_location',
+    default='/home/scott/bin/tplink.py',
+    metavar='FILENAME',
+    help='The location of the tplink.py helper',
+    type=argparse_utils.valid_filename,
+)
+
+
+@timeout(
+    5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
+)
+def tplink_outlet_command(command: str) -> bool:
+    result = os.system(command)
+    signal = result & 0xFF
+    if signal != 0:
+        logger.warning(f'{command} died with signal {signal}')
+        logging_utils.hlog("%s died with signal %d" % (command, signal))
+        return False
+    else:
+        exit_value = result >> 8
+        if exit_value != 0:
+            logger.warning(f'{command} failed, exited {exit_value}')
+            logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
+            return False
+    logger.debug(f'{command} succeeded.')
+    return True
+
+
+class BaseOutlet(dev.Device):
+    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
+        super().__init__(name.strip(), mac.strip(), keywords)
+        self.info = None
+
+    @abstractmethod
+    def turn_on(self) -> bool:
+        pass
+
+    @abstractmethod
+    def turn_off(self) -> bool:
+        pass
+
+    @abstractmethod
+    def is_on(self) -> bool:
+        pass
+
+    @abstractmethod
+    def is_off(self) -> bool:
+        pass
+
+
+class TPLinkOutlet(BaseOutlet):
+    def __init__(self, name: str, mac: str, keywords: str = '') -> None:
+        super().__init__(name, mac, keywords)
+        self.info: Optional[Dict] = None
+        self.info_ts: Optional[datetime.datetime] = None
+
+    @memoized
+    def get_tplink_name(self) -> Optional[str]:
+        self.info = self.get_info()
+        if self.info is not None:
+            return self.info["alias"]
+        return None
+
+    def get_cmdline(self) -> str:
+        cmd = (
+            f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
+            f"--no_logging_console "
+        )
+        return cmd
+
+    def command(self, cmd: str, extra_args: str = None) -> bool:
+        cmd = self.get_cmdline() + f"-c {cmd}"
+        if extra_args is not None:
+            cmd += f" {extra_args}"
+        return tplink_outlet_command(cmd)
+
+    def turn_on(self) -> bool:
+        return self.command('on')
+
+    def turn_off(self) -> bool:
+        return self.command('off')
+
+    def is_on(self) -> bool:
+        return self.get_on_duration_seconds() > 0
+
+    def is_off(self) -> bool:
+        return not self.is_on()
+
+    @timeout(
+        10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
+    )
+    def get_info(self) -> Optional[Dict]:
+        cmd = self.get_cmdline() + "-c info"
+        out = subprocess.getoutput(cmd)
+        out = re.sub("Sent:.*\n", "", out)
+        out = re.sub("Received: *", "", out)
+        try:
+            self.info = json.loads(out)["system"]["get_sysinfo"]
+            self.info_ts = datetime.datetime.now()
+            return self.info
+        except Exception as e:
+            logger.exception(e)
+            print(out, file=sys.stderr)
+            self.info = None
+            self.info_ts = None
+            return None
+
+    def get_on_duration_seconds(self) -> int:
+        self.info = self.get_info()
+        if self.info is None:
+            return 0
+        return int(self.info.get("on_time", "0"))
+
+
+class TPLinkOutletWithChildren(TPLinkOutlet):
+    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
+        super().__init__(name, mac, keywords)
+        self.children: List[str] = []
+        self.info: Optional[Dict] = None
+        self.info_ts: Optional[datetime.datetime] = None
+        assert "children" in self.keywords
+        self.info = self.get_info()
+        if self.info is not None:
+            for child in self.info["children"]:
+                self.children.append(child["id"])
+
+    # override
+    def get_cmdline(self, child: Optional[str] = None) -> str:
+        cmd = (
+            f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
+            f"--no_logging_console "
+        )
+        if child is not None:
+            cmd += f"-x {child} "
+        return cmd
+
+    # override
+    def command(
+        self, cmd: str, child: str = None, extra_args: str = None
+    ) -> bool:
+        cmd = self.get_cmdline(child) + f"-c {cmd}"
+        if extra_args is not None:
+            cmd += f" {extra_args}"
+        logger.debug(f'About to execute {cmd}')
+        return tplink_light_command(cmd)
+
+    def get_children(self) -> List[str]:
+        return self.children
+
+    def turn_on(self, child: str = None) -> bool:
+        return self.command("on", child)
+
+    def turn_off(self, child: str = None) -> bool:
+        return self.command("off", child)
+
+    def get_on_duration_seconds(self, child: str = None) -> int:
+        self.info = self.get_info()
+        if child is None:
+            if self.info is None:
+                return 0
+            return int(self.info.get("on_time", "0"))
+        else:
+            if self.info is None:
+                return 0
+            for chi in self.info.get("children", {}):
+                if chi["id"] == child:
+                    return int(chi.get("on_time", "0"))
+        return 0
+
+
+class GoogleOutlet(BaseOutlet):
+    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
+        super().__init__(name.strip(), mac.strip(), keywords)
+        self.info = None
+
+    def goog_name(self) -> str:
+        name = self.get_name()
+        return name.replace('_', ' ')
+
+    @staticmethod
+    def parse_google_response(response: GoogleResponse) -> bool:
+        return response.success
+
+    def turn_on(self) -> bool:
+        return GoogleOutlet.parse_google_response(
+            ask_google('turn {self.goog_name()} on')
+        )
+
+    def turn_off(self) -> bool:
+        return GoogleOutlet.parse_google_response(
+            ask_google('turn {self.goog_name()} off')
+        )
+
+    def is_on(self) -> bool:
+        r = ask_google(f'is {self.goog_name()} on?')
+        if not r.success:
+            return False
+        return 'is on' in r.audio_transcription
+
+    def is_off(self) -> bool:
+        return not self.is_on()