#!/usr/bin/env python3
-"""Wrapper functions for calling tplink.py"""
+"""Wrapper functions for dealing with TPLink devices. Based on code
+written by Lubomir Stroetmann and Copyright 2016 softScheck GmbH which
+was covered by the Apache 2.0 license:
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Modifications by Scott Gasch Copyright © 2020-2022 also released under
+the Apache 2.0 license as required by the license (see above).
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
import json
import logging
import os
-import re
-import subprocess
-import sys
-from typing import Dict, Optional
+import socket
+import time
+from struct import pack
+from typing import Dict, List, Optional, Tuple
import logging_utils
+import string_utils
from decorator_utils import timeout
logger = logging.getLogger(__name__)
+commands = {
+ "info": '{"system":{"get_sysinfo":{}}}',
+ "on": '{"system":{"set_relay_state":{"state":1}}}',
+ "off": '{"system":{"set_relay_state":{"state":0}}}',
+ "offon": '{"system":{"set_relay_state":{"state":0}}};wait;{"system":{"set_relay_state":{"state":1}}}',
+ "onoff": '{"system":{"set_relay_state":{"state":1}}};wait;{"system":{"set_relay_state":{"state":0}}}',
+ "cloudinfo": '{"cnCloud":{"get_info":{}}}',
+ "wlanscan": '{"netif":{"get_scaninfo":{"refresh":0}}}',
+ "time": '{"time":{"get_time":{}}}',
+ "schedule": '{"schedule":{"get_rules":{}}}',
+ "countdown": '{"count_down":{"get_rules":{}}}',
+ "antitheft": '{"anti_theft":{"get_rules":{}}}',
+ "reboot": '{"system":{"reboot":{"delay":1}}}',
+ "reset": '{"system":{"reset":{"delay":1}}}',
+ "energy": '{"emeter":{"get_realtime":{}}}',
+}
+
@timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
-def tplink_command(command: str) -> bool:
+def tplink_command_wrapper(command: str) -> bool:
result = os.system(command)
signal = result & 0xFF
if signal != 0:
return True
-@timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
-def tplink_get_info(cmd: str) -> Optional[Dict]:
- logger.debug('Getting tplink device status via "%s"', cmd)
- try:
- out = subprocess.getoutput(cmd)
- logger.debug('RAW OUT> %s', out)
- out = re.sub("Sent:.*\n", "", out)
- out = re.sub("Received: *", "", out)
- info = json.loads(out)["system"]["get_sysinfo"]
+def tplink_get_info(ip: str, port: int = 9999) -> Optional[Dict]:
+ success, response = communicate_with_device(ip, port, commands['info'], quiet=True)
+ if success:
+ assert len(response) == 1
+ info = json.loads(response[0])["system"]["get_sysinfo"]
logger.debug("%s", json.dumps(info, indent=4, sort_keys=True))
return info
- except Exception as e:
- logger.exception(e)
- print(out, file=sys.stderr)
- return None
+ return None
+
+
+def encrypt(string: str) -> bytes:
+ """Encryption and Decryption of TP-Link Smart Home Protocol. Note:
+ this introduces one dword (4 bytes) or crap in the front of the
+ message, the 171 key.
+
+ XOR Autokey Cipher with starting key = 171
+
+ >>> " ".join(hex(b).replace('0x', '') for b in encrypt('{"system":{"get_sysinfo":{}}}'))
+ '0 0 0 1d d0 f2 81 f8 8b ff 9a f7 d5 ef 94 b6 d1 b4 c0 9f ec 95 e6 8f e1 87 e8 ca f0 8b f6 8b f6'
+
+ """
+ key = 171
+ result = pack(">I", len(string))
+ for i in string:
+ a = key ^ ord(i)
+ key = a
+ result += bytes([a])
+ return result
+
+
+def decrypt(string: bytes) -> str:
+ """Opposite of encrypt (above). Note: encrypt introduces 4 bytes of
+ crap in the front of the message.
+
+ >>> b = encrypt('hi, mom')
+ >>> s = decrypt(b[4:])
+ >>> s
+ 'hi, mom'
+
+ """
+ key = 171
+ result = ""
+ for i in string:
+ a = key ^ i
+ key = i
+ result += chr(a)
+ return result
+
+
+@timeout(10, error_message="Timed out comunicating with device.")
+def communicate_with_device(
+ ip: str,
+ port: int,
+ cmd: str,
+ *,
+ quiet: bool = False,
+ brief: bool = True,
+) -> Tuple[bool, List[str]]:
+ """Given an IP address and port, open a socket, encrypt cmd, and sent it to
+ the device. Read a response, decrypt it and parse it.
+
+ """
+ if string_utils.is_none_or_empty(ip) or not string_utils.is_ip_v4(ip):
+ raise ValueError(f"Invalid IP address. ({ip})")
+ if string_utils.is_none_or_empty(cmd):
+ raise ValueError(f"Invalid cmd ({cmd}).")
+ all_responses = []
+
+ try:
+ sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock_tcp.connect((ip, port))
+ logger.debug('Connected to %s:%s; running %s...', ip, port, cmd)
+ for c in cmd.split(";"):
+ if c == "wait":
+ if not quiet and not brief:
+ print("Sleeping 3000ms...")
+ time.sleep(3.0)
+ continue
+
+ raw_request = c
+ encrypted_raw_request = encrypt(raw_request)
+ sock_tcp.send(encrypted_raw_request)
+ raw_response = sock_tcp.recv(2048)
+ decrypted_raw_response = decrypt(raw_response[4:])
+ all_responses.append(decrypted_raw_response)
+
+ if not quiet:
+ json_request = json.loads(raw_request)
+ json_request = json.dumps(json_request, sort_keys=True, indent=4)
+ print(f'Sent: "{json_request}"')
+ if not brief:
+ raw = ''
+ for b in encrypted_raw_request:
+ raw += f'{b:02X} '
+ logger.debug('Sent raw: "%s"', raw)
+
+ # Note: 4 bytes of garbage (the key)
+ json_response = json.loads(decrypted_raw_response)
+ json_response = json.dumps(json_response, sort_keys=True, indent=4)
+ print(f'Received: "{json_response}"')
+ if not brief:
+ raw = ''
+ for b in raw_response:
+ raw += f'{b:02X} '
+ logger.debug('Received raw: "%s"', raw)
+
+ if (
+ '"err_code":0' not in decrypted_raw_response
+ and '"err_code": 0' not in decrypted_raw_response
+ ):
+ logger.error("Did not see clean err_code in response?!")
+ return (False, all_responses)
+ logger.debug('All commands succeeded.')
+ return (True, all_responses)
+ except socket.error:
+ logger.error("Cound not connect to host %s:%s", ip, port)
+ return (False, all_responses)
+ finally:
+ sock_tcp.close()
+
+
+if __name__ == '__main__':
+ import doctest
+
+ doctest.testmod()