Moved more logic from tplink.py into a library.
[python_utils.git] / smart_home / tplink_utils.py
index d8a75a2c3dd6d1b67808e88f62c4b6c037b885c9..9b8eb6da926e5325cbf1da79e7f45a67a725b2ec 100644 (file)
@@ -25,16 +25,36 @@ import json
 import logging
 import os
 import re
+import socket
 import subprocess
 import sys
+import time
 from struct import pack
-from typing import Dict, Optional
+from typing import Dict, List, Optional, Tuple
 
 import logging_utils
+import string_utils
 from decorator_utils import timeout
 
 logger = logging.getLogger(__name__)
 
+commands = {
+    "info": '{"system":{"get_sysinfo":{}}}',
+    "on": '{"system":{"set_relay_state":{"state":1}}}',
+    "off": '{"system":{"set_relay_state":{"state":0}}}',
+    "offon": '{"system":{"set_relay_state":{"state":0}}};wait;{"system":{"set_relay_state":{"state":1}}}',
+    "onoff": '{"system":{"set_relay_state":{"state":1}}};wait;{"system":{"set_relay_state":{"state":0}}}',
+    "cloudinfo": '{"cnCloud":{"get_info":{}}}',
+    "wlanscan": '{"netif":{"get_scaninfo":{"refresh":0}}}',
+    "time": '{"time":{"get_time":{}}}',
+    "schedule": '{"schedule":{"get_rules":{}}}',
+    "countdown": '{"count_down":{"get_rules":{}}}',
+    "antitheft": '{"anti_theft":{"get_rules":{}}}',
+    "reboot": '{"system":{"reboot":{"delay":1}}}',
+    "reset": '{"system":{"reset":{"delay":1}}}',
+    "energy": '{"emeter":{"get_realtime":{}}}',
+}
+
 
 @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
 def tplink_command(command: str) -> bool:
@@ -74,7 +94,9 @@ def tplink_get_info(cmd: str) -> Optional[Dict]:
 
 
 def encrypt(string: str) -> bytes:
-    """Encryption and Decryption of TP-Link Smart Home Protocol.
+    """Encryption and Decryption of TP-Link Smart Home Protocol.  Note:
+    this introduces one dword (4 bytes) or crap in the front of the
+    message, the 171 key.
 
     XOR Autokey Cipher with starting key = 171
 
@@ -92,7 +114,8 @@ def encrypt(string: str) -> bytes:
 
 
 def decrypt(string: bytes) -> str:
-    """Opposite of encrypt (above).
+    """Opposite of encrypt (above).  Note: encrypt introduces 4 bytes of
+    crap in the front of the message.
 
     >>> b = encrypt('hi, mom')
     >>> s = decrypt(b[4:])
@@ -109,6 +132,76 @@ def decrypt(string: bytes) -> str:
     return result
 
 
+@timeout(10, error_message="Timed out comunicating with device.")
+def communicate_with_device(
+    ip: str,
+    port: int,
+    cmd: str,
+    *,
+    quiet: bool = False,
+    brief: bool = True,
+) -> Tuple[bool, List[str]]:
+    """Given an IP address and port, open a socket, encrypt cmd, and sent it to
+    the device.  Read a response, decrypt it and parse it.
+
+    """
+    if string_utils.is_none_or_empty(ip) or not string_utils.is_ip_v4(ip):
+        raise ValueError(f"Invalid IP address. ({ip})")
+    if string_utils.is_none_or_empty(cmd):
+        raise ValueError(f"Invalid cmd ({cmd}).")
+    all_responses = []
+
+    try:
+        sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock_tcp.connect((ip, port))
+        logger.debug('Connected to %s:%s; running %s...', ip, port, cmd)
+        for c in cmd.split(";"):
+            if c == "wait":
+                if not quiet and not brief:
+                    print("Sleeping 3000ms...")
+                time.sleep(3.0)
+                continue
+
+            raw_request = c
+            encrypted_raw_request = encrypt(raw_request)
+            sock_tcp.send(encrypted_raw_request)
+            raw_response = sock_tcp.recv(2048)
+            decrypted_raw_response = decrypt(raw_response[4:])
+            all_responses.append(decrypted_raw_response)
+
+            if not quiet:
+                json_request = json.loads(raw_request)
+                json_request = json.dumps(json_request, sort_keys=True, indent=4)
+                print(f'Sent: "{json_request}"')
+                if not brief:
+                    raw = ''
+                    for b in encrypted_raw_request:
+                        raw += '%02X ' % b
+                    logger.debug('Sent raw: "%s"', raw)
+
+                # Note: 4 bytes of garbage (the key)
+                json_response = json.loads(decrypted_raw_response)
+                json_response = json.dumps(json_response, sort_keys=True, indent=4)
+                print(f'Received: "{json_response}"')
+                if not brief:
+                    raw = ''
+                    for b in raw_response:
+                        raw += '%02X ' % b
+                    logger.debug('Received raw: "%s"', raw)
+
+            if '"err_code":0' not in decrypted_raw_response:
+                if '"err_code": 0' not in decrypted_raw_response:
+                    logger.error("Did not see clean err_code in response?!")
+                    return (False, all_responses)
+        logger.debug('All commands succeeded, returning True.')
+        return (True, all_responses)
+    except socket.error:
+        logger.error("Cound not connect to host %s:%s", ip, port)
+        return (False, all_responses)
+    finally:
+        sock_tcp.close()
+
+
 if __name__ == '__main__':
     import doctest