4 A smart, fast test runner. Used in a git pre-commit hook.
13 from abc import ABC, abstractmethod
14 from dataclasses import dataclass
15 from typing import Any, Dict, List, Optional, Tuple
17 from overrides import overrides
24 import parallelize as par
29 logger = logging.getLogger(__name__)
30 args = config.add_commandline_args(f'({__file__})', 'Args related to __file__')
31 args.add_argument('--unittests', '-u', action='store_true', help='Run unittests.')
32 args.add_argument('--doctests', '-d', action='store_true', help='Run doctests.')
33 args.add_argument('--integration', '-i', action='store_true', help='Run integration tests.')
38 help='Run unittests, doctests and integration tests. Equivalient to -u -d -i',
41 '--coverage', '-c', action='store_true', help='Run tests and capture code coverage data'
44 HOME = os.environ['HOME']
46 # These tests will be run twice in --coverage mode: once to get code
47 # coverage and then again without coverage enabled. This is because
48 # they pay attention to code performance which is adversely affected
50 PERF_SENSATIVE_TESTS = set(['/home/scott/lib/python_modules/tests/string_utils_test.py'])
54 class TestingParameters:
56 """Should we stop as soon as one error has occurred?"""
58 halt_event: threading.Event
59 """An event that, when set, indicates to stop ASAP."""
65 """The name of the test"""
68 """The kind of the test"""
71 """The command line to execute"""
77 """The name of this test / set of tests."""
79 tests_executed: List[str]
80 """Tests that were executed."""
82 tests_succeeded: List[str]
83 """Tests that succeeded."""
85 tests_failed: List[str]
86 """Tests that failed."""
88 tests_timed_out: List[str]
89 """Tests that timed out."""
91 def __add__(self, other):
92 self.tests_executed.extend(other.tests_executed)
93 self.tests_succeeded.extend(other.tests_succeeded)
94 self.tests_failed.extend(other.tests_failed)
95 self.tests_timed_out.extend(other.tests_timed_out)
def __repr__(self) -> str:
    """Render a human-readable, ANSI-colored summary of these results.

    The summary starts with "<name>: <succeeded>/<executed> passed".
    When any tests failed, a section naming each failed test follows;
    when any tests timed out, a section naming each timed-out test
    follows.

    Returns:
        The multi-line summary string.
    """
    out = f'{self.name}: '
    out += f'{ansi.fg("green")}'
    out += f'{len(self.tests_succeeded)}/{len(self.tests_executed)} passed'
    out += f'{ansi.reset()}.\n'

    if len(self.tests_failed) > 0:
        out += f' ..{ansi.fg("red")}'
        out += f'{len(self.tests_failed)} tests failed'
        out += f'{ansi.reset()}:\n'
        for test in self.tests_failed:
            out += f'    {test}\n'
        out += '\n'

    if len(self.tests_timed_out) > 0:
        out += f' ..{ansi.fg("yellow")}'
        out += f'{len(self.tests_timed_out)} tests timed out'
        out += f'{ansi.reset()}:\n'
        # List the tests that timed out here, not the failed ones again.
        for test in self.tests_timed_out:
            out += f'    {test}\n'
        out += '\n'
    return out
