3 """Wrapper functions for dealing with TPLink devices. Based on code
4 written by Lubomir Stroetmann and Copyright 2016 softScheck GmbH which
5 was covered by the Apache 2.0 license:
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Modifications by Scott Gasch Copyright © 2020-2022 also released under
14 the Apache 2.0 license as required by the license (see above).
16 Unless required by applicable law or agreed to in writing, software
17 distributed under the License is distributed on an "AS IS" BASIS,
18 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 See the License for the specific language governing permissions and
20 limitations under the License.
29 from struct import pack
30 from typing import Dict, List, Optional, Tuple
34 from decorator_utils import timeout
36 logger = logging.getLogger(__name__)
39 "info": '{"system":{"get_sysinfo":{}}}',
40 "on": '{"system":{"set_relay_state":{"state":1}}}',
41 "off": '{"system":{"set_relay_state":{"state":0}}}',
42 "offon": '{"system":{"set_relay_state":{"state":0}}};wait;{"system":{"set_relay_state":{"state":1}}}',
43 "onoff": '{"system":{"set_relay_state":{"state":1}}};wait;{"system":{"set_relay_state":{"state":0}}}',
44 "cloudinfo": '{"cnCloud":{"get_info":{}}}',
45 "wlanscan": '{"netif":{"get_scaninfo":{"refresh":0}}}',
46 "time": '{"time":{"get_time":{}}}',
47 "schedule": '{"schedule":{"get_rules":{}}}',
48 "countdown": '{"count_down":{"get_rules":{}}}',
49 "antitheft": '{"anti_theft":{"get_rules":{}}}',
50 "reboot": '{"system":{"reboot":{"delay":1}}}',
51 "reset": '{"system":{"reset":{"delay":1}}}',
52 "energy": '{"emeter":{"get_realtime":{}}}',
56 @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
57 def tplink_command_wrapper(command: str) -> bool:
58 result = os.system(command)
59 signal = result & 0xFF
61 msg = f'{command} died with signal {signal}'
63 logging_utils.hlog(msg)
66 exit_value = result >> 8
68 msg = f'{command} failed, exited {exit_value}'
70 logging_utils.hlog(msg)
72 logger.debug('%s succeeded.', command)
76 def tplink_get_info(ip: str, port: int = 9999) -> Optional[Dict]:
77 success, response = communicate_with_device(ip, port, commands['info'], quiet=True)
79 assert len(response) == 1
80 info = json.loads(response[0])["system"]["get_sysinfo"]
81 logger.debug("%s", json.dumps(info, indent=4, sort_keys=True))
86 def encrypt(string: str) -> bytes:
87 """Encryption and Decryption of TP-Link Smart Home Protocol. Note:
88 this introduces one dword (4 bytes) or crap in the front of the
91 XOR Autokey Cipher with starting key = 171
93 >>> " ".join(hex(b).replace('0x', '') for b in encrypt('{"system":{"get_sysinfo":{}}}'))
94 '0 0 0 1d d0 f2 81 f8 8b ff 9a f7 d5 ef 94 b6 d1 b4 c0 9f ec 95 e6 8f e1 87 e8 ca f0 8b f6 8b f6'
98 result = pack(">I", len(string))
106 def decrypt(string: bytes) -> str:
107 """Opposite of encrypt (above). Note: encrypt introduces 4 bytes of
108 crap in the front of the message.
110 >>> b = encrypt('hi, mom')
111 >>> s = decrypt(b[4:])
125 @timeout(10, error_message="Timed out comunicating with device.")
126 def communicate_with_device(
133 ) -> Tuple[bool, List[str]]:
134 """Given an IP address and port, open a socket, encrypt cmd, and sent it to
135 the device. Read a response, decrypt it and parse it.
138 if string_utils.is_none_or_empty(ip) or not string_utils.is_ip_v4(ip):
139 raise ValueError(f"Invalid IP address. ({ip})")
140 if string_utils.is_none_or_empty(cmd):
141 raise ValueError(f"Invalid cmd ({cmd}).")
145 sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
146 sock_tcp.connect((ip, port))
147 logger.debug('Connected to %s:%s; running %s...', ip, port, cmd)
148 for c in cmd.split(";"):
150 if not quiet and not brief:
151 print("Sleeping 3000ms...")
156 encrypted_raw_request = encrypt(raw_request)
157 sock_tcp.send(encrypted_raw_request)
158 raw_response = sock_tcp.recv(2048)
159 decrypted_raw_response = decrypt(raw_response[4:])
160 all_responses.append(decrypted_raw_response)
163 json_request = json.loads(raw_request)
164 json_request = json.dumps(json_request, sort_keys=True, indent=4)
165 print(f'Sent: "{json_request}"')
168 for b in encrypted_raw_request:
170 logger.debug('Sent raw: "%s"', raw)
172 # Note: 4 bytes of garbage (the key)
173 json_response = json.loads(decrypted_raw_response)
174 json_response = json.dumps(json_response, sort_keys=True, indent=4)
175 print(f'Received: "{json_response}"')
178 for b in raw_response:
180 logger.debug('Received raw: "%s"', raw)
183 '"err_code":0' not in decrypted_raw_response
184 and '"err_code": 0' not in decrypted_raw_response
186 logger.error("Did not see clean err_code in response?!")
187 return (False, all_responses)
188 logger.debug('All commands succeeded.')
189 return (True, all_responses)
191 logger.error("Cound not connect to host %s:%s", ip, port)
192 return (False, all_responses)
197 if __name__ == '__main__':