Used isort to sort imports. Also added to the git pre-commit hook.
[python_utils.git] / exec_utils.py
1 #!/usr/bin/env python3
2
3 import atexit
4 import logging
5 import os
6 import selectors
7 import shlex
8 import subprocess
9 import sys
10 from typing import List, Optional
11
12 logger = logging.getLogger(__file__)
13
14
15 def cmd_showing_output(
16     command: str,
17 ) -> int:
18     """Kick off a child process.  Capture and print all output that it
19     produces on stdout and stderr.  Wait for the subprocess to exit
20     and return the exit value as the return code of this function.
21
22     """
23     line_enders = set([b'\n', b'\r'])
24     sel = selectors.DefaultSelector()
25     with subprocess.Popen(
26         command,
27         shell=True,
28         bufsize=0,
29         stdout=subprocess.PIPE,
30         stderr=subprocess.PIPE,
31         universal_newlines=False,
32     ) as p:
33         sel.register(p.stdout, selectors.EVENT_READ)  # type: ignore
34         sel.register(p.stderr, selectors.EVENT_READ)  # type: ignore
35         done = False
36         while not done:
37             for key, _ in sel.select():
38                 char = key.fileobj.read(1)  # type: ignore
39                 if not char:
40                     sel.unregister(key.fileobj)
41                     if len(sel.get_map()) == 0:
42                         sys.stdout.flush()
43                         sys.stderr.flush()
44                         sel.close()
45                         done = True
46                 if key.fileobj is p.stdout:
47                     # sys.stdout.buffer.write(char)
48                     os.write(sys.stdout.fileno(), char)
49                     if char in line_enders:
50                         sys.stdout.flush()
51                 else:
52                     # sys.stderr.buffer.write(char)
53                     os.write(sys.stderr.fileno(), char)
54                     if char in line_enders:
55                         sys.stderr.flush()
56         p.wait()
57         return p.returncode
58
59
60 def cmd_with_timeout(command: str, timeout_seconds: Optional[float]) -> int:
61     """Run a command but do not let it run for more than timeout seconds.
62     Doesn't capture or rebroadcast command output.  Function returns
63     the exit value of the command or raises a TimeoutExpired exception
64     if the deadline is exceeded.
65
66     >>> cmd_with_timeout('/bin/echo foo', 10.0)
67     0
68
69     >>> cmd_with_timeout('/bin/sleep 2', 0.1)
70     Traceback (most recent call last):
71     ...
72     subprocess.TimeoutExpired: Command '['/bin/bash', '-c', '/bin/sleep 2']' timed out after 0.1 seconds
73
74     """
75     return subprocess.check_call(["/bin/bash", "-c", command], timeout=timeout_seconds)
76
77
78 def cmd(command: str, timeout_seconds: Optional[float] = None) -> str:
79     """Run a command and capture its output to stdout (only) in a string.
80     Return that string as this function's output.  Raises
81     subprocess.CalledProcessError or TimeoutExpired on error.
82
83     >>> cmd('/bin/echo foo')[:-1]
84     'foo'
85
86     >>> cmd('/bin/sleep 2', 0.1)
87     Traceback (most recent call last):
88     ...
89     subprocess.TimeoutExpired: Command '/bin/sleep 2' timed out after 0.1 seconds
90
91     """
92     ret = subprocess.run(
93         command,
94         shell=True,
95         capture_output=True,
96         check=True,
97         timeout=timeout_seconds,
98     ).stdout
99     return ret.decode("utf-8")
100
101
102 def run_silently(command: str, timeout_seconds: Optional[float] = None) -> None:
103     """Run a command silently but raise subprocess.CalledProcessError if
104     it fails.
105
106     >>> run_silently("/usr/bin/true")
107
108     >>> run_silently("/usr/bin/false")
109     Traceback (most recent call last):
110     ...
111     subprocess.CalledProcessError: Command '/usr/bin/false' returned non-zero exit status 1.
112
113     """
114     subprocess.run(
115         command,
116         shell=True,
117         stderr=subprocess.DEVNULL,
118         stdout=subprocess.DEVNULL,
119         capture_output=False,
120         check=True,
121         timeout=timeout_seconds,
122     )
123
124
125 def cmd_in_background(command: str, *, silent: bool = False) -> subprocess.Popen:
126     args = shlex.split(command)
127     if silent:
128         subproc = subprocess.Popen(
129             args,
130             stdin=subprocess.DEVNULL,
131             stdout=subprocess.DEVNULL,
132             stderr=subprocess.DEVNULL,
133         )
134     else:
135         subproc = subprocess.Popen(args, stdin=subprocess.DEVNULL)
136
137     def kill_subproc() -> None:
138         try:
139             if subproc.poll() is None:
140                 logger.info(f'At exit handler: killing {subproc} ({command})')
141                 subproc.terminate()
142                 subproc.wait(timeout=10.0)
143         except BaseException as be:
144             logger.exception(be)
145
146     atexit.register(kill_subproc)
147     return subproc
148
149
150 def cmd_list(command: List[str]) -> str:
151     """Run a command with args encapsulated in a list and return the
152     output text as a string.  Raises subprocess.CalledProcessError.
153     """
154     ret = subprocess.run(command, capture_output=True, check=True).stdout
155     return ret.decode("utf-8")
156
157
158 if __name__ == '__main__':
159     import doctest
160
161     doctest.testmod()