Since this thing is on the innerwebs I suppose it should have a
[python_utils.git] / smart_home / tplink_utils.py
index 932a3c76b62ae86555104d6e70ff4f9647c89929..abf4970c3c9593f76b641114716fcff88a782627 100644 (file)
@@ -10,7 +10,7 @@ You may obtain a copy of the License at
 
      http://www.apache.org/licenses/LICENSE-2.0
 
-Modifications by Scott Gasch Copyright 2020-2022 also released under
+Modifications by Scott Gasch Copyright © 2020-2022 also released under
 the Apache 2.0 license as required by the license (see above).
 
 Unless required by applicable law or agreed to in writing, software
@@ -24,23 +24,37 @@ limitations under the License.
 import json
 import logging
 import os
-import re
 import socket
-import subprocess
-import sys
 import time
 from struct import pack
-from typing import Any, Dict, Optional
+from typing import Dict, List, Optional, Tuple
 
 import logging_utils
 import string_utils
-from decorator_utils import retry_if_false, timeout
+from decorator_utils import timeout
 
 logger = logging.getLogger(__name__)
 
+commands = {
+    "info": '{"system":{"get_sysinfo":{}}}',
+    "on": '{"system":{"set_relay_state":{"state":1}}}',
+    "off": '{"system":{"set_relay_state":{"state":0}}}',
+    "offon": '{"system":{"set_relay_state":{"state":0}}};wait;{"system":{"set_relay_state":{"state":1}}}',
+    "onoff": '{"system":{"set_relay_state":{"state":1}}};wait;{"system":{"set_relay_state":{"state":0}}}',
+    "cloudinfo": '{"cnCloud":{"get_info":{}}}',
+    "wlanscan": '{"netif":{"get_scaninfo":{"refresh":0}}}',
+    "time": '{"time":{"get_time":{}}}',
+    "schedule": '{"schedule":{"get_rules":{}}}',
+    "countdown": '{"count_down":{"get_rules":{}}}',
+    "antitheft": '{"anti_theft":{"get_rules":{}}}',
+    "reboot": '{"system":{"reboot":{"delay":1}}}',
+    "reset": '{"system":{"reset":{"delay":1}}}',
+    "energy": '{"emeter":{"get_realtime":{}}}',
+}
+
 
 @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
-def tplink_command(command: str) -> bool:
+def tplink_command_wrapper(command: str) -> bool:
     result = os.system(command)
     signal = result & 0xFF
     if signal != 0:
@@ -59,21 +73,14 @@ def tplink_command(command: str) -> bool:
     return True
 
 
-@timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
-def tplink_get_info(cmd: str) -> Optional[Dict]:
-    logger.debug('Getting tplink device status via "%s"', cmd)
-    try:
-        out = subprocess.getoutput(cmd)
-        logger.debug('RAW OUT> %s', out)
-        out = re.sub("Sent:.*\n", "", out)
-        out = re.sub("Received: *", "", out)
-        info = json.loads(out)["system"]["get_sysinfo"]
+def tplink_get_info(ip: str, port: int = 9999) -> Optional[Dict]:
+    success, response = communicate_with_device(ip, port, commands['info'], quiet=True)
+    if success:
+        assert len(response) == 1
+        info = json.loads(response[0])["system"]["get_sysinfo"]
         logger.debug("%s", json.dumps(info, indent=4, sort_keys=True))
         return info
-    except Exception as e:
-        logger.exception(e)
-        print(out, file=sys.stderr)
-        return None
+    return None
 
 
 def encrypt(string: str) -> bytes:
@@ -115,43 +122,29 @@ def decrypt(string: bytes) -> str:
     return result
 
 
-def item_generator(json_input: Any, lookup_key: str):
-    """Walk through some JSON recursively looking for a key.  Used by the query
-    parameter in communicate_with_device (see below).
-
-    """
-    if isinstance(json_input, dict):
-        for k, v in json_input.items():
-            if k == lookup_key:
-                yield v
-            else:
-                yield from item_generator(v, lookup_key)
-    elif isinstance(json_input, list):
-        for item in json_input:
-            yield from item_generator(item, lookup_key)
-
-
 @timeout(10, error_message="Timed out comunicating with device.")
