Moving towards not needing a separate tplink.py utility.
authorScott Gasch <[email protected]>
Fri, 25 Feb 2022 19:03:14 +0000 (11:03 -0800)
committerScott Gasch <[email protected]>
Fri, 25 Feb 2022 19:03:14 +0000 (11:03 -0800)
smart_home/tplink_utils.py

index d8a75a2c3dd6d1b67808e88f62c4b6c037b885c9..932a3c76b62ae86555104d6e70ff4f9647c89929 100644 (file)
@@ -25,13 +25,16 @@ import json
 import logging
 import os
 import re
+import socket
 import subprocess
 import sys
+import time
 from struct import pack
-from typing import Dict, Optional
+from typing import Any, Dict, Optional
 
 import logging_utils
-from decorator_utils import timeout
+import string_utils
+from decorator_utils import retry_if_false, timeout
 
 logger = logging.getLogger(__name__)
 
@@ -74,7 +77,9 @@ def tplink_get_info(cmd: str) -> Optional[Dict]:
 
 
 def encrypt(string: str) -> bytes:
-    """Encryption and Decryption of TP-Link Smart Home Protocol.
+    """Encryption and Decryption of TP-Link Smart Home Protocol.  Note:
+    this introduces one dword (4 bytes) or crap in the front of the
+    message, the 171 key.
 
     XOR Autokey Cipher with starting key = 171
 
@@ -92,7 +97,8 @@ def encrypt(string: str) -> bytes:
 
 
 def decrypt(string: bytes) -> str:
-    """Opposite of encrypt (above).
+    """Opposite of encrypt (above).  Note: encrypt introduces 4 bytes of
+    crap in the front of the message.
 
     >>> b = encrypt('hi, mom')
     >>> s = decrypt(b[4:])
@@ -109,6 +115,96 @@ def decrypt(string: bytes) -> str:
     return result
 
 
+def item_generator(json_input: Any, lookup_key: str):
+    """Walk through some JSON recursively looking for a key.  Used by the query
+    parameter in communicate_with_device (see below).
+
+    """
+    if isinstance(json_input, dict):
+        for k, v in json_input.items():
+            if k == lookup_key:
+                yield v
+            else:
+                yield from item_generator(v, lookup_key)
+    elif isinstance(json_input, list):
+        for item in json_input:
+            yield from item_generator(item, lookup_key)
+
+
+@timeout(10, error_message="Timed out comunicating with device.")
+@retry_if_false(3)
+def communicate_with_device(
+    ip: str,
+    port: int,
+    cmd: str,
+    query: Optional[str] = None,
+    *,
+    quiet: bool = False,
+    brief: bool = True,
+) -> bool:
+
+    if string_utils.is_none_or_empty(ip) or not string_utils.is_ip_v4(ip):
+        raise ValueError(f"Invalid IP address. ({ip})")
+    if string_utils.is_none_or_empty(cmd):
+        raise ValueError(f"Invalid cmd ({cmd}).")
+
+    try:
+        sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock_tcp.connect((ip, port))
+        logger.info('Connected to %s:%s; running %s...', ip, port, cmd)
+        for c in cmd.split(";"):
+            if c == "wait":
+                if not quiet and not brief:
+                    print("Sleeping 3000ms...")
+                time.sleep(3.0)
+                continue
+
+            raw_request = c
+            encrypted_raw_request = encrypt(raw_request)
+            sock_tcp.send(encrypted_raw_request)
+            raw_response = sock_tcp.recv(2048)
+            if not quiet:
+                json_request = json.loads(raw_request)
+                json_request = json.dumps(json_request, sort_keys=True, indent=4)
+                logger.debug('Sent: "%s"', json_request)
+                if not brief:
+                    raw = ''
+                    for b in encrypted_raw_request:
+                        raw += '%02X ' % b
+                    logger.debug('Sent raw: "%s"', raw)
+
+                # Note: 4 bytes of garbage (the key)
+                decrypted_raw_response = decrypt(raw_response[4:])
+                if not quiet:
+                    json_response = json.loads(decrypted_raw_response)
+                    json_response = json.dumps(json_response, sort_keys=True, indent=4)
+                    logger.debug('Received: "%s"', json_response)
+                    if not brief:
+                        raw = ''
+                        for b in raw_response:
+                            raw += '%02X ' % b
+                        logger.debug('Received raw: "%s"', raw)
+
+                if query is not None:
+                    j = json.loads(decrypted_raw_response)
+                    for subquery in query:
+                        for q in subquery.split(","):
+                            for v in item_generator(j, q):
+                                if not brief:
+                                    print(f"{q}:", end="")
+                                print(f"{v}")
+
+        sock_tcp.close()
+        if '"err_code":0' not in decrypted_raw_response:
+            if '"err_code": 0' not in decrypted_raw_response:
+                logger.error("Did not see clean err_code in response?!")
+                return False
+        return True
+    except socket.error:
+        logger.error("Cound not connect to host %s:%s", ip, port)
+        return False
+
+
 if __name__ == '__main__':
     import doctest