From: Scott Gasch Date: Sat, 26 Feb 2022 18:45:11 +0000 (-0800) Subject: Moved more logic from tplink.py into a library. X-Git-Url: https://wannabe.guru.org/gitweb/?a=commitdiff_plain;h=7627b3efb55f489f92fb718394b5651a137974f5;p=python_utils.git Moved more logic from tplink.py into a library. --- diff --git a/smart_home/tplink_utils.py b/smart_home/tplink_utils.py index 932a3c7..9b8eb6d 100644 --- a/smart_home/tplink_utils.py +++ b/smart_home/tplink_utils.py @@ -30,14 +30,31 @@ import subprocess import sys import time from struct import pack -from typing import Any, Dict, Optional +from typing import Dict, List, Optional, Tuple import logging_utils import string_utils -from decorator_utils import retry_if_false, timeout +from decorator_utils import timeout logger = logging.getLogger(__name__) +commands = { + "info": '{"system":{"get_sysinfo":{}}}', + "on": '{"system":{"set_relay_state":{"state":1}}}', + "off": '{"system":{"set_relay_state":{"state":0}}}', + "offon": '{"system":{"set_relay_state":{"state":0}}};wait;{"system":{"set_relay_state":{"state":1}}}', + "onoff": '{"system":{"set_relay_state":{"state":1}}};wait;{"system":{"set_relay_state":{"state":0}}}', + "cloudinfo": '{"cnCloud":{"get_info":{}}}', + "wlanscan": '{"netif":{"get_scaninfo":{"refresh":0}}}', + "time": '{"time":{"get_time":{}}}', + "schedule": '{"schedule":{"get_rules":{}}}', + "countdown": '{"count_down":{"get_rules":{}}}', + "antitheft": '{"anti_theft":{"get_rules":{}}}', + "reboot": '{"system":{"reboot":{"delay":1}}}', + "reset": '{"system":{"reset":{"delay":1}}}', + "energy": '{"emeter":{"get_realtime":{}}}', +} + @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py") def tplink_command(command: str) -> bool: @@ -115,43 +132,29 @@ def decrypt(string: bytes) -> str: return result -def item_generator(json_input: Any, lookup_key: str): - """Walk through some JSON recursively looking for a key. Used by the query - parameter in communicate_with_device (see below). - - """ - if isinstance(json_input, dict): - for k, v in json_input.items(): - if k == lookup_key: - yield v - else: - yield from item_generator(v, lookup_key) - elif isinstance(json_input, list): - for item in json_input: - yield from item_generator(item, lookup_key) - - @timeout(10, error_message="Timed out comunicating with device.") -@retry_if_false(3) def communicate_with_device( ip: str, port: int, cmd: str, - query: Optional[str] = None, *, quiet: bool = False, brief: bool = True, -) -> bool: +) -> Tuple[bool, List[str]]: + """Given an IP address and port, open a socket, encrypt cmd, and sent it to + the device. Read a response, decrypt it and parse it. + """ if string_utils.is_none_or_empty(ip) or not string_utils.is_ip_v4(ip): raise ValueError(f"Invalid IP address. ({ip})") if string_utils.is_none_or_empty(cmd): raise ValueError(f"Invalid cmd ({cmd}).") + all_responses = [] try: sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock_tcp.connect((ip, port)) - logger.info('Connected to %s:%s; running %s...', ip, port, cmd) + logger.debug('Connected to %s:%s; running %s...', ip, port, cmd) for c in cmd.split(";"): if c == "wait": if not quiet and not brief: @@ -163,10 +166,13 @@ def communicate_with_device( encrypted_raw_request = encrypt(raw_request) sock_tcp.send(encrypted_raw_request) raw_response = sock_tcp.recv(2048) + decrypted_raw_response = decrypt(raw_response[4:]) + all_responses.append(decrypted_raw_response) + if not quiet: json_request = json.loads(raw_request) json_request = json.dumps(json_request, sort_keys=True, indent=4) - logger.debug('Sent: "%s"', json_request) + print(f'Sent: "{json_request}"') if not brief: raw = '' for b in encrypted_raw_request: @@ -174,35 +180,26 @@ def communicate_with_device( logger.debug('Sent raw: "%s"', raw) # Note: 4 bytes of garbage (the key) - decrypted_raw_response = decrypt(raw_response[4:]) - if not quiet: - json_response = json.loads(decrypted_raw_response) - json_response = json.dumps(json_response, sort_keys=True, indent=4) - logger.debug('Received: "%s"', json_response) - if not brief: - raw = '' - for b in raw_response: - raw += '%02X ' % b - logger.debug('Received raw: "%s"', raw) - - if query is not None: - j = json.loads(decrypted_raw_response) - for subquery in query: - for q in subquery.split(","): - for v in item_generator(j, q): - if not brief: - print(f"{q}:", end="") - print(f"{v}") - - sock_tcp.close() - if '"err_code":0' not in decrypted_raw_response: - if '"err_code": 0' not in decrypted_raw_response: - logger.error("Did not see clean err_code in response?!") - return False - return True + json_response = json.loads(decrypted_raw_response) + json_response = json.dumps(json_response, sort_keys=True, indent=4) + print(f'Received: "{json_response}"') + if not brief: + raw = '' + for b in raw_response: + raw += '%02X ' % b + logger.debug('Received raw: "%s"', raw) + + if '"err_code":0' not in decrypted_raw_response: + if '"err_code": 0' not in decrypted_raw_response: + logger.error("Did not see clean err_code in response?!") + return (False, all_responses) + logger.debug('All commands succeeded, returning True.') + return (True, all_responses) except socket.error: logger.error("Cound not connect to host %s:%s", ip, port) - return False + return (False, all_responses) + finally: + sock_tcp.close() if __name__ == '__main__':