Since this thing is on the innerwebs I suppose it should have a
[python_utils.git] / smart_home / outlets.py
index 8fd09487a48b5c49478bb5d84b91d481dc36adfc..fcc3c4eb8f7fa74d428d31c7ebe4814c5cb11cbd 100644 (file)
@@ -1,30 +1,29 @@
 #!/usr/bin/env python3
 
+# © Copyright 2021-2022, Scott Gasch
+
 """Utilities for dealing with the smart outlets."""
 
-from abc import abstractmethod
 import asyncio
 import atexit
 import datetime
-import json
 import logging
 import os
-import re
-import subprocess
-import sys
+from abc import abstractmethod
 from typing import Any, Dict, List, Optional
 
 from meross_iot.http_api import MerossHttpClient
 from meross_iot.manager import MerossManager
+from overrides import overrides
 
 import argparse_utils
 import config
 import decorator_utils
-import logging_utils
 import scott_secrets
 import smart_home.device as dev
-from google_assistant import ask_google, GoogleResponse
-from decorator_utils import timeout, memoized
+import smart_home.tplink_utils as tplink
+from decorator_utils import memoized
+from google_assistant import GoogleResponse, ask_google
 
 logger = logging.getLogger(__name__)
 
@@ -41,30 +40,11 @@ parser.add_argument(
 )
 
 
-@timeout(
-    5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
-)
-def tplink_outlet_command(command: str) -> bool:
-    result = os.system(command)
-    signal = result & 0xFF
-    if signal != 0:
-        logger.warning(f'{command} died with signal {signal}')
-        logging_utils.hlog("%s died with signal %d" % (command, signal))
-        return False
-    else:
-        exit_value = result >> 8
-        if exit_value != 0:
-            logger.warning(f'{command} failed, exited {exit_value}')
-            logging_utils.hlog("%s failed, exit %d" % (command, exit_value))
-            return False
-    logger.debug(f'{command} succeeded.')
-    return True
-
-
 class BaseOutlet(dev.Device):
+    """An abstract base class for smart outlets."""
+
     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
         super().__init__(name.strip(), mac.strip(), keywords)
-        self.info = None
 
     @abstractmethod
     def turn_on(self) -> bool:
@@ -84,6 +64,8 @@ class BaseOutlet(dev.Device):
 
 
 class TPLinkOutlet(BaseOutlet):
+    """A TPLink smart outlet."""
+
     def __init__(self, name: str, mac: str, keywords: str = '') -> None:
         super().__init__(name, mac, keywords)
         self.info: Optional[Dict] = None
@@ -103,42 +85,38 @@ class TPLinkOutlet(BaseOutlet):
         )
         return cmd
 
-    def command(self, cmd: str, extra_args: str = None) -> bool:
+    def command(self, cmd: str, extra_args: str = None, **kwargs) -> bool:
         cmd = self.get_cmdline() + f"-c {cmd}"
         if extra_args is not None:
             cmd += f" {extra_args}"
-        return tplink_outlet_command(cmd)
+        return tplink.tplink_command_wrapper(cmd)
 
+    @overrides
     def turn_on(self) -> bool:
         return self.command('on')
 
+    @overrides
     def turn_off(self) -> bool:
         return self.command('off')
 
+    @overrides
     def is_on(self) -> bool:
         return self.get_on_duration_seconds() > 0
 
+    @overrides
     def is_off(self) -> bool:
         return not self.is_on()
 
-    @timeout(
-        10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
-    )
     def get_info(self) -> Optional[Dict]:
-        cmd = self.get_cmdline() + "-c info"
-        out = subprocess.getoutput(cmd)
-        out = re.sub("Sent:.*\n", "", out)
-        out = re.sub("Received: *", "", out)
-        try:
-            self.info = json.loads(out)["system"]["get_sysinfo"]
-            self.info_ts = datetime.datetime.now()
+        ip = self.get_ip()
+        if ip is not None:
+            self.info = tplink.tplink_get_info(ip)
+            if self.info is not None:
+                self.info_ts = datetime.datetime.now()
+            else:
+                self.info_ts = None
             return self.info
-        except Exception as e:
-            logger.exception(e)
-            print(out, file=sys.stderr)
-            self.info = None
-            self.info_ts = None
-            return None
+        return None
 
     def get_on_duration_seconds(self) -> int:
         self.info = self.get_info()
@@ -148,53 +126,58 @@ class TPLinkOutlet(BaseOutlet):
 
 
 class TPLinkOutletWithChildren(TPLinkOutlet):
+    """A TPLink outlet where the top and bottom plus are individually
+    controllable."""
+
     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
         super().__init__(name, mac, keywords)
         self.children: List[str] = []
         self.info: Optional[Dict] = None
         self.info_ts: Optional[datetime.datetime] = None
+        assert self.keywords is not None
         assert "children" in self.keywords
         self.info = self.get_info()
         if self.info is not None:
             for child in self.info["children"]:
                 self.children.append(child["id"])
 
-    # override
-    def get_cmdline(self, child: Optional[str] = None) -> str:
-        cmd = (
-            f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
-            f"--no_logging_console "
-        )
+    def get_cmdline_with_child(self, child: Optional[str] = None) -> str:
+        cmd = super().get_cmdline()
         if child is not None:
             cmd += f"-x {child} "
         return cmd
 
