http://www.apache.org/licenses/LICENSE-2.0
-Modifications by Scott Gasch Copyright 2020-2022 also released under
+Modifications by Scott Gasch Copyright © 2020-2022 also released under
the Apache 2.0 license as required by the license (see above).
Unless required by applicable law or agreed to in writing, software
import json
import logging
import os
-import re
import socket
-import subprocess
-import sys
import time
from struct import pack
from typing import Dict, List, Optional, Tuple
@timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
-def tplink_command(command: str) -> bool:
+def tplink_command_wrapper(command: str) -> bool:
result = os.system(command)
signal = result & 0xFF
if signal != 0:
return True
-@timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
-def tplink_get_info(cmd: str) -> Optional[Dict]:
- logger.debug('Getting tplink device status via "%s"', cmd)
- try:
- out = subprocess.getoutput(cmd)
- logger.debug('RAW OUT> %s', out)
- out = re.sub("Sent:.*\n", "", out)
- out = re.sub("Received: *", "", out)
- info = json.loads(out)["system"]["get_sysinfo"]
+def tplink_get_info(ip: str, port: int = 9999) -> Optional[Dict]:
+ success, response = communicate_with_device(ip, port, commands['info'], quiet=True)
+ if success:
+ assert len(response) == 1
+ info = json.loads(response[0])["system"]["get_sysinfo"]
logger.debug("%s", json.dumps(info, indent=4, sort_keys=True))
return info
- except Exception as e:
- logger.exception(e)
- print(out, file=sys.stderr)
- return None
+ return None
def encrypt(string: str) -> bytes:
if not brief:
raw = ''
for b in encrypted_raw_request:
- raw += '%02X ' % b
+ raw += f'{b:02X} '
logger.debug('Sent raw: "%s"', raw)
# Note: 4 bytes of garbage (the key)
if not brief:
raw = ''
for b in raw_response:
- raw += '%02X ' % b
+ raw += f'{b:02X} '
logger.debug('Received raw: "%s"', raw)
- if '"err_code":0' not in decrypted_raw_response:
- if '"err_code": 0' not in decrypted_raw_response:
- logger.error("Did not see clean err_code in response?!")
- return (False, all_responses)
- logger.debug('All commands succeeded, returning True.')
+ if (
+ '"err_code":0' not in decrypted_raw_response
+ and '"err_code": 0' not in decrypted_raw_response
+ ):
+ logger.error("Did not see clean err_code in response?!")
+ return (False, all_responses)
+ logger.debug('All commands succeeded.')
return (True, all_responses)
except socket.error:
logger.error("Cound not connect to host %s:%s", ip, port)