Since this thing is on the innerwebs I suppose it should have a
[python_utils.git] / smart_home / tplink_utils.py
index 9b8eb6da926e5325cbf1da79e7f45a67a725b2ec..abf4970c3c9593f76b641114716fcff88a782627 100644 (file)
@@ -10,7 +10,7 @@ You may obtain a copy of the License at
 
      http://www.apache.org/licenses/LICENSE-2.0
 
 
      http://www.apache.org/licenses/LICENSE-2.0
 
-Modifications by Scott Gasch Copyright 2020-2022 also released under
+Modifications by Scott Gasch Copyright © 2020-2022 also released under
 the Apache 2.0 license as required by the license (see above).
 
 Unless required by applicable law or agreed to in writing, software
 the Apache 2.0 license as required by the license (see above).
 
 Unless required by applicable law or agreed to in writing, software
@@ -24,10 +24,7 @@ limitations under the License.
 import json
 import logging
 import os
 import json
 import logging
 import os
-import re
 import socket
 import socket
-import subprocess
-import sys
 import time
 from struct import pack
 from typing import Dict, List, Optional, Tuple
 import time
 from struct import pack
 from typing import Dict, List, Optional, Tuple
@@ -57,7 +54,7 @@ commands = {
 
 
 @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
 
 
 @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
-def tplink_command(command: str) -> bool:
+def tplink_command_wrapper(command: str) -> bool:
     result = os.system(command)
     signal = result & 0xFF
     if signal != 0:
     result = os.system(command)
     signal = result & 0xFF
     if signal != 0:
@@ -76,21 +73,14 @@ def tplink_command(command: str) -> bool:
     return True
 
 
     return True
 
 
-@timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
-def tplink_get_info(cmd: str) -> Optional[Dict]:
-    logger.debug('Getting tplink device status via "%s"', cmd)
-    try:
-        out = subprocess.getoutput(cmd)
-        logger.debug('RAW OUT> %s', out)
-        out = re.sub("Sent:.*\n", "", out)
-        out = re.sub("Received: *", "", out)
-        info = json.loads(out)["system"]["get_sysinfo"]
+def tplink_get_info(ip: str, port: int = 9999) -> Optional[Dict]:
+    success, response = communicate_with_device(ip, port, commands['info'], quiet=True)
+    if success:
+        assert len(response) == 1
+        info = json.loads(response[0])["system"]["get_sysinfo"]
         logger.debug("%s", json.dumps(info, indent=4, sort_keys=True))
         return info
         logger.debug("%s", json.dumps(info, indent=4, sort_keys=True))
         return info
-    except Exception as e:
-        logger.exception(e)
-        print(out, file=sys.stderr)
-        return None
+    return None
 
 
 def encrypt(string: str) -> bytes:
 
 
 def encrypt(string: str) -> bytes:
@@ -176,7 +166,7 @@ def communicate_with_device(
                 if not brief:
                     raw = ''
                     for b in encrypted_raw_request:
                 if not brief:
                     raw = ''
                     for b in encrypted_raw_request:
-                        raw += '%02X ' % b
+                        raw += f'{b:02X} '
                     logger.debug('Sent raw: "%s"', raw)
 
                 # Note: 4 bytes of garbage (the key)
                     logger.debug('Sent raw: "%s"', raw)
 
                 # Note: 4 bytes of garbage (the key)
@@ -186,14 +176,16 @@ def communicate_with_device(
                 if not brief:
                     raw = ''
                     for b in raw_response:
                 if not brief:
                     raw = ''
                     for b in raw_response:
-                        raw += '%02X ' % b
+                        raw += f'{b:02X} '
                     logger.debug('Received raw: "%s"', raw)
 
                     logger.debug('Received raw: "%s"', raw)
 
-            if '"err_code":0' not in decrypted_raw_response:
-                if '"err_code": 0' not in decrypted_raw_response:
-                    logger.error("Did not see clean err_code in response?!")
-                    return (False, all_responses)
-        logger.debug('All commands succeeded, returning True.')
+            if (
+                '"err_code":0' not in decrypted_raw_response
+                and '"err_code": 0' not in decrypted_raw_response
+            ):
+                logger.error("Did not see clean err_code in response?!")
+                return (False, all_responses)
+        logger.debug('All commands succeeded.')
         return (True, all_responses)
     except socket.error:
         logger.error("Cound not connect to host %s:%s", ip, port)
         return (True, all_responses)
     except socket.error:
         logger.error("Cound not connect to host %s:%s", ip, port)