From: Scott Gasch Date: Sat, 4 Jun 2022 16:06:07 +0000 (-0700) Subject: Add timeout_seconds to cmd_showing_output. X-Git-Url: https://wannabe.guru.org/gitweb/?a=commitdiff_plain;h=e8cfdd16192b40c460a620cf04e2667b3020e5ac;p=python_utils.git Add timeout_seconds to cmd_showing_output. --- diff --git a/exec_utils.py b/exec_utils.py index 51aaeb4..49484c6 100644 --- a/exec_utils.py +++ b/exec_utils.py @@ -18,6 +18,8 @@ logger = logging.getLogger(__file__) def cmd_showing_output( command: str, + *, + timeout_seconds: Optional[float] = None, ) -> int: """Kick off a child process. Capture and emit all output that it produces on stdout and stderr in a character by character manner @@ -27,15 +29,22 @@ def cmd_showing_output( Args: command: the command to execute + timeout_seconds: terminate the subprocess if it takes longer + than N seconds; None means to wait as long as it takes. Returns: the exit status of the subprocess once the subprocess has - exited + exited. Raises TimeoutExpired after killing the subprocess + if the timeout expires. Side effects: prints all output of the child process (stdout or stderr) """ + def timer_expired(p): + p.kill() + raise subprocess.TimeoutExpired(command, timeout_seconds) + line_enders = set([b'\n', b'\r']) sel = selectors.DefaultSelector() with subprocess.Popen( @@ -46,28 +55,38 @@ def cmd_showing_output( stderr=subprocess.PIPE, universal_newlines=False, ) as p: - sel.register(p.stdout, selectors.EVENT_READ) # type: ignore - sel.register(p.stderr, selectors.EVENT_READ) # type: ignore - done = False - while not done: - for key, _ in sel.select(): - char = key.fileobj.read(1) # type: ignore - if not char: - sel.unregister(key.fileobj) - if len(sel.get_map()) == 0: - sys.stdout.flush() - sys.stderr.flush() - sel.close() - done = True - if key.fileobj is p.stdout: - os.write(sys.stdout.fileno(), char) - if char in line_enders: - sys.stdout.flush() - else: - os.write(sys.stderr.fileno(), char) - if char in line_enders: - sys.stderr.flush() - p.wait() + timer = None + if timeout_seconds: + import threading + + timer = threading.Timer(timeout_seconds, timer_expired(p)) + timer.start() + try: + sel.register(p.stdout, selectors.EVENT_READ) # type: ignore + sel.register(p.stderr, selectors.EVENT_READ) # type: ignore + done = False + while not done: + for key, _ in sel.select(): + char = key.fileobj.read(1) # type: ignore + if not char: + sel.unregister(key.fileobj) + if len(sel.get_map()) == 0: + sys.stdout.flush() + sys.stderr.flush() + sel.close() + done = True + if key.fileobj is p.stdout: + os.write(sys.stdout.fileno(), char) + if char in line_enders: + sys.stdout.flush() + else: + os.write(sys.stderr.fileno(), char) + if char in line_enders: + sys.stderr.flush() + p.wait() + finally: + if timer: + timer.cancel() return p.returncode diff --git a/tests/exec_utils_test.py b/tests/exec_utils_test.py index 4c003aa..11dda89 100755 --- a/tests/exec_utils_test.py +++ b/tests/exec_utils_test.py @@ -4,6 +4,7 @@ """exec_utils unittest.""" +import subprocess import unittest import exec_utils @@ -18,6 +19,14 @@ class TestExecUtils(unittest.TestCase): self.assertEqual(0, ret) record().close() + def test_cmd_showing_output_with_timeout(self): + try: + exec_utils.cmd_showing_output('sleep 10', timeout_seconds=0.1) + except subprocess.TimeoutExpired: + pass + else: + self.fail('Expected a TimeoutException, didn\'t see one.') + def test_cmd_showing_output_fails(self): with unittest_utils.RecordStdout() as record: ret = exec_utils.cmd_showing_output('/usr/bin/printf hello && false')