"""A smart, fast test runner. Used in a git pre-commit hook."""
import logging
import os
import re
import subprocess
import threading
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from overrides import overrides

import parallelize as par
# Module-level logger for this test runner.
logger = logging.getLogger(__name__)

# Command-line flags selecting which test suites to run and whether to
# collect code coverage while running them.
args = config.add_commandline_args(f'({__file__})', 'Args related to __file__')
args.add_argument('--unittests', '-u', action='store_true', help='Run unittests.')
args.add_argument('--doctests', '-d', action='store_true', help='Run doctests.')
args.add_argument('--integration', '-i', action='store_true', help='Run integration tests.')
args.add_argument(
    '--coverage', '-c', action='store_true', help='Run tests and capture code coverage data'
)

# Home directory; coverage --source paths below are rooted at f'{HOME}/lib'.
HOME = os.environ['HOME']
@dataclass
class TestingParameters:
    """Parameters shared by every test-runner thread."""

    halt_on_error: bool
    """Should we stop as soon as one error has occurred?"""

    halt_event: threading.Event
    """An event that, when set, indicates to stop ASAP."""
@dataclass
class TestResults:
    """Outcome of executing one test / set of tests."""

    name: str
    """The name of this test / set of tests."""

    tests_executed: List[str]
    """Tests that were executed."""

    tests_succeeded: List[str]
    """Tests that succeeded."""

    tests_failed: List[str]
    """Tests that failed."""

    tests_timed_out: List[str]
    """Tests that timed out."""

    def __add__(self, other):
        """Merge other's outcomes into self.

        Callers accumulate with "+=", so we must return self; returning
        None would clobber the accumulator.
        """
        self.tests_executed.extend(other.tests_executed)
        self.tests_succeeded.extend(other.tests_succeeded)
        self.tests_failed.extend(other.tests_failed)
        self.tests_timed_out.extend(other.tests_timed_out)
        return self

    def __repr__(self) -> str:
        out = f'{self.name}: '
        out += f'{ansi.fg("green")}'
        out += f'{len(self.tests_succeeded)}/{len(self.tests_executed)} passed'
        out += f'{ansi.reset()}.\n'

        if len(self.tests_failed) > 0:
            out += f' ..{ansi.fg("red")}'
            out += f'{len(self.tests_failed)} tests failed'
            out += f'{ansi.reset()}:\n'
            # NOTE(review): per-test line format reconstructed; confirm.
            for test in self.tests_failed:
                out += f'    {test}\n'

        if len(self.tests_timed_out) > 0:
            out += f' ..{ansi.fg("yellow")}'
            out += f'{len(self.tests_timed_out)} tests timed out'
            out += f'{ansi.reset()}:\n'
            # BUGFIX: this loop previously iterated self.tests_failed
            # (copy/paste error); it must list the timed-out tests.
            for test in self.tests_timed_out:
                out += f'    {test}\n'
        return out
class TestRunner(ABC, thread_utils.ThreadWithReturnValue):
    """A Base class for something that runs a test."""

    def __init__(self, params: TestingParameters):
        """Create a TestRunner.

        Args:
            params: Test running parameters.
        """
        # NOTE(review): ThreadWithReturnValue appears to take the thread
        # instance explicitly as its first argument -- confirm against
        # thread_utils before changing.
        super().__init__(self, target=self.begin, args=[params])
        # Subclasses read self.params.halt_event / halt_on_error later.
        self.params = params
        # Accumulator merged (via TestResults.__add__) as tests finish.
        self.test_results = TestResults(
            name=self.get_name(),
            tests_executed=[],
            tests_succeeded=[],
            tests_failed=[],
            tests_timed_out=[],
        )

    @abstractmethod
    def get_name(self) -> str:
        """The name of this test collection."""
        pass

    @abstractmethod
    def begin(self, params: TestingParameters) -> TestResults:
        """Start execution."""
        pass
