I guess it's 2023 now...
[pyutils.git] / src / pyutils / exec_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2023, Scott Gasch
4
5 """Helper methods concerned with executing subprocesses."""
6
7 import atexit
8 import logging
9 import os
10 import selectors
11 import shlex
12 import subprocess
13 import sys
14 from typing import List, Optional
15
16 logger = logging.getLogger(__file__)
17
18
19 def cmd_showing_output(
20     command: str,
21     *,
22     timeout_seconds: Optional[float] = None,
23 ) -> int:
24     """Kick off a child process.  Capture and emit all output that it
25     produces on stdout and stderr in a raw, character by character,
26     manner so that we don't have to wait on newlines.  This was done
27     to capture, for example, the output of a subprocess that creates
28     dots to show incremental progress on a task and render it
29     correctly.
30
31     Args:
32         command: the command to execute
33         timeout_seconds: terminate the subprocess if it takes longer
34             than N seconds; None means to wait as long as it takes.
35
36     Returns:
37         the exit status of the subprocess once the subprocess has
38         exited.  Raises `TimeoutExpired` after killing the subprocess
39         if the timeout expires.
40
41     Side effects:
42         prints all output of the child process (stdout or stderr)
43     """
44
45     def timer_expired(p):
46         p.kill()
47         raise subprocess.TimeoutExpired(command, timeout_seconds)
48
49     line_enders = set([b"\n", b"\r"])
50     sel = selectors.DefaultSelector()
51     with subprocess.Popen(
52         command,
53         shell=True,
54         bufsize=0,
55         stdout=subprocess.PIPE,
56         stderr=subprocess.PIPE,
57         universal_newlines=False,
58     ) as p:
59         timer = None
60         if timeout_seconds:
61             import threading
62
63             timer = threading.Timer(timeout_seconds, timer_expired(p))
64             timer.start()
65         try:
66             sel.register(p.stdout, selectors.EVENT_READ)  # type: ignore
67             sel.register(p.stderr, selectors.EVENT_READ)  # type: ignore
68             done = False
69             while not done:
70                 for key, _ in sel.select():
71                     char = key.fileobj.read(1)  # type: ignore
72                     if not char:
73                         sel.unregister(key.fileobj)
74                         if len(sel.get_map()) == 0:
75                             sys.stdout.flush()
76                             sys.stderr.flush()
77                             sel.close()
78                             done = True
79                     if key.fileobj is p.stdout:
80                         os.write(sys.stdout.fileno(), char)
81                         if char in line_enders:
82                             sys.stdout.flush()
83                     else:
84                         os.write(sys.stderr.fileno(), char)
85                         if char in line_enders:
86                             sys.stderr.flush()
87             p.wait()
88         finally:
89             if timer:
90                 timer.cancel()
91         return p.returncode
92
93
94 def cmd_exitcode(command: str, timeout_seconds: Optional[float] = None) -> int:
95     """Run a command silently in the background and return its exit
96     code once it has finished.  If timeout_seconds is provided and the
97     command runs longer than timeout_seconds, raise a `TimeoutExpired`
98     exception.
99
100     Args:
101         command: the command to run
102         timeout_seconds: optional the max number of seconds to allow
103             the subprocess to execute or None to indicate no timeout
104
105     Returns:
106         the exit status of the subprocess once the subprocess has
107         exited
108
109     >>> cmd_exitcode('/bin/echo foo', 10.0)
110     0
111
112     >>> cmd_exitcode('/bin/sleep 2', 0.01)
113     Traceback (most recent call last):
114     ...
115     subprocess.TimeoutExpired: Command '['/bin/bash', '-c', '/bin/sleep 2']' timed out after 0.01 seconds
116
117     """
118     return subprocess.check_call(["/bin/bash", "-c", command], timeout=timeout_seconds)
119
120
121 def cmd(command: str, timeout_seconds: Optional[float] = None) -> str:
122     """Run a command and capture its output to stdout and stderr into a
123     string buffer.  Return that string as this function's output.
124     Raises subprocess.CalledProcessError or TimeoutExpired on error.
125
126     Args:
127         command: the command to run
128         timeout_seconds: the max number of seconds to allow the subprocess
129             to execute or None to indicate no timeout
130
131     Returns:
132         The captured output of the subprocess' stdout as a string buffer
133
134     >>> cmd('/bin/echo foo')[:-1]
135     'foo'
136
137     >>> cmd('/bin/sleep 2', 0.01)
138     Traceback (most recent call last):
139     ...
140     subprocess.TimeoutExpired: Command '/bin/sleep 2' timed out after 0.01 seconds
141
142     """
143     ret = subprocess.run(
144         command,
145         shell=True,
146         stdout=subprocess.PIPE,
147         stderr=subprocess.STDOUT,
148         check=True,
149         timeout=timeout_seconds,
150     ).stdout
151     return ret.decode("utf-8")
152
153
154 def run_silently(command: str, timeout_seconds: Optional[float] = None) -> None:
155     """Run a command silently but raise
156     `subprocess.CalledProcessError` if it fails (i.e. returns a
157     non-zero return value) and raise a `TimeoutExpired` if it runs too
158     long.
159
160     Args:
161         command: the command to run timeout_seconds: the optional
162             max number of seconds to allow the subprocess to execute or
163             None to indicate no timeout
164
165     Returns:
166         No return value; error conditions (including non-zero child process
167         exits) produce exceptions.
168
169     >>> run_silently("/usr/bin/true")
170
171     >>> run_silently("/usr/bin/false")
172     Traceback (most recent call last):
173     ...
174     subprocess.CalledProcessError: Command '/usr/bin/false' returned non-zero exit status 1.
175
176     """
177     subprocess.run(
178         command,
179         shell=True,
180         stderr=subprocess.DEVNULL,
181         stdout=subprocess.DEVNULL,
182         capture_output=False,
183         check=True,
184         timeout=timeout_seconds,
185     )
186
187
188 def cmd_in_background(command: str, *, silent: bool = False) -> subprocess.Popen:
189     """Spawns a child process in the background and registers an exit
190     handler to make sure we kill it if the parent process (us) is
191     terminated.
192
193     Args:
194         command: the command to run
195         silent: do not allow any output from the child process to be displayed
196             in the parent process' window
197
198     Returns:
199         the :class:`Popen` object that can be used to communicate
200             with the background process.
201     """
202     args = shlex.split(command)
203     if silent:
204         subproc = subprocess.Popen(
205             args,
206             stdin=subprocess.DEVNULL,
207             stdout=subprocess.DEVNULL,
208             stderr=subprocess.DEVNULL,
209         )
210     else:
211         subproc = subprocess.Popen(args, stdin=subprocess.DEVNULL)
212
213     def kill_subproc() -> None:
214         try:
215             if subproc.poll() is None:
216                 logger.info("At exit handler: killing %s (%s)", subproc, command)
217                 subproc.terminate()
218                 subproc.wait(timeout=10.0)
219         except BaseException:
220             logger.exception(
221                 "Failed to terminate background process %s; giving up.", subproc
222             )
223
224     atexit.register(kill_subproc)
225     return subproc
226
227
228 def cmd_list(command: List[str]) -> str:
229     """Run a command with args encapsulated in a list and return the
230     output text as a string.  Raises subprocess.CalledProcessError.
231     """
232     ret = subprocess.run(command, capture_output=True, check=True).stdout
233     return ret.decode("utf-8")
234
235
236 if __name__ == "__main__":
237     import doctest
238
239     doctest.testmod()