-@retry_if_false(3)
 def communicate_with_device(
     ip: str,
     port: int,
     cmd: str,
-    query: Optional[str] = None,
     *,
     quiet: bool = False,
     brief: bool = True,
-) -> bool:
+) -> Tuple[bool, List[str]]:
+    """Given an IP address and port, open a socket, encrypt cmd, and sent it to
+    the device.  Read a response, decrypt it and parse it.
 
+    """
     if string_utils.is_none_or_empty(ip) or not string_utils.is_ip_v4(ip):
         raise ValueError(f"Invalid IP address. ({ip})")
     if string_utils.is_none_or_empty(cmd):
         raise ValueError(f"Invalid cmd ({cmd}).")
+    all_responses = []
 
     try:
         sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         sock_tcp.connect((ip, port))
-        logger.info('Connected to %s:%s; running %s...', ip, port, cmd)
+        logger.debug('Connected to %s:%s; running %s...', ip, port, cmd)
         for c in cmd.split(";"):
             if c == "wait":
                 if not quiet and not brief:
@@ -163,46 +156,42 @@ def communicate_with_device(
             encrypted_raw_request = encrypt(raw_request)
             sock_tcp.send(encrypted_raw_request)
             raw_response = sock_tcp.recv(2048)
+            decrypted_raw_response = decrypt(raw_response[4:])
+            all_responses.append(decrypted_raw_response)
+
             if not quiet:
                 json_request = json.loads(raw_request)
                 json_request = json.dumps(json_request, sort_keys=True, indent=4)
-                logger.debug('Sent: "%s"', json_request)
+                print(f'Sent: "{json_request}"')
                 if not brief:
                     raw = ''
                     for b in encrypted_raw_request:
-                        raw += '%02X ' % b
+                        raw += f'{b:02X} '
                     logger.debug('Sent raw: "%s"', raw)
 
                 # Note: 4 bytes of garbage (the key)
-                decrypted_raw_response = decrypt(raw_response[4:])
-                if not quiet:
-                    json_response = json.loads(decrypted_raw_response)
-                    json_response = json.dumps(json_response, sort_keys=True, indent=4)
-                    logger.debug('Received: "%s"', json_response)
-                    if not brief:
-                        raw = ''
-                        for b in raw_response:
-                            raw += '%02X ' % b
-                        logger.debug('Received raw: "%s"', raw)
-
-                if query is not None:
-                    j = json.loads(decrypted_raw_response)
-                    for subquery in query:
-                        for q in subquery.split(","):
-                            for v in item_generator(j, q):
-                                if not brief:
-                                    print(f"{q}:", end="")
-                                print(f"{v}")
-
-        sock_tcp.close()
-        if '"err_code":0' not in decrypted_raw_response:
-            if '"err_code": 0' not in decrypted_raw_response:
+                json_response = json.loads(decrypted_raw_response)
+                json_response = json.dumps(json_response, sort_keys=True, indent=4)
+                print(f'Received: "{json_response}"')
+                if not brief:
+                    raw = ''
+                    for b in raw_response:
+                        raw += f'{b:02X} '
+                    logger.debug('Received raw: "%s"', raw)
+
+            if (
+                '"err_code":0' not in decrypted_raw_response
+                and '"err_code": 0' not in decrypted_raw_response
+            ):
                 logger.error("Did not see clean err_code in response?!")
-                return False
-        return True
+                return (False, all_responses)
+        logger.debug('All commands succeeded.')
+        return (True, all_responses)
     except socket.error:
         logger.error("Cound not connect to host %s:%s", ip, port)
-        return False
+        return (False, all_responses)
+    finally:
+        sock_tcp.close()
 
 
 if __name__ == '__main__':