Adds a __repr__ to graph.
[pyutils.git] / src / pyutils / exec_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2023, Scott Gasch
4
5 """Helper methods concerned with executing subprocesses."""
6
7 import atexit
8 import logging
9 import os
10 import selectors
11 import shlex
12 import subprocess
13 import sys
14 from typing import List, Optional
15
16 logger = logging.getLogger(__file__)
17
18
19 def cmd_showing_output(
20     command: str,
21     *,
22     timeout_seconds: Optional[float] = None,
23 ) -> int:
24     """Kick off a child process.  Capture and emit all output that it
25     produces on stdout and stderr in a raw, character by character,
26     manner so that we don't have to wait on newlines.  This was done
27     to capture, for example, the output of a subprocess that creates
28     dots to show incremental progress on a task and render it
29     correctly.
30
31     Args:
32         command: the command to execute
33         timeout_seconds: terminate the subprocess if it takes longer
34             than N seconds; None means to wait as long as it takes.
35
36     Returns:
37         the exit status of the subprocess once the subprocess has
38         exited.  Raises `TimeoutExpired` after killing the subprocess
39         if the timeout expires.
40
41     Raises:
42         TimeoutExpired: if timeout expires before child terminates
43
44     Side effects:
45         prints all output of the child process (stdout or stderr)
46     """
47
48     def timer_expired(p):
49         p.kill()
50         raise subprocess.TimeoutExpired(command, timeout_seconds)
51
52     line_enders = set([b"\n", b"\r"])
53     sel = selectors.DefaultSelector()
54     with subprocess.Popen(
55         command,
56         shell=True,
57         bufsize=0,
58         stdout=subprocess.PIPE,
59         stderr=subprocess.PIPE,
60         universal_newlines=False,
61     ) as p:
62         timer = None
63         if timeout_seconds:
64             import threading
65
66             timer = threading.Timer(timeout_seconds, timer_expired(p))
67             timer.start()
68         try:
69             sel.register(p.stdout, selectors.EVENT_READ)  # type: ignore
70             sel.register(p.stderr, selectors.EVENT_READ)  # type: ignore
71             done = False
72             while not done:
73                 for key, _ in sel.select():
74                     char = key.fileobj.read(1)  # type: ignore
75                     if not char:
76                         sel.unregister(key.fileobj)
77                         if not sel.get_map():
78                             sys.stdout.flush()
79                             sys.stderr.flush()
80                             sel.close()
81                             done = True
82                     if key.fileobj is p.stdout:
83                         os.write(sys.stdout.fileno(), char)
84                         if char in line_enders:
85                             sys.stdout.flush()
86                     else:
87                         os.write(sys.stderr.fileno(), char)
88                         if char in line_enders:
89                             sys.stderr.flush()
90             p.wait()
91         finally:
92             if timer:
93                 timer.cancel()
94         return p.returncode
95
96
97 def cmd_exitcode(command: str, timeout_seconds: Optional[float] = None) -> int:
98     """Run a command silently in the background and return its exit
99     code once it has finished.
100
101     Args:
102         command: the command to run
103         timeout_seconds: optional the max number of seconds to allow
104             the subprocess to execute or None to indicate no timeout
105
106     Returns:
107         the exit status of the subprocess once the subprocess has
108         exited
109
110     Raises:
111         TimeoutExpired: if timeout_seconds is provided and the child process
112             executes longer than the limit.
113
114     >>> cmd_exitcode('/bin/echo foo', 10.0)
115     0
116
117     >>> cmd_exitcode('/bin/sleep 2', 0.01)
118     Traceback (most recent call last):
119     ...
120     subprocess.TimeoutExpired: Command '['/bin/bash', '-c', '/bin/sleep 2']' timed out after 0.01 seconds
121
122     """
123     return subprocess.check_call(["/bin/bash", "-c", command], timeout=timeout_seconds)
124
125
126 def cmd(command: str, timeout_seconds: Optional[float] = None) -> str:
127     """Run a command and capture its output to stdout and stderr into a
128     string buffer.  Return that string as this function's output.
129
130     Args:
131         command: the command to run
132         timeout_seconds: the max number of seconds to allow the subprocess
133             to execute or None to indicate no timeout
134
135     Returns:
136         The captured output of the subprocess' stdout as a string buffer
137
138     Raises:
139         CalledProcessError: the child process didn't exit cleanly
140         TimeoutExpired: the child process ran too long
141
142     >>> cmd('/bin/echo foo')[:-1]
143     'foo'
144
145     >>> cmd('/bin/sleep 2', 0.01)
146     Traceback (most recent call last):
147     ...
148     subprocess.TimeoutExpired: Command '/bin/sleep 2' timed out after 0.01 seconds
149
150     """
151     ret = subprocess.run(
152         command,
153         shell=True,
154         stdout=subprocess.PIPE,
155         stderr=subprocess.STDOUT,
156         check=True,
157         timeout=timeout_seconds,
158     ).stdout
159     return ret.decode("utf-8")
160
161
162 def run_silently(command: str, timeout_seconds: Optional[float] = None) -> None:
163     """Run a command silently.
164
165     Args:
166         command: the command to run.
167         timeout_seconds: the optional max number of seconds to allow
168             the subprocess to execute or None (default) to indicate no
169             time limit.
170
171     Returns:
172         No return value; error conditions (including non-zero child process
173         exits) produce exceptions.
174
175     Raises:
176         CalledProcessError: if the child process fails (i.e. exit != 0)
177         TimeoutExpired: if the child process executes too long.
178
179     >>> run_silently("/usr/bin/true")
180
181     >>> run_silently("/usr/bin/false")
182     Traceback (most recent call last):
183     ...
184     subprocess.CalledProcessError: Command '/usr/bin/false' returned non-zero exit status 1.
185
186     """
187     subprocess.run(
188         command,
189         shell=True,
190         stderr=subprocess.DEVNULL,
191         stdout=subprocess.DEVNULL,
192         capture_output=False,
193         check=True,
194         timeout=timeout_seconds,
195     )
196
197
198 def cmd_in_background(command: str, *, silent: bool = False) -> subprocess.Popen:
199     """Spawns a child process in the background and registers an exit
200     handler to make sure we kill it if the parent process (us) is
201     terminated.
202
203     Args:
204         command: the command to run
205         silent: do not allow any output from the child process to be displayed
206             in the parent process' window
207
208     Returns:
209         the :class:`Popen` object that can be used to communicate
210             with the background process.
211     """
212     args = shlex.split(command)
213     if silent:
214         subproc = subprocess.Popen(
215             args,
216             stdin=subprocess.DEVNULL,
217             stdout=subprocess.DEVNULL,
218             stderr=subprocess.DEVNULL,
219         )
220     else:
221         subproc = subprocess.Popen(args, stdin=subprocess.DEVNULL)
222
223     def kill_subproc() -> None:
224         try:
225             if subproc.poll() is None:
226                 logger.info("At exit handler: killing %s (%s)", subproc, command)
227                 subproc.terminate()
228                 subproc.wait(timeout=10.0)
229         except BaseException:
230             logger.exception(
231                 "Failed to terminate background process %s; giving up.", subproc
232             )
233
234     atexit.register(kill_subproc)
235     return subproc
236
237
238 def cmd_list(command: List[str]) -> str:
239     """Run a command with args encapsulated in a list and return the
240     output text as a string.
241
242     Raises:
243         CalledProcessError: the child process didn't exit cleanly
244         TimeoutExpired: the child process ran too long
245     """
246     ret = subprocess.run(command, capture_output=True, check=True).stdout
247     return ret.decode("utf-8")
248
249
250 if __name__ == "__main__":
251     import doctest
252
253     doctest.testmod()