Since this thing is on the innerwebs I suppose it should have a
[python_utils.git] / smart_home / tplink_utils.py
index 9b8eb6da926e5325cbf1da79e7f45a67a725b2ec..abf4970c3c9593f76b641114716fcff88a782627 100644 (file)
@@ -10,7 +10,7 @@ You may obtain a copy of the License at
 
      http://www.apache.org/licenses/LICENSE-2.0
 
-Modifications by Scott Gasch Copyright 2020-2022 also released under
+Modifications by Scott Gasch Copyright © 2020-2022 also released under
 the Apache 2.0 license as required by the license (see above).
 
 Unless required by applicable law or agreed to in writing, software
@@ -24,10 +24,7 @@ limitations under the License.
 import json
 import logging
 import os
-import re
 import socket
-import subprocess
-import sys
 import time
 from struct import pack
 from typing import Dict, List, Optional, Tuple
@@ -57,7 +54,7 @@ commands = {
 
 
 @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
-def tplink_command(command: str) -> bool:
+def tplink_command_wrapper(command: str) -> bool:
     result = os.system(command)
     signal = result & 0xFF
     if signal != 0:
@@ -76,21 +73,14 @@ def tplink_command(command: str) -> bool:
     return True
 
 
-@timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
-def tplink_get_info(cmd: str) -> Optional[Dict]:
-    logger.debug('Getting tplink device status via "%s"', cmd)
-    try:
-        out = subprocess.getoutput(cmd)
-        logger.debug('RAW OUT> %s', out)
-        out = re.sub("Sent:.*\n", "", out)
-        out = re.sub("Received: *", "", out)
-        info = json.loads(out)["system"]["get_sysinfo"]
+def tplink_get_info(ip: str, port: int = 9999) -> Optional[Dict]:
+    success, response = communicate_with_device(ip, port, commands['info'], quiet=True)
+    if success:
+        assert len(response) == 1
+        info = json.loads(response[0])["system"]["get_sysinfo"]
         logger.debug("%s", json.dumps(info, indent=4, sort_keys=True))
         return info
-    except Exception as e:
-        logger.exception(e)
-        print(out, file=sys.stderr)
-        return None
+    return None
 
 
 def encrypt(string: str) -> bytes:
@@ -176,7 +166,7 @@ def communicate_with_device(
                 if not brief:
                     raw = ''
                     for b in encrypted_raw_request:
-                        raw += '%02X ' % b
+                        raw += f'{b:02X} '
                     logger.debug('Sent raw: "%s"', raw)
 
                 # Note: 4 bytes of garbage (the key)
@@ -186,14 +176,16 @@ def communicate_with_device(
                 if not brief:
                     raw = ''
                     for b in raw_response:
-                        raw += '%02X ' % b
+                        raw += f'{b:02X} '
                     logger.debug('Received raw: "%s"', raw)
 
-            if '"err_code":0' not in decrypted_raw_response:
-                if '"err_code": 0' not in decrypted_raw_response:
-                    logger.error("Did not see clean err_code in response?!")
-                    return (False, all_responses)
-        logger.debug('All commands succeeded, returning True.')
+            if (
+                '"err_code":0' not in decrypted_raw_response
+                and '"err_code": 0' not in decrypted_raw_response
+            ):
+                logger.error("Did not see clean err_code in response?!")
+                return (False, all_responses)
+        logger.debug('All commands succeeded.')
         return (True, all_responses)
     except socket.error:
         logger.error("Cound not connect to host %s:%s", ip, port)