#!/usr/bin/env python3

"""Wrapper functions for dealing with TPLink devices.

Based on code written by Lubomir Stroetmann and Copyright 2016
softScheck GmbH which was covered by the Apache 2.0 license:

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Modifications by Scott Gasch Copyright 2020-2022 also released under
the Apache 2.0 license as required by the license (see above).

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import json
import logging
import os
import re
import socket
import subprocess
import sys
import time
from struct import pack
from typing import Dict, List, Optional, Tuple

import logging_utils
import string_utils
from decorator_utils import timeout

logger = logging.getLogger(__name__)

# Canned JSON requests keyed by a short command name.  Entries may chain
# several requests with ';' and use the pseudo-request "wait" to pause
# between them (see communicate_with_device).
commands = {
    "info": '{"system":{"get_sysinfo":{}}}',
    "on": '{"system":{"set_relay_state":{"state":1}}}',
    "off": '{"system":{"set_relay_state":{"state":0}}}',
    "offon": '{"system":{"set_relay_state":{"state":0}}};wait;{"system":{"set_relay_state":{"state":1}}}',
    "onoff": '{"system":{"set_relay_state":{"state":1}}};wait;{"system":{"set_relay_state":{"state":0}}}',
    "cloudinfo": '{"cnCloud":{"get_info":{}}}',
    "wlanscan": '{"netif":{"get_scaninfo":{"refresh":0}}}',
    "time": '{"time":{"get_time":{}}}',
    "schedule": '{"schedule":{"get_rules":{}}}',
    "countdown": '{"count_down":{"get_rules":{}}}',
    "antitheft": '{"anti_theft":{"get_rules":{}}}',
    "reboot": '{"system":{"reboot":{"delay":1}}}',
    "reset": '{"system":{"reset":{"delay":1}}}',
    "energy": '{"emeter":{"get_realtime":{}}}',
}

# Size of the big-endian length header that frames every message.
_HEADER_LEN = 4


@timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
def tplink_command(command: str) -> bool:
    """Run a shell command line and report whether it succeeded.

    Args:
        command: the complete command line; it is handed to the shell
            via os.system, so callers must not build it from untrusted
            input.

    Returns:
        True if the command exited with status zero; False if it was
        killed by a signal or exited non-zero (both cases are logged).
    """
    result = os.system(command)

    # The low byte of the status word carries the terminating signal...
    signal = result & 0xFF
    if signal != 0:
        msg = f'{command} died with signal {signal}'
        logger.warning(msg)
        logging_utils.hlog(msg)
        return False

    # ...and the next byte up carries the exit code.
    exit_value = result >> 8
    if exit_value != 0:
        msg = f'{command} failed, exited {exit_value}'
        logger.warning(msg)
        logging_utils.hlog(msg)
        return False

    logger.debug('%s succeeded.', command)
    return True


@timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
def tplink_get_info(cmd: str) -> Optional[Dict]:
    """Run a command line and parse the device sysinfo out of its output.

    Args:
        cmd: the complete command line to run (via the shell).

    Returns:
        The ["system"]["get_sysinfo"] dict parsed from the command's
        output, or None if the command could not be run or its output
        could not be parsed.
    """
    logger.debug('Getting tplink device status via "%s"', cmd)

    # Bound before the try so that the error handler below can always
    # print it, even when getoutput itself is what raised.
    out = ''
    try:
        out = subprocess.getoutput(cmd)
        logger.debug('RAW OUT> %s', out)

        # Strip the "Sent:" line and the "Received:" prefix so that only
        # the JSON payload is left.
        out = re.sub("Sent:.*\n", "", out)
        out = re.sub("Received: *", "", out)
        info = json.loads(out)["system"]["get_sysinfo"]
        logger.debug("%s", json.dumps(info, indent=4, sort_keys=True))
        return info
    except Exception as e:
        # Best effort: log, dump whatever output we have, and give up.
        logger.exception(e)
        print(out, file=sys.stderr)
        return None


def encrypt(string: str) -> bytes:
    """Encrypt a request for the TP-Link Smart Home Protocol.

    The payload is an XOR autokey cipher with starting key = 171: each
    character is XORed with the previous ciphertext byte.  The result
    is prefixed with a four byte big-endian header holding len(string).

    Args:
        string: the plaintext request.

    Returns:
        The 4 byte length header followed by the enciphered payload.

    Raises:
        ValueError: if a character of string does not fit in one byte.

    >>> " ".join(hex(b).replace('0x', '') for b in encrypt('{"system":{"get_sysinfo":{}}}'))
    '0 0 0 1d d0 f2 81 f8 8b ff 9a f7 d5 ef 94 b6 d1 b4 c0 9f ec 95 e6 8f e1 87 e8 ca f0 8b f6 8b f6'

    """
    key = 171
    result = bytearray(pack(">I", len(string)))
    for char in string:
        key ^= ord(char)  # the ciphertext byte becomes the next key
        result.append(key)
    return bytes(result)


def decrypt(string: bytes) -> str:
    """Opposite of encrypt (above).

    Note: encrypt prepends a 4 byte length header to the message; the
    caller must strip it off before calling this.

    Args:
        string: the enciphered payload, without the length header.

    Returns:
        The plaintext, one character per input byte.

    >>> b = encrypt('hi, mom')
    >>> s = decrypt(b[4:])
    >>> s
    'hi, mom'

    """
    key = 171
    chars = []
    for byte in string:
        chars.append(chr(key ^ byte))
        key = byte  # autokey: the ciphertext byte keys the next one
    return "".join(chars)


def _recv_response(sock: socket.socket) -> bytes:
    """Read one length-prefixed message (header included) from sock.

    Keeps reading until the number of bytes announced in the 4 byte
    header has arrived or the peer closes the connection; in the latter
    case whatever was received so far is returned.
    """
    received = bytearray()
    expected = _HEADER_LEN
    while len(received) < expected:
        chunk = sock.recv(2048)
        if not chunk:
            break  # peer closed the connection; return what we have
        received += chunk
        if len(received) >= _HEADER_LEN:
            expected = _HEADER_LEN + int.from_bytes(received[:_HEADER_LEN], "big")
    return bytes(received)


@timeout(10, error_message="Timed out comunicating with device.")
def communicate_with_device(
    ip: str,
    port: int,
    cmd: str,
    *,
    quiet: bool = False,
    brief: bool = True,
) -> Tuple[bool, List[str]]:
    """Given an IP address and port, open a socket, encrypt cmd, and
    send it to the device.  Read a response, decrypt it and parse it.

    Args:
        ip: the IPv4 address of the device.
        port: the TCP port to connect to.
        cmd: one or more JSON requests separated by ';'.  The
            pseudo-request "wait" sleeps for three seconds instead of
            sending anything.
        quiet: if True, print nothing.
        brief: if False (and not quiet), additionally announce sleeps
            and log hex dumps of the raw traffic at debug level.

    Returns:
        A tuple of (success, responses) where responses holds the
        decrypted response to every request sent so far.  success is
        False if a socket error occurred or a response did not contain
        a zero err_code.

    Raises:
        ValueError: if ip is not a valid IPv4 address or cmd is empty.
    """
    if string_utils.is_none_or_empty(ip) or not string_utils.is_ip_v4(ip):
        raise ValueError(f"Invalid IP address. ({ip})")
    if string_utils.is_none_or_empty(cmd):
        raise ValueError(f"Invalid cmd ({cmd}).")

    all_responses: List[str] = []
    try:
        # The context manager closes the socket on every exit path and,
        # unlike a bare finally clause, is safe when socket creation
        # itself fails.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock_tcp:
            sock_tcp.connect((ip, port))
            logger.debug('Connected to %s:%s; running %s...', ip, port, cmd)

            for c in cmd.split(";"):
                if c == "wait":
                    if not quiet and not brief:
                        print("Sleeping 3000ms...")
                    time.sleep(3.0)
                    continue

                raw_request = c
                encrypted_raw_request = encrypt(raw_request)
                # sendall: a plain send may transmit only part of the buffer.
                sock_tcp.sendall(encrypted_raw_request)
                raw_response = _recv_response(sock_tcp)
                decrypted_raw_response = decrypt(raw_response[_HEADER_LEN:])
                all_responses.append(decrypted_raw_response)

                if not quiet:
                    json_request = json.loads(raw_request)
                    json_request = json.dumps(json_request, sort_keys=True, indent=4)
                    print(f'Sent: "{json_request}"')
                    if not brief:
                        raw = ''.join('%02X ' % b for b in encrypted_raw_request)
                        logger.debug('Sent raw: "%s"', raw)

                    json_response = json.loads(decrypted_raw_response)
                    json_response = json.dumps(json_response, sort_keys=True, indent=4)
                    print(f'Received: "{json_response}"')
                    if not brief:
                        # Note: the dump starts with the 4 byte length header.
                        raw = ''.join('%02X ' % b for b in raw_response)
                        logger.debug('Received raw: "%s"', raw)

                # Accept the success marker with or without a space
                # after the colon.
                if (
                    '"err_code":0' not in decrypted_raw_response
                    and '"err_code": 0' not in decrypted_raw_response
                ):
                    logger.error("Did not see clean err_code in response?!")
                    return (False, all_responses)

            logger.debug('All commands succeeded, returning True.')
            return (True, all_responses)
    except socket.error:
        logger.error("Cound not connect to host %s:%s", ip, port)
        return (False, all_responses)


if __name__ == '__main__':
    import doctest

    doctest.testmod()