More mypy cleanup.
[python_utils.git] / exec_utils.py
1 #!/usr/bin/env python3
2
3 import atexit
4 import logging
5 import os
6 import selectors
7 import shlex
8 import subprocess
9 import sys
10 from typing import List, Optional
11
12
13 logger = logging.getLogger(__file__)
14
15
16 def cmd_showing_output(
17     command: str,
18 ) -> int:
19     """Kick off a child process.  Capture and print all output that it
20     produces on stdout and stderr.  Wait for the subprocess to exit
21     and return the exit value as the return code of this function.
22
23     """
24     line_enders = set([b'\n', b'\r'])
25     sel = selectors.DefaultSelector()
26     with subprocess.Popen(
27         command,
28         shell=True,
29         bufsize=0,
30         stdout=subprocess.PIPE,
31         stderr=subprocess.PIPE,
32         universal_newlines=False,
33     ) as p:
34         sel.register(p.stdout, selectors.EVENT_READ)  # type: ignore
35         sel.register(p.stderr, selectors.EVENT_READ)  # type: ignore
36         done = False
37         while not done:
38             for key, _ in sel.select():
39                 char = key.fileobj.read(1)  # type: ignore
40                 if not char:
41                     sel.unregister(key.fileobj)
42                     if len(sel.get_map()) == 0:
43                         sys.stdout.flush()
44                         sys.stderr.flush()
45                         sel.close()
46                         done = True
47                 if key.fileobj is p.stdout:
48                     # sys.stdout.buffer.write(char)
49                     os.write(sys.stdout.fileno(), char)
50                     if char in line_enders:
51                         sys.stdout.flush()
52                 else:
53                     # sys.stderr.buffer.write(char)
54                     os.write(sys.stderr.fileno(), char)
55                     if char in line_enders:
56                         sys.stderr.flush()
57         p.wait()
58         return p.returncode
59
60
61 def cmd_with_timeout(command: str, timeout_seconds: Optional[float]) -> int:
62     """Run a command but do not let it run for more than timeout seconds.
63     Doesn't capture or rebroadcast command output.  Function returns
64     the exit value of the command or raises a TimeoutExpired exception
65     if the deadline is exceeded.
66
67     >>> cmd_with_timeout('/bin/echo foo', 10.0)
68     0
69
70     >>> cmd_with_timeout('/bin/sleep 2', 0.1)
71     Traceback (most recent call last):
72     ...
73     subprocess.TimeoutExpired: Command '['/bin/bash', '-c', '/bin/sleep 2']' timed out after 0.1 seconds
74
75     """
76     return subprocess.check_call(["/bin/bash", "-c", command], timeout=timeout_seconds)
77
78
79 def cmd(command: str, timeout_seconds: Optional[float] = None) -> str:
80     """Run a command and capture its output to stdout (only) in a string.
81     Return that string as this function's output.  Raises
82     subprocess.CalledProcessError or TimeoutExpired on error.
83
84     >>> cmd('/bin/echo foo')[:-1]
85     'foo'
86
87     >>> cmd('/bin/sleep 2', 0.1)
88     Traceback (most recent call last):
89     ...
90     subprocess.TimeoutExpired: Command '/bin/sleep 2' timed out after 0.1 seconds
91
92     """
93     ret = subprocess.run(
94         command,
95         shell=True,
96         capture_output=True,
97         check=True,
98         timeout=timeout_seconds,
99     ).stdout
100     return ret.decode("utf-8")
101
102
103 def run_silently(command: str, timeout_seconds: Optional[float] = None) -> None:
104     """Run a command silently but raise subprocess.CalledProcessError if
105     it fails.
106
107     >>> run_silently("/usr/bin/true")
108
109     >>> run_silently("/usr/bin/false")
110     Traceback (most recent call last):
111     ...
112     subprocess.CalledProcessError: Command '/usr/bin/false' returned non-zero exit status 1.
113
114     """
115     subprocess.run(
116         command,
117         shell=True,
118         stderr=subprocess.DEVNULL,
119         stdout=subprocess.DEVNULL,
120         capture_output=False,
121         check=True,
122         timeout=timeout_seconds,
123     )
124
125
126 def cmd_in_background(command: str, *, silent: bool = False) -> subprocess.Popen:
127     args = shlex.split(command)
128     if silent:
129         subproc = subprocess.Popen(
130             args,
131             stdin=subprocess.DEVNULL,
132             stdout=subprocess.DEVNULL,
133             stderr=subprocess.DEVNULL,
134         )
135     else:
136         subproc = subprocess.Popen(args, stdin=subprocess.DEVNULL)
137
138     def kill_subproc() -> None:
139         try:
140             if subproc.poll() is None:
141                 logger.info(f'At exit handler: killing {subproc} ({command})')
142                 subproc.terminate()
143                 subproc.wait(timeout=10.0)
144         except BaseException as be:
145             logger.exception(be)
146
147     atexit.register(kill_subproc)
148     return subproc
149
150
151 def cmd_list(command: List[str]) -> str:
152     """Run a command with args encapsulated in a list and return the
153     output text as a string.  Raises subprocess.CalledProcessError.
154     """
155     ret = subprocess.run(command, capture_output=True, check=True).stdout
156     return ret.decode("utf-8")
157
158
159 if __name__ == '__main__':
160     import doctest
161
162     doctest.testmod()