Since this thing is on the innerwebs I suppose it should have a
[python_utils.git] / smart_home / tplink_utils.py
index d8a75a2c3dd6d1b67808e88f62c4b6c037b885c9..abf4970c3c9593f76b641114716fcff88a782627 100644 (file)
@@ -10,7 +10,7 @@ You may obtain a copy of the License at
 
      http://www.apache.org/licenses/LICENSE-2.0
 
-Modifications by Scott Gasch Copyright 2020-2022 also released under
+Modifications by Scott Gasch Copyright © 2020-2022 also released under
 the Apache 2.0 license as required by the license (see above).
 
 Unless required by applicable law or agreed to in writing, software
@@ -24,20 +24,37 @@ limitations under the License.
 import json
 import logging
 import os
-import re
-import subprocess
-import sys
+import socket
+import time
 from struct import pack
-from typing import Dict, Optional
+from typing import Dict, List, Optional, Tuple
 
 import logging_utils
+import string_utils
 from decorator_utils import timeout
 
 logger = logging.getLogger(__name__)
 
+commands = {
+    "info": '{"system":{"get_sysinfo":{}}}',
+    "on": '{"system":{"set_relay_state":{"state":1}}}',
+    "off": '{"system":{"set_relay_state":{"state":0}}}',
+    "offon": '{"system":{"set_relay_state":{"state":0}}};wait;{"system":{"set_relay_state":{"state":1}}}',
+    "onoff": '{"system":{"set_relay_state":{"state":1}}};wait;{"system":{"set_relay_state":{"state":0}}}',
+    "cloudinfo": '{"cnCloud":{"get_info":{}}}',
+    "wlanscan": '{"netif":{"get_scaninfo":{"refresh":0}}}',
+    "time": '{"time":{"get_time":{}}}',
+    "schedule": '{"schedule":{"get_rules":{}}}',
+    "countdown": '{"count_down":{"get_rules":{}}}',
+    "antitheft": '{"anti_theft":{"get_rules":{}}}',
+    "reboot": '{"system":{"reboot":{"delay":1}}}',
+    "reset": '{"system":{"reset":{"delay":1}}}',
+    "energy": '{"emeter":{"get_realtime":{}}}',
+}
+
 
 @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
-def tplink_command(command: str) -> bool:
+def tplink_command_wrapper(command: str) -> bool:
     result = os.system(command)
     signal = result & 0xFF
     if signal != 0:
@@ -56,25 +73,20 @@ def tplink_command(command: str) -> bool:
     return True
 
 
-@timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
-def tplink_get_info(cmd: str) -> Optional[Dict]:
-    logger.debug('Getting tplink device status via "%s"', cmd)
-    try:
-        out = subprocess.getoutput(cmd)
-        logger.debug('RAW OUT> %s', out)
-        out = re.sub("Sent:.*\n", "", out)
-        out = re.sub("Received: *", "", out)
-        info = json.loads(out)["system"]["get_sysinfo"]
+def tplink_get_info(ip: str, port: int = 9999) -> Optional[Dict]:
+    success, response = communicate_with_device(ip, port, commands['info'], quiet=True)
+    if success:
+        assert len(response) == 1
+        info = json.loads(response[0])["system"]["get_sysinfo"]
         logger.debug("%s", json.dumps(info, indent=4, sort_keys=True))
         return info
-    except Exception as e:
-        logger.exception(e)
-        print(out, file=sys.stderr)
-        return None
+    return None
 
 
 def encrypt(string: str) -> bytes:
-    """Encryption and Decryption of TP-Link Smart Home Protocol.
+    """Encryption and Decryption of TP-Link Smart Home Protocol.  Note:
+    this introduces one dword (4 bytes) or crap in the front of the
+    message, the 171 key.
 
     XOR Autokey Cipher with starting key = 171
 
@@ -92,7 +104,8 @@ def encrypt(string: str) -> bytes:
 
 
 def decrypt(string: bytes) -> str:
-    """Opposite of encrypt (above).
+    """Opposite of encrypt (above).  Note: encrypt introduces 4 bytes of
+    crap in the front of the message.
 
     >>> b = encrypt('hi, mom')
     >>> s = decrypt(b[4:])
@@ -109,6 +122,78 @@ def decrypt(string: bytes) -> str:
     return result
 
 
+@timeout(10, error_message="Timed out comunicating with device.")
+def communicate_with_device(
+    ip: str,
+    port: int,
+    cmd: str,
+    *,
+    quiet: bool = False,
+    brief: bool = True,
+) -> Tuple[bool, List[str]]:
+    """Given an IP address and port, open a socket, encrypt cmd, and sent it to
+    the device.  Read a response, decrypt it and parse it.
+
+    """
+    if string_utils.is_none_or_empty(ip) or not string_utils.is_ip_v4(ip):
+        raise ValueError(f"Invalid IP address. ({ip})")
+    if string_utils.is_none_or_empty(cmd):
+        raise ValueError(f"Invalid cmd ({cmd}).")
+    all_responses = []
+
+    try:
+        sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock_tcp.connect((ip, port))
+        logger.debug('Connected to %s:%s; running %s...', ip, port, cmd)
+        for c in cmd.split(";"):
+            if c == "wait":
+                if not quiet and not brief:
+                    print("Sleeping 3000ms...")
+                time.sleep(3.0)
+                continue
+
+            raw_request = c
+            encrypted_raw_request = encrypt(raw_request)
+            sock_tcp.send(encrypted_raw_request)
+            raw_response = sock_tcp.recv(2048)
+            decrypted_raw_response = decrypt(raw_response[4:])
+            all_responses.append(decrypted_raw_response)
+
+            if not quiet:
+                json_request = json.loads(raw_request)
+                json_request = json.dumps(json_request, sort_keys=True, indent=4)
+                print(f'Sent: "{json_request}"')
+                if not brief:
+                    raw = ''
+                    for b in encrypted_raw_request:
+                        raw += f'{b:02X} '
+                    logger.debug('Sent raw: "%s"', raw)
+
+                # Note: 4 bytes of garbage (the key)
+                json_response = json.loads(decrypted_raw_response)
+                json_response = json.dumps(json_response, sort_keys=True, indent=4)
+                print(f'Received: "{json_response}"')
+                if not brief:
+                    raw = ''
+                    for b in raw_response:
+                        raw += f'{b:02X} '
+                    logger.debug('Received raw: "%s"', raw)
+
+            if (
+                '"err_code":0' not in decrypted_raw_response
+                and '"err_code": 0' not in decrypted_raw_response
+            ):
+                logger.error("Did not see clean err_code in response?!")
+                return (False, all_responses)
+        logger.debug('All commands succeeded.')
+        return (True, all_responses)
+    except socket.error:
+        logger.error("Cound not connect to host %s:%s", ip, port)
+        return (False, all_responses)
+    finally:
+        sock_tcp.close()
+
+
 if __name__ == '__main__':
     import doctest