class TemplatedTestRunner(TestRunner, ABC):
    """A TestRunner that has a recipe for executing the tests."""

    @abstractmethod
    def identify_tests(self) -> List[str]:
        """Return a list of tests that should be executed."""
        pass

    @abstractmethod
    def run_test(self, test: Any) -> TestResults:
        """Run a single test and return its TestResults."""
        pass

    def check_for_abort(self):
        """Periodically called to check to see if we need to stop.

        Raises if the shared halt event is set or (when halt_on_error is
        requested) if any test has already failed.
        """
        if self.params.halt_event.is_set():
            logger.debug('Thread %s saw halt event; exiting.', self.get_name())
            raise Exception("Kill myself!")
        if self.params.halt_on_error:
            if len(self.test_results.tests_failed) > 0:
                logger.error('Thread %s saw abnormal results; exiting.', self.get_name())
                raise Exception("Kill myself!")

    def status_report(self, started: int, result: TestResults):
        """Periodically called to report current status."""
        finished = (
            len(self.test_results.tests_succeeded)
            + len(self.test_results.tests_failed)
            + len(self.test_results.tests_timed_out)
        )
        running = started - finished
        finished_percent = finished / started * 100.0
        logger.info(
            '%s: %d/%d in flight; %d/%d finished (%.1f%%).',
            self.get_name(),
            running,
            started,
            finished,
            started,
            finished_percent,
        )

    def persist_output(self, test_name: str, message: str, output: str) -> None:
        """Called to save the output of a test run under ./test_output."""
        basename = file_utils.without_path(test_name)
        dest = f'{basename}-output.txt'
        with open(f'./test_output/{dest}', 'w') as wf:
            print(message, file=wf)
            print('-' * len(message), file=wf)
            print(output, file=wf)

    def execute_commandline(
        self,
        test_name: str,
        cmdline: str,
        *,
        timeout: float = 120.0,
    ) -> TestResults:
        """Execute a particular commandline to run a test.

        Returns a TestResults recording success, failure, or timeout; the
        test's output is persisted in all three cases.
        """
        try:
            logger.debug('%s: Running %s (%s)', self.get_name(), test_name, cmdline)
            output = exec_utils.cmd(
                cmdline,
                timeout_seconds=timeout,
            )
            self.persist_output(test_name, f'{test_name} ({cmdline}) succeeded.', output)
            logger.debug('%s (%s) succeeded', test_name, cmdline)
            return TestResults(test_name, [test_name], [test_name], [], [])
        except subprocess.TimeoutExpired as e:
            msg = f'{self.get_name()}: {test_name} ({cmdline}) timed out after {e.timeout:.1f} seconds.'
            logger.error(msg)
            logger.debug(
                '%s: %s output when it timed out: %s', self.get_name(), test_name, e.output
            )
            self.persist_output(test_name, msg, e.output.decode('utf-8'))
            return TestResults(test_name, [test_name], [], [], [test_name])
        except subprocess.CalledProcessError as e:
            msg = f'{self.get_name()}: {test_name} ({cmdline}) failed; exit code {e.returncode}'
            logger.error(msg)
            logger.debug('%s: %s output when it failed: %s', self.get_name(), test_name, e.output)
            # BUGFIX: was e.output.decide('utf-8') -- no such method; decode.
            self.persist_output(test_name, msg, e.output.decode('utf-8'))
            return TestResults(test_name, [test_name], [], [test_name], [])

    def begin(self, params: TestingParameters) -> TestResults:
        """Run all identified tests in parallel and merge their results."""
        logger.debug('Thread %s started.', self.get_name())
        interesting_tests = self.identify_tests()

        # run_test is expected to return futures; kick them all off.
        running: List[Any] = []
        for test in interesting_tests:
            running.append(self.run_test(test))
        started = len(running)

        for future in smart_future.wait_any(running):
            self.check_for_abort()
            result = future._resolve()
            self.status_report(started, result)
            logger.debug('Test %s finished.', result.name)
            self.test_results += result

        logger.debug('Thread %s finished.', self.get_name())
        return self.test_results
class UnittestTestRunner(TemplatedTestRunner):
    """Run all known Unittests."""

    def get_name(self) -> str:
        # NOTE(review): label reconstructed; confirm original string.
        return "Unittests"

    def identify_tests(self) -> List[str]:
        # Unittests follow the *_test.py naming convention.
        return list(file_utils.expand_globs('*_test.py'))

    def run_test(self, test: Any) -> TestResults:
        if config.config['coverage']:
            cmdline = f'coverage run --source {HOME}/lib {test} --unittests_ignore_perf'
        else:
            cmdline = f'{test} --unittests_ignore_perf'
        return self.execute_commandline(test, cmdline)