-    # override
-    def command(
-        self, cmd: str, child: str = None, extra_args: str = None
-    ) -> bool:
-        cmd = self.get_cmdline(child) + f"-c {cmd}"
+    @overrides
+    def command(self, cmd: str, extra_args: str = None, **kwargs) -> bool:
+        child: Optional[str] = kwargs.get('child', None)
+        cmd = self.get_cmdline_with_child(child) + f"-c {cmd}"
         if extra_args is not None:
             cmd += f" {extra_args}"
-        logger.debug(f'About to execute {cmd}')
-        return tplink_outlet_command(cmd)
+        logger.debug('About to execute: %s', cmd)
+        return tplink.tplink_command_wrapper(cmd)
 
     def get_children(self) -> List[str]:
         return self.children
 
-    def turn_on(self, child: str = None) -> bool:
+    @overrides
+    def turn_on(self) -> bool:
+        return self.command("on", None)
+
+    @overrides
+    def turn_off(self) -> bool:
+        return self.command("off", None)
+
+    def turn_on_child(self, child: str = None) -> bool:
         return self.command("on", child)
 
-    def turn_off(self, child: str = None) -> bool:
+    def turn_off_child(self, child: str = None) -> bool:
         return self.command("off", child)
 
-    def get_on_duration_seconds(self, child: str = None) -> int:
-        self.info = self.get_info()
+    def get_child_on_duration_seconds(self, child: str = None) -> int:
         if child is None:
-            if self.info is None:
-                return 0
-            return int(self.info.get("on_time", "0"))
+            return super().get_on_duration_seconds()
         else:
+            self.info = self.get_info()
             if self.info is None:
                 return 0
             for chi in self.info.get("children", {}):
@@ -204,6 +187,8 @@ class TPLinkOutletWithChildren(TPLinkOutlet):
 
 
 class GoogleOutlet(BaseOutlet):
+    """A smart outlet controlled via Google Assistant."""
+
     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
         super().__init__(name.strip(), mac.strip(), keywords)
         self.info = None
@@ -216,22 +201,24 @@ class GoogleOutlet(BaseOutlet):
     def parse_google_response(response: GoogleResponse) -> bool:
         return response.success
 
+    @overrides
     def turn_on(self) -> bool:
-        return GoogleOutlet.parse_google_response(
-            ask_google('turn {self.goog_name()} on')
-        )
+        return GoogleOutlet.parse_google_response(ask_google(f'turn {self.goog_name()} on'))
 
+    @overrides
     def turn_off(self) -> bool:
-        return GoogleOutlet.parse_google_response(
-            ask_google('turn {self.goog_name()} off')
-        )
+        return GoogleOutlet.parse_google_response(ask_google(f'turn {self.goog_name()} off'))
 
+    @overrides
     def is_on(self) -> bool:
         r = ask_google(f'is {self.goog_name()} on?')
         if not r.success:
             return False
-        return 'is on' in r.audio_transcription
+        if r.audio_transcription is not None:
+            return 'is on' in r.audio_transcription
+        raise Exception('Can\'t talk to Google right now!?')
 
+    @overrides
     def is_off(self) -> bool:
         return not self.is_on()
 
@@ -245,6 +232,7 @@ class MerossWrapper(object):
     this class.
 
     """
+
     def __init__(self):
         self.loop = asyncio.get_event_loop()
         self.email = os.environ.get('MEROSS_EMAIL') or scott_secrets.MEROSS_EMAIL
@@ -278,10 +266,12 @@ class MerossWrapper(object):
 
 
 class MerossOutlet(BaseOutlet):
+    """A Meross smart outlet class."""
+
     def __init__(self, name: str, mac: str, keywords: str = '') -> None:
         super().__init__(name, mac, keywords)
-        self.meross_wrapper = None
-        self.device = None
+        self.meross_wrapper: Optional[MerossWrapper] = None
+        self.device: Optional[Any] = None
 
     def lazy_initialize_device(self):
         """If we make too many calls to Meross they will block us; only talk
@@ -292,23 +282,28 @@ class MerossOutlet(BaseOutlet):
             if self.device is None:
                 raise Exception(f'{self.name} is not a known Meross device?!')
 
+    @overrides
     def turn_on(self) -> bool:
         self.lazy_initialize_device()
-        self.meross_wrapper.loop.run_until_complete(
-            self.device.async_turn_on()
-        )
+        assert self.meross_wrapper is not None
+        assert self.device is not None
+        self.meross_wrapper.loop.run_until_complete(self.device.async_turn_on())
         return True
 
+    @overrides
     def turn_off(self) -> bool:
         self.lazy_initialize_device()
-        self.meross_wrapper.loop.run_until_complete(
-            self.device.async_turn_off()
-        )
+        assert self.meross_wrapper is not None
+        assert self.device is not None
+        self.meross_wrapper.loop.run_until_complete(self.device.async_turn_off())
         return True
 
+    @overrides
     def is_on(self) -> bool:
         self.lazy_initialize_device()
+        assert self.device is not None
         return self.device.is_on()
 
+    @overrides
     def is_off(self) -> bool:
         return not self.is_on()