import sys
import time
from struct import pack
-from typing import Any, Dict, Optional
+from typing import Dict, List, Optional, Tuple
import logging_utils
import string_utils
-from decorator_utils import retry_if_false, timeout
+from decorator_utils import timeout
logger = logging.getLogger(__name__)
+commands = {
+ "info": '{"system":{"get_sysinfo":{}}}',
+ "on": '{"system":{"set_relay_state":{"state":1}}}',
+ "off": '{"system":{"set_relay_state":{"state":0}}}',
+ "offon": '{"system":{"set_relay_state":{"state":0}}};wait;{"system":{"set_relay_state":{"state":1}}}',
+ "onoff": '{"system":{"set_relay_state":{"state":1}}};wait;{"system":{"set_relay_state":{"state":0}}}',
+ "cloudinfo": '{"cnCloud":{"get_info":{}}}',
+ "wlanscan": '{"netif":{"get_scaninfo":{"refresh":0}}}',
+ "time": '{"time":{"get_time":{}}}',
+ "schedule": '{"schedule":{"get_rules":{}}}',
+ "countdown": '{"count_down":{"get_rules":{}}}',
+ "antitheft": '{"anti_theft":{"get_rules":{}}}',
+ "reboot": '{"system":{"reboot":{"delay":1}}}',
+ "reset": '{"system":{"reset":{"delay":1}}}',
+ "energy": '{"emeter":{"get_realtime":{}}}',
+}
+
@timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
def tplink_command(command: str) -> bool:
return result
-def item_generator(json_input: Any, lookup_key: str):
- """Walk through some JSON recursively looking for a key. Used by the query
- parameter in communicate_with_device (see below).
-
- """
- if isinstance(json_input, dict):
- for k, v in json_input.items():
- if k == lookup_key:
- yield v
- else:
- yield from item_generator(v, lookup_key)
- elif isinstance(json_input, list):
- for item in json_input:
- yield from item_generator(item, lookup_key)
-
-
@timeout(10, error_message="Timed out comunicating with device.")
-@retry_if_false(3)
def communicate_with_device(
ip: str,
port: int,
cmd: str,
- query: Optional[str] = None,
*,
quiet: bool = False,
brief: bool = True,
-) -> bool:
+) -> Tuple[bool, List[str]]:
+ """Given an IP address and port, open a socket, encrypt cmd, and sent it to
+ the device. Read a response, decrypt it and parse it.
+ """
if string_utils.is_none_or_empty(ip) or not string_utils.is_ip_v4(ip):
raise ValueError(f"Invalid IP address. ({ip})")
if string_utils.is_none_or_empty(cmd):
raise ValueError(f"Invalid cmd ({cmd}).")
+ all_responses = []
try:
sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock_tcp.connect((ip, port))
- logger.info('Connected to %s:%s; running %s...', ip, port, cmd)
+ logger.debug('Connected to %s:%s; running %s...', ip, port, cmd)
for c in cmd.split(";"):
if c == "wait":
if not quiet and not brief:
encrypted_raw_request = encrypt(raw_request)
sock_tcp.send(encrypted_raw_request)
raw_response = sock_tcp.recv(2048)
+ decrypted_raw_response = decrypt(raw_response[4:])
+ all_responses.append(decrypted_raw_response)
+
if not quiet:
json_request = json.loads(raw_request)
json_request = json.dumps(json_request, sort_keys=True, indent=4)
- logger.debug('Sent: "%s"', json_request)
+ print(f'Sent: "{json_request}"')
if not brief:
raw = ''
for b in encrypted_raw_request:
logger.debug('Sent raw: "%s"', raw)
# Note: 4 bytes of garbage (the key)
- decrypted_raw_response = decrypt(raw_response[4:])
- if not quiet:
- json_response = json.loads(decrypted_raw_response)
- json_response = json.dumps(json_response, sort_keys=True, indent=4)
- logger.debug('Received: "%s"', json_response)
- if not brief:
- raw = ''
- for b in raw_response:
- raw += '%02X ' % b
- logger.debug('Received raw: "%s"', raw)
-
- if query is not None:
- j = json.loads(decrypted_raw_response)
- for subquery in query:
- for q in subquery.split(","):
- for v in item_generator(j, q):
- if not brief:
- print(f"{q}:", end="")
- print(f"{v}")
-
- sock_tcp.close()
- if '"err_code":0' not in decrypted_raw_response:
- if '"err_code": 0' not in decrypted_raw_response:
- logger.error("Did not see clean err_code in response?!")
- return False
- return True
+ json_response = json.loads(decrypted_raw_response)
+ json_response = json.dumps(json_response, sort_keys=True, indent=4)
+ print(f'Received: "{json_response}"')
+ if not brief:
+ raw = ''
+ for b in raw_response:
+ raw += '%02X ' % b
+ logger.debug('Received raw: "%s"', raw)
+
+ if '"err_code":0' not in decrypted_raw_response:
+ if '"err_code": 0' not in decrypted_raw_response:
+ logger.error("Did not see clean err_code in response?!")
+ return (False, all_responses)
+ logger.debug('All commands succeeded, returning True.')
+ return (True, all_responses)
except socket.error:
logger.error("Cound not connect to host %s:%s", ip, port)
- return False
+ return (False, all_responses)
+ finally:
+ sock_tcp.close()
if __name__ == '__main__':