+@timeout(10, error_message="Timed out comunicating with device.")
+def communicate_with_device(
+ ip: str,
+ port: int,
+ cmd: str,
+ *,
+ quiet: bool = False,
+ brief: bool = True,
+) -> Tuple[bool, List[str]]:
+ """Given an IP address and port, open a socket, encrypt cmd, and sent it to
+ the device. Read a response, decrypt it and parse it.
+
+ """
+ if string_utils.is_none_or_empty(ip) or not string_utils.is_ip_v4(ip):
+ raise ValueError(f"Invalid IP address. ({ip})")
+ if string_utils.is_none_or_empty(cmd):
+ raise ValueError(f"Invalid cmd ({cmd}).")
+ all_responses = []
+
+ try:
+ sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock_tcp.connect((ip, port))
+ logger.debug('Connected to %s:%s; running %s...', ip, port, cmd)
+ for c in cmd.split(";"):
+ if c == "wait":
+ if not quiet and not brief:
+ print("Sleeping 3000ms...")
+ time.sleep(3.0)
+ continue
+
+ raw_request = c
+ encrypted_raw_request = encrypt(raw_request)
+ sock_tcp.send(encrypted_raw_request)
+ raw_response = sock_tcp.recv(2048)
+ decrypted_raw_response = decrypt(raw_response[4:])
+ all_responses.append(decrypted_raw_response)
+
+ if not quiet:
+ json_request = json.loads(raw_request)
+ json_request = json.dumps(json_request, sort_keys=True, indent=4)
+ print(f'Sent: "{json_request}"')
+ if not brief:
+ raw = ''
+ for b in encrypted_raw_request:
+ raw += f'{b:02X} '
+ logger.debug('Sent raw: "%s"', raw)
+
+ # Note: 4 bytes of garbage (the key)
+ json_response = json.loads(decrypted_raw_response)
+ json_response = json.dumps(json_response, sort_keys=True, indent=4)
+ print(f'Received: "{json_response}"')
+ if not brief:
+ raw = ''
+ for b in raw_response:
+ raw += f'{b:02X} '
+ logger.debug('Received raw: "%s"', raw)
+
+ if (
+ '"err_code":0' not in decrypted_raw_response
+ and '"err_code": 0' not in decrypted_raw_response
+ ):
+ logger.error("Did not see clean err_code in response?!")
+ return (False, all_responses)
+ logger.debug('All commands succeeded.')
+ return (True, all_responses)
+ except socket.error:
+ logger.error("Cound not connect to host %s:%s", ip, port)
+ return (False, all_responses)
+ finally:
+ sock_tcp.close()
+
+