Moving towards not needing a separate tplink.py utility.
[python_utils.git] / smart_home / tplink_utils.py
1 #!/usr/bin/env python3
2
3 """Wrapper functions for dealing with TPLink devices.  Based on code
4 written by Lubomir Stroetmann and Copyright 2016 softScheck GmbH which
5 was covered by the Apache 2.0 license:
6
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
10
11      http://www.apache.org/licenses/LICENSE-2.0
12
13 Modifications by Scott Gasch Copyright 2020-2022 also released under
14 the Apache 2.0 license as required by the license (see above).
15
16 Unless required by applicable law or agreed to in writing, software
17 distributed under the License is distributed on an "AS IS" BASIS,
18 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 See the License for the specific language governing permissions and
20 limitations under the License.
21
22 """
23
24 import json
25 import logging
26 import os
27 import re
28 import socket
29 import subprocess
30 import sys
31 import time
32 from struct import pack
33 from typing import Any, Dict, Optional
34
35 import logging_utils
36 import string_utils
37 from decorator_utils import retry_if_false, timeout
38
39 logger = logging.getLogger(__name__)
40
41
42 @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
43 def tplink_command(command: str) -> bool:
44     result = os.system(command)
45     signal = result & 0xFF
46     if signal != 0:
47         msg = f'{command} died with signal {signal}'
48         logger.warning(msg)
49         logging_utils.hlog(msg)
50         return False
51     else:
52         exit_value = result >> 8
53         if exit_value != 0:
54             msg = f'{command} failed, exited {exit_value}'
55             logger.warning(msg)
56             logging_utils.hlog(msg)
57             return False
58     logger.debug('%s succeeded.', command)
59     return True
60
61
62 @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
63 def tplink_get_info(cmd: str) -> Optional[Dict]:
64     logger.debug('Getting tplink device status via "%s"', cmd)
65     try:
66         out = subprocess.getoutput(cmd)
67         logger.debug('RAW OUT> %s', out)
68         out = re.sub("Sent:.*\n", "", out)
69         out = re.sub("Received: *", "", out)
70         info = json.loads(out)["system"]["get_sysinfo"]
71         logger.debug("%s", json.dumps(info, indent=4, sort_keys=True))
72         return info
73     except Exception as e:
74         logger.exception(e)
75         print(out, file=sys.stderr)
76         return None
77
78
79 def encrypt(string: str) -> bytes:
80     """Encryption and Decryption of TP-Link Smart Home Protocol.  Note:
81     this introduces one dword (4 bytes) or crap in the front of the
82     message, the 171 key.
83
84     XOR Autokey Cipher with starting key = 171
85
86     >>> " ".join(hex(b).replace('0x', '') for b in encrypt('{"system":{"get_sysinfo":{}}}'))
87     '0 0 0 1d d0 f2 81 f8 8b ff 9a f7 d5 ef 94 b6 d1 b4 c0 9f ec 95 e6 8f e1 87 e8 ca f0 8b f6 8b f6'
88
89     """
90     key = 171
91     result = pack(">I", len(string))
92     for i in string:
93         a = key ^ ord(i)
94         key = a
95         result += bytes([a])
96     return result
97
98
99 def decrypt(string: bytes) -> str:
100     """Opposite of encrypt (above).  Note: encrypt introduces 4 bytes of
101     crap in the front of the message.
102
103     >>> b = encrypt('hi, mom')
104     >>> s = decrypt(b[4:])
105     >>> s
106     'hi, mom'
107
108     """
109     key = 171
110     result = ""
111     for i in string:
112         a = key ^ i
113         key = i
114         result += chr(a)
115     return result
116
117
118 def item_generator(json_input: Any, lookup_key: str):
119     """Walk through some JSON recursively looking for a key.  Used by the query
120     parameter in communicate_with_device (see below).
121
122     """
123     if isinstance(json_input, dict):
124         for k, v in json_input.items():
125             if k == lookup_key:
126                 yield v
127             else:
128                 yield from item_generator(v, lookup_key)
129     elif isinstance(json_input, list):
130         for item in json_input:
131             yield from item_generator(item, lookup_key)
132
133
134 @timeout(10, error_message="Timed out comunicating with device.")
135 @retry_if_false(3)
136 def communicate_with_device(
137     ip: str,
138     port: int,
139     cmd: str,
140     query: Optional[str] = None,
141     *,
142     quiet: bool = False,
143     brief: bool = True,
144 ) -> bool:
145
146     if string_utils.is_none_or_empty(ip) or not string_utils.is_ip_v4(ip):
147         raise ValueError(f"Invalid IP address. ({ip})")
148     if string_utils.is_none_or_empty(cmd):
149         raise ValueError(f"Invalid cmd ({cmd}).")
150
151     try:
152         sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
153         sock_tcp.connect((ip, port))
154         logger.info('Connected to %s:%s; running %s...', ip, port, cmd)
155         for c in cmd.split(";"):
156             if c == "wait":
157                 if not quiet and not brief:
158                     print("Sleeping 3000ms...")
159                 time.sleep(3.0)
160                 continue
161
162             raw_request = c
163             encrypted_raw_request = encrypt(raw_request)
164             sock_tcp.send(encrypted_raw_request)
165             raw_response = sock_tcp.recv(2048)
166             if not quiet:
167                 json_request = json.loads(raw_request)
168                 json_request = json.dumps(json_request, sort_keys=True, indent=4)
169                 logger.debug('Sent: "%s"', json_request)
170                 if not brief:
171                     raw = ''
172                     for b in encrypted_raw_request:
173                         raw += '%02X ' % b
174                     logger.debug('Sent raw: "%s"', raw)
175
176                 # Note: 4 bytes of garbage (the key)
177                 decrypted_raw_response = decrypt(raw_response[4:])
178                 if not quiet:
179                     json_response = json.loads(decrypted_raw_response)
180                     json_response = json.dumps(json_response, sort_keys=True, indent=4)
181                     logger.debug('Received: "%s"', json_response)
182                     if not brief:
183                         raw = ''
184                         for b in raw_response:
185                             raw += '%02X ' % b
186                         logger.debug('Received raw: "%s"', raw)
187
188                 if query is not None:
189                     j = json.loads(decrypted_raw_response)
190                     for subquery in query:
191                         for q in subquery.split(","):
192                             for v in item_generator(j, q):
193                                 if not brief:
194                                     print(f"{q}:", end="")
195                                 print(f"{v}")
196
197         sock_tcp.close()
198         if '"err_code":0' not in decrypted_raw_response:
199             if '"err_code": 0' not in decrypted_raw_response:
200                 logger.error("Did not see clean err_code in response?!")
201                 return False
202         return True
203     except socket.error:
204         logger.error("Cound not connect to host %s:%s", ip, port)
205         return False
206
207
208 if __name__ == '__main__':
209     import doctest
210
211     doctest.testmod()