Better logging + cleanup.
[python_utils.git] / exec_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """Helper methods concerned with executing subprocesses."""
6
7 import atexit
8 import logging
9 import os
10 import selectors
11 import shlex
12 import subprocess
13 import sys
14 from typing import List, Optional
15
16 logger = logging.getLogger(__file__)
17
18
19 def cmd_showing_output(
20     command: str,
21 ) -> int:
22     """Kick off a child process.  Capture and print all output that it
23     produces on stdout and stderr.  Wait for the subprocess to exit
24     and return the exit value as the return code of this function.
25
26     """
27     line_enders = set([b'\n', b'\r'])
28     sel = selectors.DefaultSelector()
29     with subprocess.Popen(
30         command,
31         shell=True,
32         bufsize=0,
33         stdout=subprocess.PIPE,
34         stderr=subprocess.PIPE,
35         universal_newlines=False,
36     ) as p:
37         sel.register(p.stdout, selectors.EVENT_READ)  # type: ignore
38         sel.register(p.stderr, selectors.EVENT_READ)  # type: ignore
39         done = False
40         while not done:
41             for key, _ in sel.select():
42                 char = key.fileobj.read(1)  # type: ignore
43                 if not char:
44                     sel.unregister(key.fileobj)
45                     if len(sel.get_map()) == 0:
46                         sys.stdout.flush()
47                         sys.stderr.flush()
48                         sel.close()
49                         done = True
50                 if key.fileobj is p.stdout:
51                     # sys.stdout.buffer.write(char)
52                     os.write(sys.stdout.fileno(), char)
53                     if char in line_enders:
54                         sys.stdout.flush()
55                 else:
56                     # sys.stderr.buffer.write(char)
57                     os.write(sys.stderr.fileno(), char)
58                     if char in line_enders:
59                         sys.stderr.flush()
60         p.wait()
61         return p.returncode
62
63
64 def cmd_with_timeout(command: str, timeout_seconds: Optional[float]) -> int:
65     """Run a command but do not let it run for more than timeout seconds.
66     Doesn't capture or rebroadcast command output.  Function returns
67     the exit value of the command or raises a TimeoutExpired exception
68     if the deadline is exceeded.
69
70     >>> cmd_with_timeout('/bin/echo foo', 10.0)
71     0
72
73     >>> cmd_with_timeout('/bin/sleep 2', 0.1)
74     Traceback (most recent call last):
75     ...
76     subprocess.TimeoutExpired: Command '['/bin/bash', '-c', '/bin/sleep 2']' timed out after 0.1 seconds
77
78     """
79     return subprocess.check_call(["/bin/bash", "-c", command], timeout=timeout_seconds)
80
81
82 def cmd(command: str, timeout_seconds: Optional[float] = None) -> str:
83     """Run a command and capture its output to stdout (only) in a string.
84     Return that string as this function's output.  Raises
85     subprocess.CalledProcessError or TimeoutExpired on error.
86
87     >>> cmd('/bin/echo foo')[:-1]
88     'foo'
89
90     >>> cmd('/bin/sleep 2', 0.1)
91     Traceback (most recent call last):
92     ...
93     subprocess.TimeoutExpired: Command '/bin/sleep 2' timed out after 0.1 seconds
94
95     """
96     ret = subprocess.run(
97         command,
98         shell=True,
99         capture_output=True,
100         check=True,
101         timeout=timeout_seconds,
102     ).stdout
103     return ret.decode("utf-8")
104
105
106 def run_silently(command: str, timeout_seconds: Optional[float] = None) -> None:
107     """Run a command silently but raise subprocess.CalledProcessError if
108     it fails.
109
110     >>> run_silently("/usr/bin/true")
111
112     >>> run_silently("/usr/bin/false")
113     Traceback (most recent call last):
114     ...
115     subprocess.CalledProcessError: Command '/usr/bin/false' returned non-zero exit status 1.
116
117     """
118     subprocess.run(
119         command,
120         shell=True,
121         stderr=subprocess.DEVNULL,
122         stdout=subprocess.DEVNULL,
123         capture_output=False,
124         check=True,
125         timeout=timeout_seconds,
126     )
127
128
129 def cmd_in_background(command: str, *, silent: bool = False) -> subprocess.Popen:
130     args = shlex.split(command)
131     if silent:
132         subproc = subprocess.Popen(
133             args,
134             stdin=subprocess.DEVNULL,
135             stdout=subprocess.DEVNULL,
136             stderr=subprocess.DEVNULL,
137         )
138     else:
139         subproc = subprocess.Popen(args, stdin=subprocess.DEVNULL)
140
141     def kill_subproc() -> None:
142         try:
143             if subproc.poll() is None:
144                 logger.info('At exit handler: killing %s (%s)', subproc, command)
145                 subproc.terminate()
146                 subproc.wait(timeout=10.0)
147         except BaseException as be:
148             logger.exception(be)
149
150     atexit.register(kill_subproc)
151     return subproc
152
153
154 def cmd_list(command: List[str]) -> str:
155     """Run a command with args encapsulated in a list and return the
156     output text as a string.  Raises subprocess.CalledProcessError.
157     """
158     ret = subprocess.run(command, capture_output=True, check=True).stdout
159     return ret.decode("utf-8")
160
161
162 if __name__ == '__main__':
163     import doctest
164
165     doctest.testmod()