import logging
import os
import re
+import socket
import subprocess
import sys
+import time
from struct import pack
-from typing import Dict, Optional
+from typing import Any, Dict, Optional
import logging_utils
-from decorator_utils import timeout
+import string_utils
+from decorator_utils import retry_if_false, timeout
logger = logging.getLogger(__name__)
def encrypt(string: str) -> bytes:
- """Encryption and Decryption of TP-Link Smart Home Protocol.
+ """Encryption and Decryption of TP-Link Smart Home Protocol. Note:
+ this introduces one dword (4 bytes) or crap in the front of the
+ message, the 171 key.
XOR Autokey Cipher with starting key = 171
def decrypt(string: bytes) -> str:
- """Opposite of encrypt (above).
+ """Opposite of encrypt (above). Note: encrypt introduces 4 bytes of
+ crap in the front of the message.
>>> b = encrypt('hi, mom')
>>> s = decrypt(b[4:])
return result
+def item_generator(json_input: Any, lookup_key: str):
+ """Walk through some JSON recursively looking for a key. Used by the query
+ parameter in communicate_with_device (see below).
+
+ """
+ if isinstance(json_input, dict):
+ for k, v in json_input.items():
+ if k == lookup_key:
+ yield v
+ else:
+ yield from item_generator(v, lookup_key)
+ elif isinstance(json_input, list):
+ for item in json_input:
+ yield from item_generator(item, lookup_key)
+
+
+@timeout(10, error_message="Timed out comunicating with device.")
+@retry_if_false(3)
+def communicate_with_device(
+ ip: str,
+ port: int,
+ cmd: str,
+ query: Optional[str] = None,
+ *,
+ quiet: bool = False,
+ brief: bool = True,
+) -> bool:
+
+ if string_utils.is_none_or_empty(ip) or not string_utils.is_ip_v4(ip):
+ raise ValueError(f"Invalid IP address. ({ip})")
+ if string_utils.is_none_or_empty(cmd):
+ raise ValueError(f"Invalid cmd ({cmd}).")
+
+ try:
+ sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock_tcp.connect((ip, port))
+ logger.info('Connected to %s:%s; running %s...', ip, port, cmd)
+ for c in cmd.split(";"):
+ if c == "wait":
+ if not quiet and not brief:
+ print("Sleeping 3000ms...")
+ time.sleep(3.0)
+ continue
+
+ raw_request = c
+ encrypted_raw_request = encrypt(raw_request)
+ sock_tcp.send(encrypted_raw_request)
+ raw_response = sock_tcp.recv(2048)
+ if not quiet:
+ json_request = json.loads(raw_request)
+ json_request = json.dumps(json_request, sort_keys=True, indent=4)
+ logger.debug('Sent: "%s"', json_request)
+ if not brief:
+ raw = ''
+ for b in encrypted_raw_request:
+ raw += '%02X ' % b
+ logger.debug('Sent raw: "%s"', raw)
+
+ # Note: 4 bytes of garbage (the key)
+ decrypted_raw_response = decrypt(raw_response[4:])
+ if not quiet:
+ json_response = json.loads(decrypted_raw_response)
+ json_response = json.dumps(json_response, sort_keys=True, indent=4)
+ logger.debug('Received: "%s"', json_response)
+ if not brief:
+ raw = ''
+ for b in raw_response:
+ raw += '%02X ' % b
+ logger.debug('Received raw: "%s"', raw)
+
+ if query is not None:
+ j = json.loads(decrypted_raw_response)
+ for subquery in query:
+ for q in subquery.split(","):
+ for v in item_generator(j, q):
+ if not brief:
+ print(f"{q}:", end="")
+ print(f"{v}")
+
+ sock_tcp.close()
+ if '"err_code":0' not in decrypted_raw_response:
+ if '"err_code": 0' not in decrypted_raw_response:
+ logger.error("Did not see clean err_code in response?!")
+ return False
+ return True
+ except socket.error:
+ logger.error("Cound not connect to host %s:%s", ip, port)
+ return False
+
+
if __name__ == '__main__':
import doctest