import json
import logging
import os
-import re
import socket
-import subprocess
-import sys
import time
from struct import pack
from typing import Dict, List, Optional, Tuple
@timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
-def tplink_command(command: str) -> bool:
+def tplink_command_wrapper(command: str) -> bool:
result = os.system(command)
signal = result & 0xFF
if signal != 0:
return True
-@timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
-def tplink_get_info(cmd: str) -> Optional[Dict]:
- logger.debug('Getting tplink device status via "%s"', cmd)
- try:
- out = subprocess.getoutput(cmd)
- logger.debug('RAW OUT> %s', out)
- out = re.sub("Sent:.*\n", "", out)
- out = re.sub("Received: *", "", out)
- info = json.loads(out)["system"]["get_sysinfo"]
+def tplink_get_info(ip: str, port: int = 9999) -> Optional[Dict]:
+ success, response = communicate_with_device(ip, port, commands['info'], quiet=True)
+ if success:
+ assert len(response) == 1
+ info = json.loads(response[0])["system"]["get_sysinfo"]
logger.debug("%s", json.dumps(info, indent=4, sort_keys=True))
return info
- except Exception as e:
- logger.exception(e)
- print(out, file=sys.stderr)
- return None
+ return None
def encrypt(string: str) -> bytes:
if not brief:
raw = ''
for b in encrypted_raw_request:
- raw += '%02X ' % b
+ raw += f'{b:02X} '
logger.debug('Sent raw: "%s"', raw)
# Note: 4 bytes of garbage (the key)
if not brief:
raw = ''
for b in raw_response:
- raw += '%02X ' % b
+ raw += f'{b:02X} '
logger.debug('Received raw: "%s"', raw)
- if '"err_code":0' not in decrypted_raw_response:
- if '"err_code": 0' not in decrypted_raw_response:
- logger.error("Did not see clean err_code in response?!")
- return (False, all_responses)
- logger.debug('All commands succeeded, returning True.')
+ if (
+ '"err_code":0' not in decrypted_raw_response
+ and '"err_code": 0' not in decrypted_raw_response
+ ):
+ logger.error("Did not see clean err_code in response?!")
+ return (False, all_responses)
+ logger.debug('All commands succeeded.')
return (True, all_responses)
except socket.error:
logger.error("Cound not connect to host %s:%s", ip, port)