class DoctestTestRunner(TemplatedTestRunner):
    """Run all known Doctests."""

    def get_name(self) -> str:
        # NOTE(review): label reconstructed; confirm original string.
        return "Doctests"

    def identify_tests(self) -> List[str]:
        """Find .py files that import doctest (excluding this runner)."""
        ret = []
        # Use $HOME rather than a hardcoded /home/scott, consistent with
        # the {HOME}/lib coverage paths used elsewhere in this file.
        out = exec_utils.cmd(f'grep -lR "^ *import doctest" {HOME}/lib/python_modules/*')
        for line in out.split('\n'):
            if re.match(r'.*\.py$', line):
                if 'run_tests.py' not in line:
                    ret.append(line)
        return ret

    def run_test(self, test: Any) -> TestResults:
        if config.config['coverage']:
            cmdline = f'coverage run --source {HOME}/lib {test} 2>&1'
        else:
            cmdline = f'python3 {test}'
        return self.execute_commandline(test, cmdline)
class IntegrationTestRunner(TemplatedTestRunner):
    """Run all known Integration tests."""

    def get_name(self) -> str:
        return "Integration Tests"

    def identify_tests(self) -> List[str]:
        # Integration tests follow the *_itest.py naming convention.
        return list(file_utils.expand_globs('*_itest.py'))

    def run_test(self, test: Any) -> TestResults:
        if config.config['coverage']:
            cmdline = f'coverage run --source {HOME}/lib {test}'
        else:
            cmdline = test
        return self.execute_commandline(test, cmdline)
317 def test_results_report(results: Dict[str, TestResults]) -> int:
318 """Give a final report about the tests that were run."""
320 for result in results.values():
321 print(result, end='')
322 total_problems += len(result.tests_failed)
323 total_problems += len(result.tests_timed_out)
325 if total_problems > 0:
326 print('Reminder: look in ./test_output to view test output logs')
327 return total_problems
def code_coverage_report():
    """Give a final code coverage report."""
    text_utils.header('Code Coverage')
    # Merge per-process .coverage* data files, then render the report.
    exec_utils.cmd('coverage combine .coverage*')
    out = exec_utils.cmd('coverage report --omit=config-3.8.py,*_test.py,*_itest.py --sort=-cover')
    print(out)
    print(
        """
To recall this report w/o re-running the tests:

    $ coverage report --omit=config-3.8.py,*_test.py,*_itest.py --sort=-cover

...from the 'tests' directory.  Note that subsequent calls to
run_tests.py with --coverage will klobber previous results.  See:

    https://coverage.readthedocs.io/en/6.2/
"""
    )
@bootstrap.initialize
def main() -> Optional[int]:
    """Run the selected test suites in parallel threads.

    Returns the total number of test problems (0 == success).
    """
    saw_flag = False
    halt_event = threading.Event()
    threads: List[TestRunner] = []

    halt_event.clear()
    params = TestingParameters(
        halt_on_error=True,
        halt_event=halt_event,
    )

    if config.config['coverage']:
        logger.debug('Clearing existing coverage data via "coverage erase".')
        exec_utils.cmd('coverage erase')

    if config.config['unittests']:
        saw_flag = True
        threads.append(UnittestTestRunner(params))
    if config.config['doctests']:
        saw_flag = True
        threads.append(DoctestTestRunner(params))
    if config.config['integration']:
        saw_flag = True
        threads.append(IntegrationTestRunner(params))

    if not saw_flag:
        print('ERROR: one of --unittests, --doctests or --integration is required.')
        return 1

    for thread in threads:
        thread.start()

    # Poll until every runner thread has finished and reported results.
    results: Dict[str, TestResults] = {}
    while len(results) != len(threads):
        for thread in threads:
            if not thread.is_alive():
                tid = thread.name
                if tid not in results:
                    result = thread.join()
                    if result:
                        results[tid] = result
                        if len(result.tests_failed) > 0:
                            logger.error(
                                'Thread %s returned abnormal results; killing the others.', tid
                            )
                            # Signal every other runner to stop ASAP.
                            halt_event.set()

    if config.config['coverage']:
        code_coverage_report()
    total_problems = test_results_report(results)
    return total_problems


if __name__ == '__main__':
    main()