3 """Wrapper functions for dealing with TPLink devices. Based on code
4 written by Lubomir Stroetmann and Copyright 2016 softScheck GmbH which
5 was covered by the Apache 2.0 license:
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Modifications by Scott Gasch Copyright 2020-2022 also released under
14 the Apache 2.0 license as required by the license (see above).
16 Unless required by applicable law or agreed to in writing, software
17 distributed under the License is distributed on an "AS IS" BASIS,
18 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 See the License for the specific language governing permissions and
20 limitations under the License.
32 from struct import pack
33 from typing import Dict, List, Optional, Tuple
37 from decorator_utils import timeout
39 logger = logging.getLogger(__name__)
# Request payloads keyed by a short command name. Each value is the JSON text
# of a single request; the "offon" and "onoff" entries chain two requests
# separated by ";wait;".
# NOTE(review): the statement that opens and closes this mapping is not
# visible in this view -- confirm the container's name before referencing it.
42 "info": '{"system":{"get_sysinfo":{}}}',
43 "on": '{"system":{"set_relay_state":{"state":1}}}',
44 "off": '{"system":{"set_relay_state":{"state":0}}}',
45 "offon": '{"system":{"set_relay_state":{"state":0}}};wait;{"system":{"set_relay_state":{"state":1}}}',
46 "onoff": '{"system":{"set_relay_state":{"state":1}}};wait;{"system":{"set_relay_state":{"state":0}}}',
47 "cloudinfo": '{"cnCloud":{"get_info":{}}}',
48 "wlanscan": '{"netif":{"get_scaninfo":{"refresh":0}}}',
49 "time": '{"time":{"get_time":{}}}',
50 "schedule": '{"schedule":{"get_rules":{}}}',
51 "countdown": '{"count_down":{"get_rules":{}}}',
52 "antitheft": '{"anti_theft":{"get_rules":{}}}',
53 "reboot": '{"system":{"reboot":{"delay":1}}}',
54 "reset": '{"system":{"reset":{"delay":1}}}',
55 "energy": '{"emeter":{"get_realtime":{}}}',
@timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
def tplink_command(command: str) -> bool:
    """Run *command* through the shell and report whether it succeeded.

    The value returned by ``os.system`` is decoded as a wait status: the low
    byte is treated as the terminating signal and the remaining high bits as
    the exit code.

    Args:
        command: the complete shell command line to execute.

    Returns:
        True if the command was not killed by a signal and exited with
        status 0; False otherwise.  Each failure is reported through
        ``logging_utils.hlog``.
    """
    result = os.system(command)

    # Low byte non-zero => the child was terminated by a signal.
    signal = result & 0xFF
    if signal != 0:
        msg = f'{command} died with signal {signal}'
        logging_utils.hlog(msg)
        return False

    # Otherwise the exit code lives in the bits above the low byte.
    exit_value = result >> 8
    if exit_value != 0:
        msg = f'{command} failed, exited {exit_value}'
        logging_utils.hlog(msg)
        return False

    logger.debug('%s succeeded.', command)
    return True
@timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
def tplink_get_info(cmd: str) -> Optional[Dict]:
    """Run *cmd* and return the ``system.get_sysinfo`` section of its output.

    The command's output is expected to contain a ``Sent: ...`` line, which
    is discarded, and a ``Received:`` prefix, which is stripped; what remains
    is parsed as JSON.

    Args:
        cmd: the shell command line whose output describes the device.

    Returns:
        The dict found at ``["system"]["get_sysinfo"]`` in the parsed output,
        or None if the command could not be run or its output could not be
        parsed.  On failure the offending output is echoed to stderr.
    """
    logger.debug('Getting tplink device status via "%s"', cmd)

    # Bound before the try so the handler can always print it, even when the
    # subprocess call itself is what raised.
    out = ''
    try:
        out = subprocess.getoutput(cmd)
        logger.debug('RAW OUT> %s', out)
        out = re.sub("Sent:.*\n", "", out)
        out = re.sub("Received: *", "", out)
        info = json.loads(out)["system"]["get_sysinfo"]
        logger.debug("%s", json.dumps(info, indent=4, sort_keys=True))
        return info
    except Exception as e:
        # Deliberately broad: any failure to run or parse is reported the
        # same way and mapped to None for the caller.
        logger.exception(e)
        print(out, file=sys.stderr)
        return None
def encrypt(string: str) -> bytes:
    """Encrypt a request for the TP-Link Smart Home Protocol.

    XOR Autokey Cipher with starting key = 171: every character is XORed
    with the previous ciphertext byte (171 for the first character).

    Note: the result starts with one extra dword (4 bytes) in front of the
    ciphertext: the big-endian length of ``string``.

    Args:
        string: the plaintext request; every character must have a code
            point below 256.

    Returns:
        The 4-byte length header followed by one ciphertext byte per
        character of ``string``.

    Raises:
        ValueError: if ``string`` contains a character whose code point is
            256 or greater.

    >>> " ".join(hex(b).replace('0x', '') for b in encrypt('{"system":{"get_sysinfo":{}}}'))
    '0 0 0 1d d0 f2 81 f8 8b ff 9a f7 d5 ef 94 b6 d1 b4 c0 9f ec 95 e6 8f e1 87 e8 ca f0 8b f6 8b f6'

    """
    key = 171
    result = bytearray(pack(">I", len(string)))
    for ch in string:
        # Autokey: each ciphertext byte becomes the key for the next one.
        key ^= ord(ch)
        result.append(key)
    return bytes(result)
def decrypt(string: bytes) -> str:
    """Opposite of encrypt (above): undo the XOR autokey cipher (key = 171).

    Note: encrypt puts 4 extra bytes in front of the ciphertext.  They are
    not part of the ciphertext, so the caller must strip them before calling
    this function.

    Args:
        string: the ciphertext bytes, without the 4-byte header.

    Returns:
        The plaintext, one character per ciphertext byte.

    >>> b = encrypt('hi, mom')
    >>> s = decrypt(b[4:])
    >>> s
    'hi, mom'

    """
    key = 171
    chars = []
    for b in string:
        chars.append(chr(key ^ b))
        # Autokey: the ciphertext byte just consumed is the next key.
        key = b
    return "".join(chars)
# Sends one or more encrypted requests to a device over TCP and collects the
# decrypted replies.
#
# Visible behaviour:
#   * Raises ValueError when ip is empty or not an IPv4 address, or when cmd
#     is empty.
#   * Returns a (success, responses) tuple; responses holds every decrypted
#     reply received so far, also on the failure paths.
#
# NOTE(review): the parameter list and several control-flow lines of this
# function are not visible in this view, so the comments below describe only
# the statements that are shown.
# NOTE(review): the timeout message ("comunicating") and the connect-failure
# message ("Cound") contain typos; they are runtime text and left untouched.
135 @timeout(10, error_message="Timed out comunicating with device.")
136 def communicate_with_device(
143 ) -> Tuple[bool, List[str]]:
144 """Given an IP address and port, open a socket, encrypt cmd, and send it to
145 the device. Read a response, decrypt it and parse it.
148 if string_utils.is_none_or_empty(ip) or not string_utils.is_ip_v4(ip):
149 raise ValueError(f"Invalid IP address. ({ip})")
150 if string_utils.is_none_or_empty(cmd):
151 raise ValueError(f"Invalid cmd ({cmd}).")
# Open a TCP connection to the device.
# NOTE(review): no close() of sock_tcp is visible in this view -- confirm the
# socket is released on every path.
155 sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
156 sock_tcp.connect((ip, port))
157 logger.debug('Connected to %s:%s; running %s...', ip, port, cmd)
# cmd may carry several ';'-separated pieces; each one is handled in turn.
158 for c in cmd.split(";"):
# NOTE(review): the test selecting this "sleep" path is not visible here --
# presumably it matches the "wait" piece of a multi-step payload; confirm.
160 if not quiet and not brief:
161 print("Sleeping 3000ms...")
# Encrypt and send the request, read at most 2048 bytes with a single recv(),
# then decrypt everything after the first 4 bytes of the reply.
# NOTE(review): one recv() can return a partial reply; verify replies always
# fit, or loop until the whole message has arrived.
166 encrypted_raw_request = encrypt(raw_request)
167 sock_tcp.send(encrypted_raw_request)
168 raw_response = sock_tcp.recv(2048)
169 decrypted_raw_response = decrypt(raw_response[4:])
170 all_responses.append(decrypted_raw_response)
# Echo the request as pretty-printed JSON.
173 json_request = json.loads(raw_request)
174 json_request = json.dumps(json_request, sort_keys=True, indent=4)
175 print(f'Sent: "{json_request}"')
178 for b in encrypted_raw_request:
180 logger.debug('Sent raw: "%s"', raw)
182 # Note: 4 bytes of garbage (the key)
# Echo the decrypted reply as pretty-printed JSON.
183 json_response = json.loads(decrypted_raw_response)
184 json_response = json.dumps(json_response, sort_keys=True, indent=4)
185 print(f'Received: "{json_response}"')
188 for b in raw_response:
190 logger.debug('Received raw: "%s"', raw)
# A reply counts as clean only if it contains an err_code of 0; both the
# compact and the space-after-colon JSON spellings are accepted.
192 if '"err_code":0' not in decrypted_raw_response:
193 if '"err_code": 0' not in decrypted_raw_response:
194 logger.error("Did not see clean err_code in response?!")
195 return (False, all_responses)
196 logger.debug('All commands succeeded, returning True.')
197 return (True, all_responses)
# NOTE(review): the handler that owns this failure path is not visible here.
199 logger.error("Cound not connect to host %s:%s", ip, port)
200 return (False, all_responses)
205 if __name__ == '__main__':