Cleanup docs.
[pyutils.git] / src / pyutils / exec_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2023, Scott Gasch
4
5 """Helper methods concerned with executing subprocesses."""
6
7 import atexit
8 import logging
9 import os
10 import selectors
11 import shlex
12 import subprocess
13 import sys
14 from typing import List, Optional
15
16 logger = logging.getLogger(__file__)
17
18
19 def cmd_showing_output(
20     command: str,
21     *,
22     timeout_seconds: Optional[float] = None,
23 ) -> int:
24     """Kick off a child process.  Capture and emit all output that it
25     produces on stdout and stderr in a raw, character by character,
26     manner so that we don't have to wait on newlines.  This was done
27     to capture, for example, the output of a subprocess that creates
28     dots to show incremental progress on a task and render it
29     correctly.
30
31     Args:
32         command: the command to execute
33         timeout_seconds: terminate the subprocess if it takes longer
34             than N seconds; None means to wait as long as it takes.
35
36     Returns:
37         the exit status of the subprocess once the subprocess has
38         exited.  Raises `TimeoutExpired` after killing the subprocess
39         if the timeout expires.
40
41     Raises:
42         TimeoutExpired: if timeout expires before child terminates
43
44     Side effects:
45         prints all output of the child process (stdout or stderr)
46     """
47
48     def timer_expired(p):
49         p.kill()
50         raise subprocess.TimeoutExpired(command, timeout_seconds)
51
52     line_enders = set([b"\n", b"\r"])
53     sel = selectors.DefaultSelector()
54     with subprocess.Popen(
55         command,
56         shell=True,
57         bufsize=0,
58         stdout=subprocess.PIPE,
59         stderr=subprocess.PIPE,
60         universal_newlines=False,
61     ) as p:
62         timer = None
63         if timeout_seconds:
64             import threading
65
66             timer = threading.Timer(timeout_seconds, timer_expired(p))
67             timer.start()
68         try:
69             sel.register(p.stdout, selectors.EVENT_READ)  # type: ignore
70             sel.register(p.stderr, selectors.EVENT_READ)  # type: ignore
71             done = False
72             while not done:
73                 for key, _ in sel.select():
74                     char = key.fileobj.read(1)  # type: ignore
75                     if not char:
76                         sel.unregister(key.fileobj)
77                         if not sel.get_map():
78                             sys.stdout.flush()
79                             sys.stderr.flush()
80                             sel.close()
81                             done = True
82                     if key.fileobj is p.stdout:
83                         os.write(sys.stdout.fileno(), char)
84                         if char in line_enders:
85                             sys.stdout.flush()
86                     else:
87                         os.write(sys.stderr.fileno(), char)
88                         if char in line_enders:
89                             sys.stderr.flush()
90             p.wait()
91         finally:
92             if timer:
93                 timer.cancel()
94         return p.returncode
95
96
97 def cmd_exitcode(command: str, timeout_seconds: Optional[float] = None) -> int:
98     """Run a command silently in the background and return its exit
99     code once it has finished.
100
101     Args:
102         command: the command to run
103         timeout_seconds: optional the max number of seconds to allow
104             the subprocess to execute or None to indicate no timeout
105
106     Returns:
107         the exit status of the subprocess once the subprocess has
108         exited
109
110     Raises:
111         TimeoutExpired: if timeout_seconds is provided and the child process
112             executes longer than the limit.
113
114     >>> cmd_exitcode('/bin/echo foo', 10.0)
115     0
116
117     >>> cmd_exitcode('/bin/sleep 2', 0.01)
118     Traceback (most recent call last):
119     ...
120     subprocess.TimeoutExpired: Command '['/bin/bash', '-c', '/bin/sleep 2']' timed out after 0.01 seconds
121
122     """
123     return subprocess.check_call(["/bin/bash", "-c", command], timeout=timeout_seconds)
124
125
126 def cmd(command: str, timeout_seconds: Optional[float] = None) -> str:
127     """Run a command and capture its output to stdout and stderr into a
128     string buffer.  Return that string as this function's output.
129     Raises subprocess.CalledProcessError or TimeoutExpired on error.
130
131     Args:
132         command: the command to run
133         timeout_seconds: the max number of seconds to allow the subprocess
134             to execute or None to indicate no timeout
135
136     Returns:
137         The captured output of the subprocess' stdout as a string buffer
138
139     >>> cmd('/bin/echo foo')[:-1]
140     'foo'
141
142     >>> cmd('/bin/sleep 2', 0.01)
143     Traceback (most recent call last):
144     ...
145     subprocess.TimeoutExpired: Command '/bin/sleep 2' timed out after 0.01 seconds
146
147     """
148     ret = subprocess.run(
149         command,
150         shell=True,
151         stdout=subprocess.PIPE,
152         stderr=subprocess.STDOUT,
153         check=True,
154         timeout=timeout_seconds,
155     ).stdout
156     return ret.decode("utf-8")
157
158
159 def run_silently(command: str, timeout_seconds: Optional[float] = None) -> None:
160     """Run a command silently.
161
162     Args:
163         command: the command to run.
164         timeout_seconds: the optional max number of seconds to allow
165             the subprocess to execute or None (default) to indicate no
166             time limit.
167
168     Returns:
169         No return value; error conditions (including non-zero child process
170         exits) produce exceptions.
171
172     Raises:
173         CalledProcessError: if the child process fails (i.e. exit != 0)
174         TimeoutExpired: if the child process executes too long.
175
176     >>> run_silently("/usr/bin/true")
177
178     >>> run_silently("/usr/bin/false")
179     Traceback (most recent call last):
180     ...
181     subprocess.CalledProcessError: Command '/usr/bin/false' returned non-zero exit status 1.
182
183     """
184     subprocess.run(
185         command,
186         shell=True,
187         stderr=subprocess.DEVNULL,
188         stdout=subprocess.DEVNULL,
189         capture_output=False,
190         check=True,
191         timeout=timeout_seconds,
192     )
193
194
195 def cmd_in_background(command: str, *, silent: bool = False) -> subprocess.Popen:
196     """Spawns a child process in the background and registers an exit
197     handler to make sure we kill it if the parent process (us) is
198     terminated.
199
200     Args:
201         command: the command to run
202         silent: do not allow any output from the child process to be displayed
203             in the parent process' window
204
205     Returns:
206         the :class:`Popen` object that can be used to communicate
207             with the background process.
208     """
209     args = shlex.split(command)
210     if silent:
211         subproc = subprocess.Popen(
212             args,
213             stdin=subprocess.DEVNULL,
214             stdout=subprocess.DEVNULL,
215             stderr=subprocess.DEVNULL,
216         )
217     else:
218         subproc = subprocess.Popen(args, stdin=subprocess.DEVNULL)
219
220     def kill_subproc() -> None:
221         try:
222             if subproc.poll() is None:
223                 logger.info("At exit handler: killing %s (%s)", subproc, command)
224                 subproc.terminate()
225                 subproc.wait(timeout=10.0)
226         except BaseException:
227             logger.exception(
228                 "Failed to terminate background process %s; giving up.", subproc
229             )
230
231     atexit.register(kill_subproc)
232     return subproc
233
234
235 def cmd_list(command: List[str]) -> str:
236     """Run a command with args encapsulated in a list and return the
237     output text as a string.  Raises subprocess.CalledProcessError.
238     """
239     ret = subprocess.run(command, capture_output=True, check=True).stdout
240     return ret.decode("utf-8")
241
242
243 if __name__ == "__main__":
244     import doctest
245
246     doctest.testmod()