Fix a threading related bug in tplink_utils.
[python_utils.git] / smart_home / tplink_utils.py
1 #!/usr/bin/env python3
2
3 """Wrapper functions for dealing with TPLink devices.  Based on code
4 written by Lubomir Stroetmann and Copyright 2016 softScheck GmbH which
5 was covered by the Apache 2.0 license:
6
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
10
11      http://www.apache.org/licenses/LICENSE-2.0
12
13 Modifications by Scott Gasch Copyright © 2020-2022 also released under
14 the Apache 2.0 license as required by the license (see above).
15
16 Unless required by applicable law or agreed to in writing, software
17 distributed under the License is distributed on an "AS IS" BASIS,
18 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 See the License for the specific language governing permissions and
20 limitations under the License.
21
22 """
23
24 import json
25 import logging
26 import os
27 import socket
28 import time
29 from struct import pack
30 from typing import Dict, List, Optional, Tuple
31
32 import logging_utils
33 import string_utils
34 from decorator_utils import timeout
35
36 logger = logging.getLogger(__name__)
37
38 commands = {
39     "info": '{"system":{"get_sysinfo":{}}}',
40     "on": '{"system":{"set_relay_state":{"state":1}}}',
41     "off": '{"system":{"set_relay_state":{"state":0}}}',
42     "offon": '{"system":{"set_relay_state":{"state":0}}};wait;{"system":{"set_relay_state":{"state":1}}}',
43     "onoff": '{"system":{"set_relay_state":{"state":1}}};wait;{"system":{"set_relay_state":{"state":0}}}',
44     "cloudinfo": '{"cnCloud":{"get_info":{}}}',
45     "wlanscan": '{"netif":{"get_scaninfo":{"refresh":0}}}',
46     "time": '{"time":{"get_time":{}}}',
47     "schedule": '{"schedule":{"get_rules":{}}}',
48     "countdown": '{"count_down":{"get_rules":{}}}',
49     "antitheft": '{"anti_theft":{"get_rules":{}}}',
50     "reboot": '{"system":{"reboot":{"delay":1}}}',
51     "reset": '{"system":{"reset":{"delay":1}}}',
52     "energy": '{"emeter":{"get_realtime":{}}}',
53 }
54
55
56 @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
57 def tplink_command_wrapper(command: str) -> bool:
58     result = os.system(command)
59     signal = result & 0xFF
60     if signal != 0:
61         msg = f'{command} died with signal {signal}'
62         logger.warning(msg)
63         logging_utils.hlog(msg)
64         return False
65     else:
66         exit_value = result >> 8
67         if exit_value != 0:
68             msg = f'{command} failed, exited {exit_value}'
69             logger.warning(msg)
70             logging_utils.hlog(msg)
71             return False
72     logger.debug('%s succeeded.', command)
73     return True
74
75
76 def tplink_get_info(ip: str, port: int = 9999) -> Optional[Dict]:
77     success, response = communicate_with_device(ip, port, commands['info'], quiet=True)
78     if success:
79         assert len(response) == 1
80         info = json.loads(response[0])["system"]["get_sysinfo"]
81         logger.debug("%s", json.dumps(info, indent=4, sort_keys=True))
82         return info
83     return None
84
85
86 def encrypt(string: str) -> bytes:
87     """Encryption and Decryption of TP-Link Smart Home Protocol.  Note:
88     this introduces one dword (4 bytes) or crap in the front of the
89     message, the 171 key.
90
91     XOR Autokey Cipher with starting key = 171
92
93     >>> " ".join(hex(b).replace('0x', '') for b in encrypt('{"system":{"get_sysinfo":{}}}'))
94     '0 0 0 1d d0 f2 81 f8 8b ff 9a f7 d5 ef 94 b6 d1 b4 c0 9f ec 95 e6 8f e1 87 e8 ca f0 8b f6 8b f6'
95
96     """
97     key = 171
98     result = pack(">I", len(string))
99     for i in string:
100         a = key ^ ord(i)
101         key = a
102         result += bytes([a])
103     return result
104
105
106 def decrypt(string: bytes) -> str:
107     """Opposite of encrypt (above).  Note: encrypt introduces 4 bytes of
108     crap in the front of the message.
109
110     >>> b = encrypt('hi, mom')
111     >>> s = decrypt(b[4:])
112     >>> s
113     'hi, mom'
114
115     """
116     key = 171
117     result = ""
118     for i in string:
119         a = key ^ i
120         key = i
121         result += chr(a)
122     return result
123
124
125 @timeout(10, use_signals=False, error_message="Timed out comunicating with device.")
126 def communicate_with_device(
127     ip: str,
128     port: int,
129     cmd: str,
130     *,
131     quiet: bool = False,
132     brief: bool = True,
133 ) -> Tuple[bool, List[str]]:
134     """Given an IP address and port, open a socket, encrypt cmd, and sent it to
135     the device.  Read a response, decrypt it and parse it.
136
137     """
138     if string_utils.is_none_or_empty(ip) or not string_utils.is_ip_v4(ip):
139         raise ValueError(f"Invalid IP address. ({ip})")
140     if string_utils.is_none_or_empty(cmd):
141         raise ValueError(f"Invalid cmd ({cmd}).")
142     all_responses = []
143
144     try:
145         sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
146         sock_tcp.connect((ip, port))
147         logger.debug('Connected to %s:%s; running %s...', ip, port, cmd)
148         for c in cmd.split(";"):
149             if c == "wait":
150                 if not quiet and not brief:
151                     print("Sleeping 3000ms...")
152                 time.sleep(3.0)
153                 continue
154
155             raw_request = c
156             encrypted_raw_request = encrypt(raw_request)
157             sock_tcp.send(encrypted_raw_request)
158             raw_response = sock_tcp.recv(2048)
159             decrypted_raw_response = decrypt(raw_response[4:])
160             all_responses.append(decrypted_raw_response)
161
162             if not quiet:
163                 json_request = json.loads(raw_request)
164                 json_request = json.dumps(json_request, sort_keys=True, indent=4)
165                 print(f'Sent: "{json_request}"')
166                 if not brief:
167                     raw = ''
168                     for b in encrypted_raw_request:
169                         raw += f'{b:02X} '
170                     logger.debug('Sent raw: "%s"', raw)
171
172                 # Note: 4 bytes of garbage (the key)
173                 json_response = json.loads(decrypted_raw_response)
174                 json_response = json.dumps(json_response, sort_keys=True, indent=4)
175                 print(f'Received: "{json_response}"')
176                 if not brief:
177                     raw = ''
178                     for b in raw_response:
179                         raw += f'{b:02X} '
180                     logger.debug('Received raw: "%s"', raw)
181
182             if (
183                 '"err_code":0' not in decrypted_raw_response
184                 and '"err_code": 0' not in decrypted_raw_response
185             ):
186                 logger.error("Did not see clean err_code in response?!")
187                 return (False, all_responses)
188         logger.debug('All commands succeeded.')
189         return (True, all_responses)
190     except socket.error:
191         logger.error("Cound not connect to host %s:%s", ip, port)
192         return (False, all_responses)
193     finally:
194         sock_tcp.close()
195
196
197 if __name__ == '__main__':
198     import doctest
199
200     doctest.testmod()