#!/usr/bin/env python3
+# © Copyright 2021-2022, Scott Gasch
+
+"""Helper methods concerned with executing subprocesses."""
+
import atexit
import logging
+import os
import selectors
import shlex
import subprocess
import sys
from typing import List, Optional
-
logger = logging.getLogger(__file__)
-def cmd_showing_output(command: str, ) -> int:
+def cmd_showing_output(
+ command: str,
+) -> int:
"""Kick off a child process. Capture and print all output that it
produces on stdout and stderr. Wait for the subprocess to exit
and return the exit value as the return code of this function.
"""
line_enders = set([b'\n', b'\r'])
- p = subprocess.Popen(
+ sel = selectors.DefaultSelector()
+ with subprocess.Popen(
command,
shell=True,
bufsize=0,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=False,
- )
- sel = selectors.DefaultSelector()
- sel.register(p.stdout, selectors.EVENT_READ)
- sel.register(p.stderr, selectors.EVENT_READ)
- stream_ends = 0
- while stream_ends < 2:
- for key, _ in sel.select():
- char = key.fileobj.read(1)
- if not char:
- stream_ends += 1
- continue
- if key.fileobj is p.stdout:
- sys.stdout.buffer.write(char)
- if char in line_enders:
- sys.stdout.flush()
- else:
- sys.stderr.buffer.write(char)
- if char in line_enders:
- sys.stderr.flush()
- p.wait()
- sys.stdout.flush()
- sys.stderr.flush()
- return p.returncode
+ ) as p:
+ sel.register(p.stdout, selectors.EVENT_READ) # type: ignore
+ sel.register(p.stderr, selectors.EVENT_READ) # type: ignore
+ done = False
+ while not done:
+ for key, _ in sel.select():
+ char = key.fileobj.read(1) # type: ignore
+ if not char:
+ sel.unregister(key.fileobj)
+ if len(sel.get_map()) == 0:
+ sys.stdout.flush()
+ sys.stderr.flush()
+ sel.close()
+ done = True
+ if key.fileobj is p.stdout:
+ # sys.stdout.buffer.write(char)
+ os.write(sys.stdout.fileno(), char)
+ if char in line_enders:
+ sys.stdout.flush()
+ else:
+ # sys.stderr.buffer.write(char)
+ os.write(sys.stderr.fileno(), char)
+ if char in line_enders:
+ sys.stderr.flush()
+ p.wait()
+ return p.returncode
def cmd_with_timeout(command: str, timeout_seconds: Optional[float]) -> int:
subprocess.TimeoutExpired: Command '['/bin/bash', '-c', '/bin/sleep 2']' timed out after 0.1 seconds
"""
- return subprocess.check_call(
- ["/bin/bash", "-c", command], timeout=timeout_seconds
- )
+ return subprocess.check_call(["/bin/bash", "-c", command], timeout=timeout_seconds)
def cmd(command: str, timeout_seconds: Optional[float] = None) -> str:
)
-def cmd_in_background(
- command: str, *, silent: bool = False
-) -> subprocess.Popen:
+def cmd_in_background(command: str, *, silent: bool = False) -> subprocess.Popen:
args = shlex.split(command)
if silent:
- subproc = subprocess.Popen(args,
- stdin=subprocess.DEVNULL,
- stdout=subprocess.DEVNULL,
- stderr=subprocess.DEVNULL)
+ subproc = subprocess.Popen(
+ args,
+ stdin=subprocess.DEVNULL,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
else:
subproc = subprocess.Popen(args, stdin=subprocess.DEVNULL)
def kill_subproc() -> None:
try:
if subproc.poll() is None:
- logger.info("At exit handler: killing {}: {}".format(subproc, command))
+ logger.info('At exit handler: killing %s (%s)', subproc, command)
subproc.terminate()
subproc.wait(timeout=10.0)
except BaseException as be:
logger.exception(be)
+
atexit.register(kill_subproc)
return subproc
if __name__ == '__main__':
import doctest
+
doctest.testmod()