X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=smart_home%2Ftplink_utils.py;h=abf4970c3c9593f76b641114716fcff88a782627;hb=02302bbd9363facb59c4df2c1f4013087702cfa6;hp=9b8eb6da926e5325cbf1da79e7f45a67a725b2ec;hpb=7627b3efb55f489f92fb718394b5651a137974f5;p=python_utils.git diff --git a/smart_home/tplink_utils.py b/smart_home/tplink_utils.py index 9b8eb6d..abf4970 100644 --- a/smart_home/tplink_utils.py +++ b/smart_home/tplink_utils.py @@ -10,7 +10,7 @@ You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 -Modifications by Scott Gasch Copyright 2020-2022 also released under +Modifications by Scott Gasch Copyright © 2020-2022 also released under the Apache 2.0 license as required by the license (see above). Unless required by applicable law or agreed to in writing, software @@ -24,10 +24,7 @@ limitations under the License. import json import logging import os -import re import socket -import subprocess -import sys import time from struct import pack from typing import Dict, List, Optional, Tuple @@ -57,7 +54,7 @@ commands = { @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py") -def tplink_command(command: str) -> bool: +def tplink_command_wrapper(command: str) -> bool: result = os.system(command) signal = result & 0xFF if signal != 0: @@ -76,21 +73,14 @@ def tplink_command(command: str) -> bool: return True -@timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py") -def tplink_get_info(cmd: str) -> Optional[Dict]: - logger.debug('Getting tplink device status via "%s"', cmd) - try: - out = subprocess.getoutput(cmd) - logger.debug('RAW OUT> %s', out) - out = re.sub("Sent:.*\n", "", out) - out = re.sub("Received: *", "", out) - info = json.loads(out)["system"]["get_sysinfo"] +def tplink_get_info(ip: str, port: int = 9999) -> Optional[Dict]: + success, response = communicate_with_device(ip, port, commands['info'], quiet=True) + if success: + assert len(response) == 1 + info = json.loads(response[0])["system"]["get_sysinfo"] logger.debug("%s", json.dumps(info, indent=4, sort_keys=True)) return info - except Exception as e: - logger.exception(e) - print(out, file=sys.stderr) - return None + return None def encrypt(string: str) -> bytes: @@ -176,7 +166,7 @@ def communicate_with_device( if not brief: raw = '' for b in encrypted_raw_request: - raw += '%02X ' % b + raw += f'{b:02X} ' logger.debug('Sent raw: "%s"', raw) # Note: 4 bytes of garbage (the key) @@ -186,14 +176,16 @@ def communicate_with_device( if not brief: raw = '' for b in raw_response: - raw += '%02X ' % b + raw += f'{b:02X} ' logger.debug('Received raw: "%s"', raw) - if '"err_code":0' not in decrypted_raw_response: - if '"err_code": 0' not in decrypted_raw_response: - logger.error("Did not see clean err_code in response?!") - return (False, all_responses) - logger.debug('All commands succeeded, returning True.') + if ( + '"err_code":0' not in decrypted_raw_response + and '"err_code": 0' not in decrypted_raw_response + ): + logger.error("Did not see clean err_code in response?!") + return (False, all_responses) + logger.debug('All commands succeeded.') return (True, all_responses) except socket.error: logger.error("Cound not connect to host %s:%s", ip, port)