Moved more logic from tplink.py into a library.
[python_utils.git] / smart_home / tplink_utils.py
1 #!/usr/bin/env python3
2
3 """Wrapper functions for dealing with TPLink devices.  Based on code
4 written by Lubomir Stroetmann and Copyright 2016 softScheck GmbH which
5 was covered by the Apache 2.0 license:
6
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
10
11      http://www.apache.org/licenses/LICENSE-2.0
12
13 Modifications by Scott Gasch Copyright 2020-2022 also released under
14 the Apache 2.0 license as required by the license (see above).
15
16 Unless required by applicable law or agreed to in writing, software
17 distributed under the License is distributed on an "AS IS" BASIS,
18 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 See the License for the specific language governing permissions and
20 limitations under the License.
21
22 """
23
24 import json
25 import logging
26 import os
27 import re
28 import socket
29 import subprocess
30 import sys
31 import time
32 from struct import pack
33 from typing import Dict, List, Optional, Tuple
34
35 import logging_utils
36 import string_utils
37 from decorator_utils import timeout
38
39 logger = logging.getLogger(__name__)
40
41 commands = {
42     "info": '{"system":{"get_sysinfo":{}}}',
43     "on": '{"system":{"set_relay_state":{"state":1}}}',
44     "off": '{"system":{"set_relay_state":{"state":0}}}',
45     "offon": '{"system":{"set_relay_state":{"state":0}}};wait;{"system":{"set_relay_state":{"state":1}}}',
46     "onoff": '{"system":{"set_relay_state":{"state":1}}};wait;{"system":{"set_relay_state":{"state":0}}}',
47     "cloudinfo": '{"cnCloud":{"get_info":{}}}',
48     "wlanscan": '{"netif":{"get_scaninfo":{"refresh":0}}}',
49     "time": '{"time":{"get_time":{}}}',
50     "schedule": '{"schedule":{"get_rules":{}}}',
51     "countdown": '{"count_down":{"get_rules":{}}}',
52     "antitheft": '{"anti_theft":{"get_rules":{}}}',
53     "reboot": '{"system":{"reboot":{"delay":1}}}',
54     "reset": '{"system":{"reset":{"delay":1}}}',
55     "energy": '{"emeter":{"get_realtime":{}}}',
56 }
57
58
59 @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
60 def tplink_command(command: str) -> bool:
61     result = os.system(command)
62     signal = result & 0xFF
63     if signal != 0:
64         msg = f'{command} died with signal {signal}'
65         logger.warning(msg)
66         logging_utils.hlog(msg)
67         return False
68     else:
69         exit_value = result >> 8
70         if exit_value != 0:
71             msg = f'{command} failed, exited {exit_value}'
72             logger.warning(msg)
73             logging_utils.hlog(msg)
74             return False
75     logger.debug('%s succeeded.', command)
76     return True
77
78
79 @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
80 def tplink_get_info(cmd: str) -> Optional[Dict]:
81     logger.debug('Getting tplink device status via "%s"', cmd)
82     try:
83         out = subprocess.getoutput(cmd)
84         logger.debug('RAW OUT> %s', out)
85         out = re.sub("Sent:.*\n", "", out)
86         out = re.sub("Received: *", "", out)
87         info = json.loads(out)["system"]["get_sysinfo"]
88         logger.debug("%s", json.dumps(info, indent=4, sort_keys=True))
89         return info
90     except Exception as e:
91         logger.exception(e)
92         print(out, file=sys.stderr)
93         return None
94
95
96 def encrypt(string: str) -> bytes:
97     """Encryption and Decryption of TP-Link Smart Home Protocol.  Note:
98     this introduces one dword (4 bytes) or crap in the front of the
99     message, the 171 key.
100
101     XOR Autokey Cipher with starting key = 171
102
103     >>> " ".join(hex(b).replace('0x', '') for b in encrypt('{"system":{"get_sysinfo":{}}}'))
104     '0 0 0 1d d0 f2 81 f8 8b ff 9a f7 d5 ef 94 b6 d1 b4 c0 9f ec 95 e6 8f e1 87 e8 ca f0 8b f6 8b f6'
105
106     """
107     key = 171
108     result = pack(">I", len(string))
109     for i in string:
110         a = key ^ ord(i)
111         key = a
112         result += bytes([a])
113     return result
114
115
116 def decrypt(string: bytes) -> str:
117     """Opposite of encrypt (above).  Note: encrypt introduces 4 bytes of
118     crap in the front of the message.
119
120     >>> b = encrypt('hi, mom')
121     >>> s = decrypt(b[4:])
122     >>> s
123     'hi, mom'
124
125     """
126     key = 171
127     result = ""
128     for i in string:
129         a = key ^ i
130         key = i
131         result += chr(a)
132     return result
133
134
135 @timeout(10, error_message="Timed out comunicating with device.")
136 def communicate_with_device(
137     ip: str,
138     port: int,
139     cmd: str,
140     *,
141     quiet: bool = False,
142     brief: bool = True,
143 ) -> Tuple[bool, List[str]]:
144     """Given an IP address and port, open a socket, encrypt cmd, and sent it to
145     the device.  Read a response, decrypt it and parse it.
146
147     """
148     if string_utils.is_none_or_empty(ip) or not string_utils.is_ip_v4(ip):
149         raise ValueError(f"Invalid IP address. ({ip})")
150     if string_utils.is_none_or_empty(cmd):
151         raise ValueError(f"Invalid cmd ({cmd}).")
152     all_responses = []
153
154     try:
155         sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
156         sock_tcp.connect((ip, port))
157         logger.debug('Connected to %s:%s; running %s...', ip, port, cmd)
158         for c in cmd.split(";"):
159             if c == "wait":
160                 if not quiet and not brief:
161                     print("Sleeping 3000ms...")
162                 time.sleep(3.0)
163                 continue
164
165             raw_request = c
166             encrypted_raw_request = encrypt(raw_request)
167             sock_tcp.send(encrypted_raw_request)
168             raw_response = sock_tcp.recv(2048)
169             decrypted_raw_response = decrypt(raw_response[4:])
170             all_responses.append(decrypted_raw_response)
171
172             if not quiet:
173                 json_request = json.loads(raw_request)
174                 json_request = json.dumps(json_request, sort_keys=True, indent=4)
175                 print(f'Sent: "{json_request}"')
176                 if not brief:
177                     raw = ''
178                     for b in encrypted_raw_request:
179                         raw += '%02X ' % b
180                     logger.debug('Sent raw: "%s"', raw)
181
182                 # Note: 4 bytes of garbage (the key)
183                 json_response = json.loads(decrypted_raw_response)
184                 json_response = json.dumps(json_response, sort_keys=True, indent=4)
185                 print(f'Received: "{json_response}"')
186                 if not brief:
187                     raw = ''
188                     for b in raw_response:
189                         raw += '%02X ' % b
190                     logger.debug('Received raw: "%s"', raw)
191
192             if '"err_code":0' not in decrypted_raw_response:
193                 if '"err_code": 0' not in decrypted_raw_response:
194                     logger.error("Did not see clean err_code in response?!")
195                     return (False, all_responses)
196         logger.debug('All commands succeeded, returning True.')
197         return (True, all_responses)
198     except socket.error:
199         logger.error("Cound not connect to host %s:%s", ip, port)
200         return (False, all_responses)
201     finally:
202         sock_tcp.close()
203
204
205 if __name__ == '__main__':
206     import doctest
207
208     doctest.testmod()