2cb4fede0fface6bbf65ad0e99449acabfbf68bd
[pyutils.git] / src / pyutils / exec_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2023, Scott Gasch
4
5 """Helper methods concerned with executing subprocesses."""
6
7 import atexit
8 import logging
9 import os
10 import selectors
11 import shlex
12 import subprocess
13 import sys
14 from typing import List, Optional
15
16 logger = logging.getLogger(__file__)
17
18
19 def cmd_showing_output(
20     command: str,
21     *,
22     timeout_seconds: Optional[float] = None,
23 ) -> int:
24     """Kick off a child process.  Capture and emit all output that it
25     produces on stdout and stderr in a raw, character by character,
26     manner so that we don't have to wait on newlines.  This was done
27     to capture, for example, the output of a subprocess that creates
28     dots to show incremental progress on a task and render it
29     correctly.
30
31     Args:
32         command: the command to execute
33         timeout_seconds: terminate the subprocess if it takes longer
34             than N seconds; None means to wait as long as it takes.
35
36     Returns:
37         the exit status of the subprocess once the subprocess has
38         exited.  Raises `TimeoutExpired` after killing the subprocess
39         if the timeout expires.
40
41     Side effects:
42         prints all output of the child process (stdout or stderr)
43     """
44
45     def timer_expired(p):
46         p.kill()
47         raise subprocess.TimeoutExpired(command, timeout_seconds)
48
49     line_enders = set([b"\n", b"\r"])
50     sel = selectors.DefaultSelector()
51     with subprocess.Popen(
52         command,
53         shell=True,
54         bufsize=0,
55         stdout=subprocess.PIPE,
56         stderr=subprocess.PIPE,
57         universal_newlines=False,
58     ) as p:
59         timer = None
60         if timeout_seconds:
61             import threading
62
63             timer = threading.Timer(timeout_seconds, timer_expired(p))
64             timer.start()
65         try:
66             sel.register(p.stdout, selectors.EVENT_READ)  # type: ignore
67             sel.register(p.stderr, selectors.EVENT_READ)  # type: ignore
68             done = False
69             while not done:
70                 for key, _ in sel.select():
71                     char = key.fileobj.read(1)  # type: ignore
72                     if not char:
73                         sel.unregister(key.fileobj)
74                         if len(sel.get_map()) == 0:
75                             sys.stdout.flush()
76                             sys.stderr.flush()
77                             sel.close()
78                             done = True
79                     if key.fileobj is p.stdout:
80                         os.write(sys.stdout.fileno(), char)
81                         if char in line_enders:
82                             sys.stdout.flush()
83                     else:
84                         os.write(sys.stderr.fileno(), char)
85                         if char in line_enders:
86                             sys.stderr.flush()
87             p.wait()
88         finally:
89             if timer:
90                 timer.cancel()
91         return p.returncode
92
93
94 def cmd_exitcode(command: str, timeout_seconds: Optional[float] = None) -> int:
95     """Run a command silently in the background and return its exit
96     code once it has finished.  If timeout_seconds is provided and the
97     command runs longer than timeout_seconds, raise a `TimeoutExpired`
98     exception.
99
100     Args:
101         command: the command to run
102         timeout_seconds: optional the max number of seconds to allow
103             the subprocess to execute or None to indicate no timeout
104
105     Returns:
106         the exit status of the subprocess once the subprocess has
107         exited
108
109     >>> cmd_exitcode('/bin/echo foo', 10.0)
110     0
111
112     >>> cmd_exitcode('/bin/sleep 2', 0.01)
113     Traceback (most recent call last):
114     ...
115     subprocess.TimeoutExpired: Command '['/bin/bash', '-c', '/bin/sleep 2']' timed out after 0.01 seconds
116
117     """
118     return subprocess.check_call(["/bin/bash", "-c", command], timeout=timeout_seconds)
119
120
121 def cmd(command: str, timeout_seconds: Optional[float] = None) -> str:
122     """Run a command and capture its output to stdout and stderr into a
123     string buffer.  Return that string as this function's output.
124     Raises subprocess.CalledProcessError or TimeoutExpired on error.
125
126     Args:
127         command: the command to run
128         timeout_seconds: the max number of seconds to allow the subprocess
129             to execute or None to indicate no timeout
130
131     Returns:
132         The captured output of the subprocess' stdout as a string buffer
133
134     >>> cmd('/bin/echo foo')[:-1]
135     'foo'
136
137     >>> cmd('/bin/sleep 2', 0.01)
138     Traceback (most recent call last):
139     ...
140     subprocess.TimeoutExpired: Command '/bin/sleep 2' timed out after 0.01 seconds
141
142     """
143     ret = subprocess.run(
144         command,
145         shell=True,
146         stdout=subprocess.PIPE,
147         stderr=subprocess.STDOUT,
148         check=True,
149         timeout=timeout_seconds,
150     ).stdout
151     return ret.decode("utf-8")
152
153
154 def run_silently(command: str, timeout_seconds: Optional[float] = None) -> None:
155     """Run a command silently but raise
156     `subprocess.CalledProcessError` if it fails (i.e. returns a
157     non-zero return value) and raise a `TimeoutExpired` if it runs too
158     long.
159
160     Args:
161         command: the command to run.
162         timeout_seconds: the optional max number of seconds to allow
163             the subprocess to execute or None (default) to indicate no
164             time limit.
165
166     Returns:
167         No return value; error conditions (including non-zero child process
168         exits) produce exceptions.
169
170     >>> run_silently("/usr/bin/true")
171
172     >>> run_silently("/usr/bin/false")
173     Traceback (most recent call last):
174     ...
175     subprocess.CalledProcessError: Command '/usr/bin/false' returned non-zero exit status 1.
176
177     """
178     subprocess.run(
179         command,
180         shell=True,
181         stderr=subprocess.DEVNULL,
182         stdout=subprocess.DEVNULL,
183         capture_output=False,
184         check=True,
185         timeout=timeout_seconds,
186     )
187
188
189 def cmd_in_background(command: str, *, silent: bool = False) -> subprocess.Popen:
190     """Spawns a child process in the background and registers an exit
191     handler to make sure we kill it if the parent process (us) is
192     terminated.
193
194     Args:
195         command: the command to run
196         silent: do not allow any output from the child process to be displayed
197             in the parent process' window
198
199     Returns:
200         the :class:`Popen` object that can be used to communicate
201             with the background process.
202     """
203     args = shlex.split(command)
204     if silent:
205         subproc = subprocess.Popen(
206             args,
207             stdin=subprocess.DEVNULL,
208             stdout=subprocess.DEVNULL,
209             stderr=subprocess.DEVNULL,
210         )
211     else:
212         subproc = subprocess.Popen(args, stdin=subprocess.DEVNULL)
213
214     def kill_subproc() -> None:
215         try:
216             if subproc.poll() is None:
217                 logger.info("At exit handler: killing %s (%s)", subproc, command)
218                 subproc.terminate()
219                 subproc.wait(timeout=10.0)
220         except BaseException:
221             logger.exception(
222                 "Failed to terminate background process %s; giving up.", subproc
223             )
224
225     atexit.register(kill_subproc)
226     return subproc
227
228
229 def cmd_list(command: List[str]) -> str:
230     """Run a command with args encapsulated in a list and return the
231     output text as a string.  Raises subprocess.CalledProcessError.
232     """
233     ret = subprocess.run(command, capture_output=True, check=True).stdout
234     return ret.decode("utf-8")
235
236
237 if __name__ == "__main__":
238     import doctest
239
240     doctest.testmod()