Moved more logic from tplink.py into a library.
authorScott Gasch <[email protected]>
Sat, 26 Feb 2022 18:45:11 +0000 (10:45 -0800)
committerScott Gasch <[email protected]>
Sat, 26 Feb 2022 18:45:11 +0000 (10:45 -0800)
smart_home/tplink_utils.py

index 932a3c76b62ae86555104d6e70ff4f9647c89929..9b8eb6da926e5325cbf1da79e7f45a67a725b2ec 100644 (file)
@@ -30,14 +30,31 @@ import subprocess
 import sys
 import time
 from struct import pack
-from typing import Any, Dict, Optional
+from typing import Dict, List, Optional, Tuple
 
 import logging_utils
 import string_utils
-from decorator_utils import retry_if_false, timeout
+from decorator_utils import timeout
 
 logger = logging.getLogger(__name__)
 
+commands = {
+    "info": '{"system":{"get_sysinfo":{}}}',
+    "on": '{"system":{"set_relay_state":{"state":1}}}',
+    "off": '{"system":{"set_relay_state":{"state":0}}}',
+    "offon": '{"system":{"set_relay_state":{"state":0}}};wait;{"system":{"set_relay_state":{"state":1}}}',
+    "onoff": '{"system":{"set_relay_state":{"state":1}}};wait;{"system":{"set_relay_state":{"state":0}}}',
+    "cloudinfo": '{"cnCloud":{"get_info":{}}}',
+    "wlanscan": '{"netif":{"get_scaninfo":{"refresh":0}}}',
+    "time": '{"time":{"get_time":{}}}',
+    "schedule": '{"schedule":{"get_rules":{}}}',
+    "countdown": '{"count_down":{"get_rules":{}}}',
+    "antitheft": '{"anti_theft":{"get_rules":{}}}',
+    "reboot": '{"system":{"reboot":{"delay":1}}}',
+    "reset": '{"system":{"reset":{"delay":1}}}',
+    "energy": '{"emeter":{"get_realtime":{}}}',
+}
+
 
 @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
 def tplink_command(command: str) -> bool:
@@ -115,43 +132,29 @@ def decrypt(string: bytes) -> str:
     return result
 
 
-def item_generator(json_input: Any, lookup_key: str):
-    """Walk through some JSON recursively looking for a key.  Used by the query
-    parameter in communicate_with_device (see below).
-
-    """
-    if isinstance(json_input, dict):
-        for k, v in json_input.items():
-            if k == lookup_key:
-                yield v
-            else:
-                yield from item_generator(v, lookup_key)
-    elif isinstance(json_input, list):
-        for item in json_input:
-            yield from item_generator(item, lookup_key)
-
-
 @timeout(10, error_message="Timed out comunicating with device.")
-@retry_if_false(3)
 def communicate_with_device(
     ip: str,
     port: int,
     cmd: str,
-    query: Optional[str] = None,
     *,
     quiet: bool = False,
     brief: bool = True,
-) -> bool:
+) -> Tuple[bool, List[str]]:
+    """Given an IP address and port, open a socket, encrypt cmd, and sent it to
+    the device.  Read a response, decrypt it and parse it.
 
+    """
     if string_utils.is_none_or_empty(ip) or not string_utils.is_ip_v4(ip):
         raise ValueError(f"Invalid IP address. ({ip})")
     if string_utils.is_none_or_empty(cmd):
         raise ValueError(f"Invalid cmd ({cmd}).")
+    all_responses = []
 
     try:
         sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         sock_tcp.connect((ip, port))
-        logger.info('Connected to %s:%s; running %s...', ip, port, cmd)
+        logger.debug('Connected to %s:%s; running %s...', ip, port, cmd)
         for c in cmd.split(";"):
             if c == "wait":
                 if not quiet and not brief:
@@ -163,10 +166,13 @@ def communicate_with_device(
             encrypted_raw_request = encrypt(raw_request)
             sock_tcp.send(encrypted_raw_request)
             raw_response = sock_tcp.recv(2048)
+            decrypted_raw_response = decrypt(raw_response[4:])
+            all_responses.append(decrypted_raw_response)
+
             if not quiet:
                 json_request = json.loads(raw_request)
                 json_request = json.dumps(json_request, sort_keys=True, indent=4)
-                logger.debug('Sent: "%s"', json_request)
+                print(f'Sent: "{json_request}"')
                 if not brief:
                     raw = ''
                     for b in encrypted_raw_request:
@@ -174,35 +180,26 @@ def communicate_with_device(
                     logger.debug('Sent raw: "%s"', raw)
 
                 # Note: 4 bytes of garbage (the key)
-                decrypted_raw_response = decrypt(raw_response[4:])
-                if not quiet:
-                    json_response = json.loads(decrypted_raw_response)
-                    json_response = json.dumps(json_response, sort_keys=True, indent=4)
-                    logger.debug('Received: "%s"', json_response)
-                    if not brief:
-                        raw = ''
-                        for b in raw_response:
-                            raw += '%02X ' % b
-                        logger.debug('Received raw: "%s"', raw)
-
-                if query is not None:
-                    j = json.loads(decrypted_raw_response)
-                    for subquery in query:
-                        for q in subquery.split(","):
-                            for v in item_generator(j, q):
-                                if not brief:
-                                    print(f"{q}:", end="")
-                                print(f"{v}")
-
-        sock_tcp.close()
-        if '"err_code":0' not in decrypted_raw_response:
-            if '"err_code": 0' not in decrypted_raw_response:
-                logger.error("Did not see clean err_code in response?!")
-                return False
-        return True
+                json_response = json.loads(decrypted_raw_response)
+                json_response = json.dumps(json_response, sort_keys=True, indent=4)
+                print(f'Received: "{json_response}"')
+                if not brief:
+                    raw = ''
+                    for b in raw_response:
+                        raw += '%02X ' % b
+                    logger.debug('Received raw: "%s"', raw)
+
+            if '"err_code":0' not in decrypted_raw_response:
+                if '"err_code": 0' not in decrypted_raw_response:
+                    logger.error("Did not see clean err_code in response?!")
+                    return (False, all_responses)
+        logger.debug('All commands succeeded, returning True.')
+        return (True, all_responses)
     except socket.error:
         logger.error("Cound not connect to host %s:%s", ip, port)
-        return False
+        return (False, all_responses)
+    finally:
+        sock_tcp.close()
 
 
 if __name__ == '__main__':