3 """Wrapper functions for dealing with TPLink devices. Based on code
4 written by Lubomir Stroetmann and Copyright 2016 softScheck GmbH which
5 was covered by the Apache 2.0 license:
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
11 http://www.apache.org/licenses/LICENSE-2.0
13 Modifications by Scott Gasch Copyright 2020-2022 also released under
14 the Apache 2.0 license as required by the license (see above).
16 Unless required by applicable law or agreed to in writing, software
17 distributed under the License is distributed on an "AS IS" BASIS,
18 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 See the License for the specific language governing permissions and
20 limitations under the License.
32 from struct import pack
33 from typing import Any, Dict, Optional
37 from decorator_utils import retry_if_false, timeout
39 logger = logging.getLogger(__name__)
42 @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
43 def tplink_command(command: str) -> bool:
44 result = os.system(command)
45 signal = result & 0xFF
47 msg = f'{command} died with signal {signal}'
49 logging_utils.hlog(msg)
52 exit_value = result >> 8
54 msg = f'{command} failed, exited {exit_value}'
56 logging_utils.hlog(msg)
58 logger.debug('%s succeeded.', command)
62 @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
63 def tplink_get_info(cmd: str) -> Optional[Dict]:
64 logger.debug('Getting tplink device status via "%s"', cmd)
66 out = subprocess.getoutput(cmd)
67 logger.debug('RAW OUT> %s', out)
68 out = re.sub("Sent:.*\n", "", out)
69 out = re.sub("Received: *", "", out)
70 info = json.loads(out)["system"]["get_sysinfo"]
71 logger.debug("%s", json.dumps(info, indent=4, sort_keys=True))
73 except Exception as e:
75 print(out, file=sys.stderr)
79 def encrypt(string: str) -> bytes:
80 """Encryption and Decryption of TP-Link Smart Home Protocol. Note:
81 this introduces one dword (4 bytes) or crap in the front of the
84 XOR Autokey Cipher with starting key = 171
86 >>> " ".join(hex(b).replace('0x', '') for b in encrypt('{"system":{"get_sysinfo":{}}}'))
87 '0 0 0 1d d0 f2 81 f8 8b ff 9a f7 d5 ef 94 b6 d1 b4 c0 9f ec 95 e6 8f e1 87 e8 ca f0 8b f6 8b f6'
91 result = pack(">I", len(string))
99 def decrypt(string: bytes) -> str:
100 """Opposite of encrypt (above). Note: encrypt introduces 4 bytes of
101 crap in the front of the message.
103 >>> b = encrypt('hi, mom')
104 >>> s = decrypt(b[4:])
118 def item_generator(json_input: Any, lookup_key: str):
119 """Walk through some JSON recursively looking for a key. Used by the query
120 parameter in communicate_with_device (see below).
123 if isinstance(json_input, dict):
124 for k, v in json_input.items():
128 yield from item_generator(v, lookup_key)
129 elif isinstance(json_input, list):
130 for item in json_input:
131 yield from item_generator(item, lookup_key)
134 @timeout(10, error_message="Timed out comunicating with device.")
136 def communicate_with_device(
140 query: Optional[str] = None,
146 if string_utils.is_none_or_empty(ip) or not string_utils.is_ip_v4(ip):
147 raise ValueError(f"Invalid IP address. ({ip})")
148 if string_utils.is_none_or_empty(cmd):
149 raise ValueError(f"Invalid cmd ({cmd}).")
152 sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
153 sock_tcp.connect((ip, port))
154 logger.info('Connected to %s:%s; running %s...', ip, port, cmd)
155 for c in cmd.split(";"):
157 if not quiet and not brief:
158 print("Sleeping 3000ms...")
163 encrypted_raw_request = encrypt(raw_request)
164 sock_tcp.send(encrypted_raw_request)
165 raw_response = sock_tcp.recv(2048)
167 json_request = json.loads(raw_request)
168 json_request = json.dumps(json_request, sort_keys=True, indent=4)
169 logger.debug('Sent: "%s"', json_request)
172 for b in encrypted_raw_request:
174 logger.debug('Sent raw: "%s"', raw)
176 # Note: 4 bytes of garbage (the key)
177 decrypted_raw_response = decrypt(raw_response[4:])
179 json_response = json.loads(decrypted_raw_response)
180 json_response = json.dumps(json_response, sort_keys=True, indent=4)
181 logger.debug('Received: "%s"', json_response)
184 for b in raw_response:
186 logger.debug('Received raw: "%s"', raw)
188 if query is not None:
189 j = json.loads(decrypted_raw_response)
190 for subquery in query:
191 for q in subquery.split(","):
192 for v in item_generator(j, q):
194 print(f"{q}:", end="")
198 if '"err_code":0' not in decrypted_raw_response:
199 if '"err_code": 0' not in decrypted_raw_response:
200 logger.error("Did not see clean err_code in response?!")
204 logger.error("Cound not connect to host %s:%s", ip, port)
208 if __name__ == '__main__':