X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=smart_home%2Ftplink_utils.py;h=abf4970c3c9593f76b641114716fcff88a782627;hb=532df2c5b57c7517dfb3dddd8c1358fbadf8baf3;hp=91cdd755903670d0013d5de7e19353c8cc119b70;hpb=1220ffcf56e968de31b62f6d0c5af250fed9028e;p=python_utils.git diff --git a/smart_home/tplink_utils.py b/smart_home/tplink_utils.py index 91cdd75..abf4970 100644 --- a/smart_home/tplink_utils.py +++ b/smart_home/tplink_utils.py @@ -1,23 +1,60 @@ #!/usr/bin/env python3 -"""Wrapper functions for calling tplink.py""" +"""Wrapper functions for dealing with TPLink devices. Based on code +written by Lubomir Stroetmann and Copyright 2016 softScheck GmbH which +was covered by the Apache 2.0 license: + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Modifications by Scott Gasch Copyright © 2020-2022 also released under +the Apache 2.0 license as required by the license (see above). + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" import json import logging import os -import re -import subprocess -import sys -from typing import Dict, Optional +import socket +import time +from struct import pack +from typing import Dict, List, Optional, Tuple import logging_utils +import string_utils from decorator_utils import timeout logger = logging.getLogger(__name__) +commands = { + "info": '{"system":{"get_sysinfo":{}}}', + "on": '{"system":{"set_relay_state":{"state":1}}}', + "off": '{"system":{"set_relay_state":{"state":0}}}', + "offon": '{"system":{"set_relay_state":{"state":0}}};wait;{"system":{"set_relay_state":{"state":1}}}', + "onoff": '{"system":{"set_relay_state":{"state":1}}};wait;{"system":{"set_relay_state":{"state":0}}}', + "cloudinfo": '{"cnCloud":{"get_info":{}}}', + "wlanscan": '{"netif":{"get_scaninfo":{"refresh":0}}}', + "time": '{"time":{"get_time":{}}}', + "schedule": '{"schedule":{"get_rules":{}}}', + "countdown": '{"count_down":{"get_rules":{}}}', + "antitheft": '{"anti_theft":{"get_rules":{}}}', + "reboot": '{"system":{"reboot":{"delay":1}}}', + "reset": '{"system":{"reset":{"delay":1}}}', + "energy": '{"emeter":{"get_realtime":{}}}', +} + @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py") -def tplink_command(command: str) -> bool: +def tplink_command_wrapper(command: str) -> bool: result = os.system(command) signal = result & 0xFF if signal != 0: @@ -36,18 +73,128 @@ def tplink_command(command: str) -> bool: return True -@timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py") -def tplink_get_info(cmd: str) -> Optional[Dict]: - logger.debug('Getting tplink device status via "%s"', cmd) - try: - out = subprocess.getoutput(cmd) - logger.debug('RAW OUT> %s', out) - out = re.sub("Sent:.*\n", "", out) - out = re.sub("Received: *", "", out) - info = json.loads(out)["system"]["get_sysinfo"] +def tplink_get_info(ip: str, port: int = 9999) -> Optional[Dict]: + success, response = communicate_with_device(ip, port, commands['info'], quiet=True) + if success: + assert len(response) == 1 + info = json.loads(response[0])["system"]["get_sysinfo"] logger.debug("%s", json.dumps(info, indent=4, sort_keys=True)) return info - except Exception as e: - logger.exception(e) - print(out, file=sys.stderr) - return None + return None + + +def encrypt(string: str) -> bytes: + """Encryption and Decryption of TP-Link Smart Home Protocol. Note: + this introduces one dword (4 bytes) or crap in the front of the + message, the 171 key. + + XOR Autokey Cipher with starting key = 171 + + >>> " ".join(hex(b).replace('0x', '') for b in encrypt('{"system":{"get_sysinfo":{}}}')) + '0 0 0 1d d0 f2 81 f8 8b ff 9a f7 d5 ef 94 b6 d1 b4 c0 9f ec 95 e6 8f e1 87 e8 ca f0 8b f6 8b f6' + + """ + key = 171 + result = pack(">I", len(string)) + for i in string: + a = key ^ ord(i) + key = a + result += bytes([a]) + return result + + +def decrypt(string: bytes) -> str: + """Opposite of encrypt (above). Note: encrypt introduces 4 bytes of + crap in the front of the message. + + >>> b = encrypt('hi, mom') + >>> s = decrypt(b[4:]) + >>> s + 'hi, mom' + + """ + key = 171 + result = "" + for i in string: + a = key ^ i + key = i + result += chr(a) + return result + + +@timeout(10, error_message="Timed out comunicating with device.") +def communicate_with_device( + ip: str, + port: int, + cmd: str, + *, + quiet: bool = False, + brief: bool = True, +) -> Tuple[bool, List[str]]: + """Given an IP address and port, open a socket, encrypt cmd, and sent it to + the device. Read a response, decrypt it and parse it. + + """ + if string_utils.is_none_or_empty(ip) or not string_utils.is_ip_v4(ip): + raise ValueError(f"Invalid IP address. ({ip})") + if string_utils.is_none_or_empty(cmd): + raise ValueError(f"Invalid cmd ({cmd}).") + all_responses = [] + + try: + sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock_tcp.connect((ip, port)) + logger.debug('Connected to %s:%s; running %s...', ip, port, cmd) + for c in cmd.split(";"): + if c == "wait": + if not quiet and not brief: + print("Sleeping 3000ms...") + time.sleep(3.0) + continue + + raw_request = c + encrypted_raw_request = encrypt(raw_request) + sock_tcp.send(encrypted_raw_request) + raw_response = sock_tcp.recv(2048) + decrypted_raw_response = decrypt(raw_response[4:]) + all_responses.append(decrypted_raw_response) + + if not quiet: + json_request = json.loads(raw_request) + json_request = json.dumps(json_request, sort_keys=True, indent=4) + print(f'Sent: "{json_request}"') + if not brief: + raw = '' + for b in encrypted_raw_request: + raw += f'{b:02X} ' + logger.debug('Sent raw: "%s"', raw) + + # Note: 4 bytes of garbage (the key) + json_response = json.loads(decrypted_raw_response) + json_response = json.dumps(json_response, sort_keys=True, indent=4) + print(f'Received: "{json_response}"') + if not brief: + raw = '' + for b in raw_response: + raw += f'{b:02X} ' + logger.debug('Received raw: "%s"', raw) + + if ( + '"err_code":0' not in decrypted_raw_response + and '"err_code": 0' not in decrypted_raw_response + ): + logger.error("Did not see clean err_code in response?!") + return (False, all_responses) + logger.debug('All commands succeeded.') + return (True, all_responses) + except socket.error: + logger.error("Cound not connect to host %s:%s", ip, port) + return (False, all_responses) + finally: + sock_tcp.close() + + +if __name__ == '__main__': + import doctest + + doctest.testmod()