#!/usr/bin/env python3
+import atexit
+import logging
+import selectors
import shlex
import subprocess
-from typing import List
+import sys
+from typing import List, Optional
-def cmd_with_timeout(command: str, timeout_seconds: float) -> int:
+logger = logging.getLogger(__file__)
+
+
+def cmd_showing_output(command: str, ) -> int:
+ """Kick off a child process. Capture and print all output that it
+ produces on stdout and stderr. Wait for the subprocess to exit
+ and return the exit value as the return code of this function.
+
+ """
+ line_enders = set([b'\n', b'\r'])
+ p = subprocess.Popen(
+ command,
+ shell=True,
+ bufsize=0,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ universal_newlines=False,
+ )
+ sel = selectors.DefaultSelector()
+ sel.register(p.stdout, selectors.EVENT_READ)
+ sel.register(p.stderr, selectors.EVENT_READ)
+ stream_ends = 0
+ while stream_ends < 2:
+ for key, _ in sel.select():
+ char = key.fileobj.read(1)
+ if not char:
+ stream_ends += 1
+ continue
+ if key.fileobj is p.stdout:
+ sys.stdout.buffer.write(char)
+ if char in line_enders:
+ sys.stdout.flush()
+ else:
+ sys.stderr.buffer.write(char)
+ if char in line_enders:
+ sys.stderr.flush()
+ p.wait()
+ sys.stdout.flush()
+ sys.stderr.flush()
+ return p.returncode
+
+
+def cmd_with_timeout(command: str, timeout_seconds: Optional[float]) -> int:
+ """Run a command but do not let it run for more than timeout seconds.
+ Doesn't capture or rebroadcast command output. Function returns
+ the exit value of the command or raises a TimeoutExpired exception
+ if the deadline is exceeded.
+
+ >>> cmd_with_timeout('/bin/echo foo', 10.0)
+ 0
+
+ >>> cmd_with_timeout('/bin/sleep 2', 0.1)
+ Traceback (most recent call last):
+ ...
+ subprocess.TimeoutExpired: Command '['/bin/bash', '-c', '/bin/sleep 2']' timed out after 0.1 seconds
+
+ """
return subprocess.check_call(
["/bin/bash", "-c", command], timeout=timeout_seconds
)
-def cmd(command: str) -> str:
- """Run a command with everything encased in a string and return
- the output text as a string. Raises subprocess.CalledProcessError.
+def cmd(command: str, timeout_seconds: Optional[float] = None) -> str:
+ """Run a command and capture its output to stdout (only) in a string.
+ Return that string as this function's output. Raises
+ subprocess.CalledProcessError or TimeoutExpired on error.
+
+ >>> cmd('/bin/echo foo')[:-1]
+ 'foo'
+
+ >>> cmd('/bin/sleep 2', 0.1)
+ Traceback (most recent call last):
+ ...
+ subprocess.TimeoutExpired: Command '/bin/sleep 2' timed out after 0.1 seconds
+
"""
ret = subprocess.run(
- command, shell=True, capture_output=True, check=True
+ command,
+ shell=True,
+ capture_output=True,
+ check=True,
+ timeout=timeout_seconds,
).stdout
return ret.decode("utf-8")
-def run_silently(command: str) -> None:
+def run_silently(command: str, timeout_seconds: Optional[float] = None) -> None:
"""Run a command silently but raise subprocess.CalledProcessError if
- it fails."""
+ it fails.
+
+ >>> run_silently("/usr/bin/true")
+
+ >>> run_silently("/usr/bin/false")
+ Traceback (most recent call last):
+ ...
+ subprocess.CalledProcessError: Command '/usr/bin/false' returned non-zero exit status 1.
+
+ """
subprocess.run(
- command, shell=True, stderr=subprocess.DEVNULL,
- stdout=subprocess.DEVNULL, capture_output=False, check=True
+ command,
+ shell=True,
+ stderr=subprocess.DEVNULL,
+ stdout=subprocess.DEVNULL,
+ capture_output=False,
+ check=True,
+ timeout=timeout_seconds,
)
) -> subprocess.Popen:
args = shlex.split(command)
if silent:
- return subprocess.Popen(args,
- stdin=subprocess.DEVNULL,
- stdout=subprocess.DEVNULL,
- stderr=subprocess.DEVNULL)
+ subproc = subprocess.Popen(args,
+ stdin=subprocess.DEVNULL,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL)
else:
- return subprocess.Popen(args,
- stdin=subprocess.DEVNULL)
+ subproc = subprocess.Popen(args, stdin=subprocess.DEVNULL)
+
+ def kill_subproc() -> None:
+ try:
+ if subproc.poll() is None:
+ logger.info("At exit handler: killing {}: {}".format(subproc, command))
+ subproc.terminate()
+ subproc.wait(timeout=10.0)
+ except BaseException as be:
+ logger.exception(be)
+ atexit.register(kill_subproc)
+ return subproc
def cmd_list(command: List[str]) -> str:
"""
ret = subprocess.run(command, capture_output=True, check=True).stdout
return ret.decode("utf-8")
+
+
+if __name__ == '__main__':
+ import doctest
+ doctest.testmod()