124 class TestRunner(ABC, thread_utils.ThreadWithReturnValue):
125 """A Base class for something that runs a test."""
127 def __init__(self, params: TestingParameters):
128 """Create a TestRunner.
131 params: Test running parameters.
134 super().__init__(self, target=self.begin, args=[params])
136 self.test_results = TestResults(
137 name=self.get_name(),
143 self.tests_started = 0
144 self.lock = threading.Lock()
147 def get_name(self) -> str:
148 """The name of this test collection."""
151 def get_status(self) -> Tuple[int, TestResults]:
152 """Ask the TestRunner for its status."""
154 return (self.tests_started, self.test_results)
157 def begin(self, params: TestingParameters) -> TestResults:
158 """Start execution."""
162 class TemplatedTestRunner(TestRunner, ABC):
163 """A TestRunner that has a recipe for executing the tests."""
166 def identify_tests(self) -> List[TestToRun]:
167 """Return the list of TestToRun objects that should be executed."""
171 def run_test(self, test: TestToRun) -> TestResults:
172 """Run a single test and return its TestResults."""
def check_for_abort(self):
    """Periodically called to see whether this runner needs to stop.

    Raises:
        Exception: if the shared halt event has been set, or if
            halt_on_error is enabled and at least one test has
            already been recorded as failed.
    """
    settings = self.params

    # Some other party asked everybody to stop.
    if settings.halt_event.is_set():
        logger.debug('Thread %s saw halt event; exiting.', self.get_name())
        raise Exception("Kill myself!")

    # We were asked to stop at the first failure and we have one.
    if settings.halt_on_error and self.test_results.tests_failed:
        logger.error('Thread %s saw abnormal results; exiting.', self.get_name())
        raise Exception("Kill myself!")
186 def persist_output(self, test: TestToRun, message: str, output: str) -> None:
187 """Called to save the output of a test run."""
189 dest = f'{test.name}-output.txt'
190 with open(f'./test_output/{dest}', 'w') as wf:
191 print(message, file=wf)
192 print('-' * len(message), file=wf)
195 def execute_commandline(
199 timeout: float = 120.0,
201 """Execute a particular commandline to run a test."""
204 output = exec_utils.cmd(
206 timeout_seconds=timeout,
208 self.persist_output(test, f'{test.name} ({test.cmdline}) succeeded.', output)
209 logger.debug('%s: %s (%s) succeeded', self.get_name(), test.name, test.cmdline)
210 return TestResults(test.name, [test.name], [test.name], [], [])
211 except subprocess.TimeoutExpired as e:
212 msg = f'{self.get_name()}: {test.name} ({test.cmdline}) timed out after {e.timeout:.1f} seconds.'
215 '%s: %s output when it timed out: %s', self.get_name(), test.name, e.output
217 self.persist_output(test, msg, e.output.decode('utf-8'))
225 except subprocess.CalledProcessError as e:
227 f'{self.get_name()}: {test.name} ({test.cmdline}) failed; exit code {e.returncode}'
230 logger.debug('%s: %s output when it failed: %s', self.get_name(), test.name, e.output)
231 self.persist_output(test, msg, e.output.decode('utf-8'))
def begin(self, params: TestingParameters) -> TestResults:
    """Launch every identified test, then collect the results.

    Args:
        params: not referenced by this implementation.

    Returns:
        self.test_results after each finished test's results have
        been added to it.
    """
    logger.debug('Thread %s started.', self.get_name())
    candidates = self.identify_tests()
    logger.debug('%s: Identified %d tests to be run.', self.get_name(), len(candidates))

    # Per the @parallelize decoration on run_test, each call hands back
    # a SmartFuture wrapping a TestResults rather than the TestResults
    # itself, which is why this list is typed as Any.
    in_flight: List[Any] = []
    for candidate in candidates:
        in_flight.append(self.run_test(candidate))
        logger.debug(
            '%s: Test %s started in the background.', self.get_name(), candidate.name
        )
        self.tests_started += 1

    # Check for an abort request before consuming each finished future.
    for finished in smart_future.wait_any(in_flight):
        self.check_for_abort()
        outcome = finished._resolve()
        logger.debug('Test %s finished.', outcome.name)
        self.test_results += outcome

    logger.debug('Thread %s finished.', self.get_name())
    return self.test_results
267 class UnittestTestRunner(TemplatedTestRunner):
268 """Run all known Unittests."""
271 def get_name(self) -> str:
275 def identify_tests(self) -> List[TestToRun]:
277 for test in file_utils.expand_globs('*_test.py'):
278 basename = file_utils.without_path(test)
279 if config.config['coverage']:
283 kind='unittest capturing coverage',
284 cmdline=f'coverage run --source {HOME}/lib {test} --unittests_ignore_perf 2>&1',
287 if test in PERF_SENSATIVE_TESTS:
291 kind='unittest w/o coverage to record perf',
292 cmdline=f'{test} 2>&1',
300 cmdline=f'{test} 2>&1',
def run_test(self, test: TestToRun) -> TestResults:
    """Run one unittest by handing it to execute_commandline()."""
    outcome = self.execute_commandline(test)
    return outcome
310 class DoctestTestRunner(TemplatedTestRunner):
311 """Run all known Doctests."""
314 def get_name(self) -> str:
318 def identify_tests(self) -> List[TestToRun]:
320 out = exec_utils.cmd('grep -lR "^ *import doctest" /home/scott/lib/python_modules/*')
321 for test in out.split('\n'):
322 if re.match(r'.*\.py$', test):
323 if 'run_tests.py' not in test:
324 basename = file_utils.without_path(test)
325 if config.config['coverage']:
329 kind='doctest capturing coverage',
330 cmdline=f'coverage run --source {HOME}/lib {test} 2>&1',
333 if test in PERF_SENSATIVE_TESTS:
337 kind='doctest w/o coverage to record perf',
338 cmdline=f'python3 {test} 2>&1',
343 TestToRun(name=basename, kind='doctest', cmdline=f'python3 {test} 2>&1')
def run_test(self, test: TestToRun) -> TestResults:
    """Run one doctest by handing it to execute_commandline()."""
    outcome = self.execute_commandline(test)
    return outcome
352 class IntegrationTestRunner(TemplatedTestRunner):
353 """Run all known Integration tests."""
def get_name(self) -> str:
    """Return the display name of this test collection."""
    collection_name = "Integration Tests"
    return collection_name
360 def identify_tests(self) -> List[TestToRun]:
362 for test in file_utils.expand_globs('*_itest.py'):
363 basename = file_utils.without_path(test)
364 if config.config['coverage']:
368 kind='integration test capturing coverage',
369 cmdline=f'coverage run --source {HOME}/lib {test} 2>&1',
372 if test in PERF_SENSATIVE_TESTS:
376 kind='integration test w/o coverage to capture perf',
377 cmdline=f'{test} 2>&1',
382 TestToRun(name=basename, kind='integration test', cmdline=f'{test} 2>&1')
def run_test(self, test: TestToRun) -> TestResults:
    """Run one integration test by handing it to execute_commandline()."""
    outcome = self.execute_commandline(test)
    return outcome
391 def test_results_report(results: Dict[str, TestResults]) -> int:
392 """Give a final report about the tests that were run."""
394 for result in results.values():
395 print(result, end='')
396 total_problems += len(result.tests_failed)
397 total_problems += len(result.tests_timed_out)
399 if total_problems > 0:
400 print('Reminder: look in ./test_output to view test output logs')
401 return total_problems
404 def code_coverage_report():
405 """Give a final code coverage report."""
406 text_utils.header('Code Coverage')
407 exec_utils.cmd('coverage combine .coverage*')
408 out = exec_utils.cmd('coverage report --omit=config-3.8.py,*_test.py,*_itest.py --sort=-cover')
412 To recall this report w/o re-running the tests:
414 $ coverage report --omit=config-3.8.py,*_test.py,*_itest.py --sort=-cover
416 ...from the 'tests' directory. Note that subsequent calls to
417 run_tests.py with --coverage will klobber previous results. See:
419 https://coverage.readthedocs.io/en/6.2/
424 @bootstrap.initialize
425 def main() -> Optional[int]:
427 halt_event = threading.Event()
428 threads: List[TestRunner] = []
431 params = TestingParameters(
433 halt_event=halt_event,
436 if config.config['coverage']:
437 logger.debug('Clearing existing coverage data via "coverage erase".')
438 exec_utils.cmd('coverage erase')
440 if config.config['unittests'] or config.config['all']:
442 threads.append(UnittestTestRunner(params))
443 if config.config['doctests'] or config.config['all']:
445 threads.append(DoctestTestRunner(params))
446 if config.config['integration'] or config.config['all']:
448 threads.append(IntegrationTestRunner(params))
452 print('ERROR: one of --unittests, --doctests or --integration is required.')
455 for thread in threads:
458 results: Dict[str, TestResults] = {}
459 while len(results) != len(threads):
464 for thread in threads:
465 (s, tr) = thread.get_status()
467 failed += len(tr.tests_failed) + len(tr.tests_timed_out)
468 done += failed + len(tr.tests_succeeded)
469 if not thread.is_alive():
471 if tid not in results:
472 result = thread.join()
474 results[tid] = result
475 if len(result.tests_failed) > 0:
477 'Thread %s returned abnormal results; killing the others.', tid
482 percent_done = done / started
487 color = ansi.fg('green')
489 color = ansi.fg('red')
491 if percent_done < 100.0:
493 text_utils.bar_graph_string(
496 text=text_utils.BarGraphText.FRACTION,
505 print(f'{ansi.clear_line()}Final Report:')
506 if config.config['coverage']:
507 code_coverage_report()
508 total_problems = test_results_report(results)
509 return total_problems
512 if __name__ == '__main__':