From 79db199260420d84a83788acabf20c1d3cf7cc18 Mon Sep 17 00:00:00 2001 From: Scott Gasch Date: Fri, 25 Feb 2022 11:03:14 -0800 Subject: [PATCH] Moving towards not needing a separate tplink.py utility. --- smart_home/tplink_utils.py | 104 +++++++++++++++++++++++++++++++++++-- 1 file changed, 100 insertions(+), 4 deletions(-) diff --git a/smart_home/tplink_utils.py b/smart_home/tplink_utils.py index d8a75a2..932a3c7 100644 --- a/smart_home/tplink_utils.py +++ b/smart_home/tplink_utils.py @@ -25,13 +25,16 @@ import json import logging import os import re +import socket import subprocess import sys +import time from struct import pack -from typing import Dict, Optional +from typing import Any, Dict, Optional import logging_utils -from decorator_utils import timeout +import string_utils +from decorator_utils import retry_if_false, timeout logger = logging.getLogger(__name__) @@ -74,7 +77,9 @@ def tplink_get_info(cmd: str) -> Optional[Dict]: def encrypt(string: str) -> bytes: - """Encryption and Decryption of TP-Link Smart Home Protocol. + """Encryption and Decryption of TP-Link Smart Home Protocol. Note: + this introduces one dword (4 bytes) or crap in the front of the + message, the 171 key. XOR Autokey Cipher with starting key = 171 @@ -92,7 +97,8 @@ def encrypt(string: str) -> bytes: def decrypt(string: bytes) -> str: - """Opposite of encrypt (above). + """Opposite of encrypt (above). Note: encrypt introduces 4 bytes of + crap in the front of the message. >>> b = encrypt('hi, mom') >>> s = decrypt(b[4:]) @@ -109,6 +115,96 @@ def decrypt(string: bytes) -> str: return result +def item_generator(json_input: Any, lookup_key: str): + """Walk through some JSON recursively looking for a key. Used by the query + parameter in communicate_with_device (see below). + + """ + if isinstance(json_input, dict): + for k, v in json_input.items(): + if k == lookup_key: + yield v + else: + yield from item_generator(v, lookup_key) + elif isinstance(json_input, list): + for item in json_input: + yield from item_generator(item, lookup_key) + + +@timeout(10, error_message="Timed out comunicating with device.") +@retry_if_false(3) +def communicate_with_device( + ip: str, + port: int, + cmd: str, + query: Optional[str] = None, + *, + quiet: bool = False, + brief: bool = True, +) -> bool: + + if string_utils.is_none_or_empty(ip) or not string_utils.is_ip_v4(ip): + raise ValueError(f"Invalid IP address. ({ip})") + if string_utils.is_none_or_empty(cmd): + raise ValueError(f"Invalid cmd ({cmd}).") + + try: + sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock_tcp.connect((ip, port)) + logger.info('Connected to %s:%s; running %s...', ip, port, cmd) + for c in cmd.split(";"): + if c == "wait": + if not quiet and not brief: + print("Sleeping 3000ms...") + time.sleep(3.0) + continue + + raw_request = c + encrypted_raw_request = encrypt(raw_request) + sock_tcp.send(encrypted_raw_request) + raw_response = sock_tcp.recv(2048) + if not quiet: + json_request = json.loads(raw_request) + json_request = json.dumps(json_request, sort_keys=True, indent=4) + logger.debug('Sent: "%s"', json_request) + if not brief: + raw = '' + for b in encrypted_raw_request: + raw += '%02X ' % b + logger.debug('Sent raw: "%s"', raw) + + # Note: 4 bytes of garbage (the key) + decrypted_raw_response = decrypt(raw_response[4:]) + if not quiet: + json_response = json.loads(decrypted_raw_response) + json_response = json.dumps(json_response, sort_keys=True, indent=4) + logger.debug('Received: "%s"', json_response) + if not brief: + raw = '' + for b in raw_response: + raw += '%02X ' % b + logger.debug('Received raw: "%s"', raw) + + if query is not None: + j = json.loads(decrypted_raw_response) + for subquery in query: + for q in subquery.split(","): + for v in item_generator(j, q): + if not brief: + print(f"{q}:", end="") + print(f"{v}") + + sock_tcp.close() + if '"err_code":0' not in decrypted_raw_response: + if '"err_code": 0' not in decrypted_raw_response: + logger.error("Did not see clean err_code in response?!") + return False + return True + except socket.error: + logger.error("Cound not connect to host %s:%s", ip, port) + return False + + if __name__ == '__main__': import doctest -- 2.47.1