http://www.apache.org/licenses/LICENSE-2.0
-Modifications by Scott Gasch Copyright 2020-2022 also released under
+Modifications by Scott Gasch Copyright © 2020-2022 also released under
the Apache 2.0 license as required by the license (see above).
Unless required by applicable law or agreed to in writing, software
import json
import logging
import os
-import re
-import subprocess
-import sys
+import socket
+import time
from struct import pack
-from typing import Dict, Optional
+from typing import Dict, List, Optional, Tuple
import logging_utils
+import string_utils
from decorator_utils import timeout
logger = logging.getLogger(__name__)
+commands = {
+ "info": '{"system":{"get_sysinfo":{}}}',
+ "on": '{"system":{"set_relay_state":{"state":1}}}',
+ "off": '{"system":{"set_relay_state":{"state":0}}}',
+ "offon": '{"system":{"set_relay_state":{"state":0}}};wait;{"system":{"set_relay_state":{"state":1}}}',
+ "onoff": '{"system":{"set_relay_state":{"state":1}}};wait;{"system":{"set_relay_state":{"state":0}}}',
+ "cloudinfo": '{"cnCloud":{"get_info":{}}}',
+ "wlanscan": '{"netif":{"get_scaninfo":{"refresh":0}}}',
+ "time": '{"time":{"get_time":{}}}',
+ "schedule": '{"schedule":{"get_rules":{}}}',
+ "countdown": '{"count_down":{"get_rules":{}}}',
+ "antitheft": '{"anti_theft":{"get_rules":{}}}',
+ "reboot": '{"system":{"reboot":{"delay":1}}}',
+ "reset": '{"system":{"reset":{"delay":1}}}',
+ "energy": '{"emeter":{"get_realtime":{}}}',
+}
+
@timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
-def tplink_command(command: str) -> bool:
+def tplink_command_wrapper(command: str) -> bool:
result = os.system(command)
signal = result & 0xFF
if signal != 0:
return True
-@timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
-def tplink_get_info(cmd: str) -> Optional[Dict]:
- logger.debug('Getting tplink device status via "%s"', cmd)
- try:
- out = subprocess.getoutput(cmd)
- logger.debug('RAW OUT> %s', out)
- out = re.sub("Sent:.*\n", "", out)
- out = re.sub("Received: *", "", out)
- info = json.loads(out)["system"]["get_sysinfo"]
+def tplink_get_info(ip: str, port: int = 9999) -> Optional[Dict]:
+ success, response = communicate_with_device(ip, port, commands['info'], quiet=True)
+ if success:
+ assert len(response) == 1
+ info = json.loads(response[0])["system"]["get_sysinfo"]
logger.debug("%s", json.dumps(info, indent=4, sort_keys=True))
return info
- except Exception as e:
- logger.exception(e)
- print(out, file=sys.stderr)
- return None
+ return None
def encrypt(string: str) -> bytes:
- """Encryption and Decryption of TP-Link Smart Home Protocol.
+ """Encryption and Decryption of TP-Link Smart Home Protocol. Note:
+ this introduces one dword (4 bytes) or crap in the front of the
+ message, the 171 key.
XOR Autokey Cipher with starting key = 171
def decrypt(string: bytes) -> str:
- """Opposite of encrypt (above).
+ """Opposite of encrypt (above). Note: encrypt introduces 4 bytes of
+ crap in the front of the message.
>>> b = encrypt('hi, mom')
>>> s = decrypt(b[4:])
return result
+@timeout(10, error_message="Timed out comunicating with device.")
+def communicate_with_device(
+ ip: str,
+ port: int,
+ cmd: str,
+ *,
+ quiet: bool = False,
+ brief: bool = True,
+) -> Tuple[bool, List[str]]:
+ """Given an IP address and port, open a socket, encrypt cmd, and sent it to
+ the device. Read a response, decrypt it and parse it.
+
+ """
+ if string_utils.is_none_or_empty(ip) or not string_utils.is_ip_v4(ip):
+ raise ValueError(f"Invalid IP address. ({ip})")
+ if string_utils.is_none_or_empty(cmd):
+ raise ValueError(f"Invalid cmd ({cmd}).")
+ all_responses = []
+
+ try:
+ sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock_tcp.connect((ip, port))
+ logger.debug('Connected to %s:%s; running %s...', ip, port, cmd)
+ for c in cmd.split(";"):
+ if c == "wait":
+ if not quiet and not brief:
+ print("Sleeping 3000ms...")
+ time.sleep(3.0)
+ continue
+
+ raw_request = c
+ encrypted_raw_request = encrypt(raw_request)
+ sock_tcp.send(encrypted_raw_request)
+ raw_response = sock_tcp.recv(2048)
+ decrypted_raw_response = decrypt(raw_response[4:])
+ all_responses.append(decrypted_raw_response)
+
+ if not quiet:
+ json_request = json.loads(raw_request)
+ json_request = json.dumps(json_request, sort_keys=True, indent=4)
+ print(f'Sent: "{json_request}"')
+ if not brief:
+ raw = ''
+ for b in encrypted_raw_request:
+ raw += f'{b:02X} '
+ logger.debug('Sent raw: "%s"', raw)
+
+ # Note: 4 bytes of garbage (the key)
+ json_response = json.loads(decrypted_raw_response)
+ json_response = json.dumps(json_response, sort_keys=True, indent=4)
+ print(f'Received: "{json_response}"')
+ if not brief:
+ raw = ''
+ for b in raw_response:
+ raw += f'{b:02X} '
+ logger.debug('Received raw: "%s"', raw)
+
+ if (
+ '"err_code":0' not in decrypted_raw_response
+ and '"err_code": 0' not in decrypted_raw_response
+ ):
+ logger.error("Did not see clean err_code in response?!")
+ return (False, all_responses)
+ logger.debug('All commands succeeded.')
+ return (True, all_responses)
+ except socket.error:
+ logger.error("Cound not connect to host %s:%s", ip, port)
+ return (False, all_responses)
+ finally:
+ sock_tcp.close()
+
+
if __name__ == '__main__':
import doctest