"""A smart, fast test runner. Used in a git pre-commit hook."""
import logging
import os
import re
import subprocess
import threading
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

from overrides import overrides

import ansi
import bootstrap
import config
import exec_utils
import file_utils
import parallelize as par
import smart_future
import text_utils
import thread_utils
logger = logging.getLogger(__name__)
args = config.add_commandline_args(f'({__file__})', f'Args related to {__file__}')
args.add_argument('--unittests', '-u', action='store_true', help='Run unittests.')
args.add_argument('--doctests', '-d', action='store_true', help='Run doctests.')
args.add_argument('--integration', '-i', action='store_true', help='Run integration tests.')
args.add_argument(
    '--all',
    '-a',
    action='store_true',
    help='Run unittests, doctests and integration tests. Equivalent to -u -d -i.',
)
args.add_argument(
    '--coverage', '-c', action='store_true', help='Run tests and capture code coverage data.'
)
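
# Example invocations (a sketch; flags per the definitions above):
#
#   ./run_tests.py --all --coverage     # run everything, capturing coverage
#   ./run_tests.py -u -d                # only unittests and doctests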

HOME = os.environ['HOME']

# These tests will be run twice in --coverage mode: once to gather code
# coverage data and then again with coverage disabled. This is because
# they pay attention to code performance, which is adversely affected
# by coverage instrumentation.
PERF_SENSITIVE_TESTS = set(['/home/scott/lib/python_modules/tests/string_utils_test.py'])


@dataclass
class TestingParameters:
    """Parameters shared by all test runner threads."""

    halt_on_error: bool
    """Should we stop as soon as one error has occurred?"""

    halt_event: threading.Event
    """An event that, when set, indicates to stop ASAP."""


@dataclass
class TestToRun:
    """Describes a single test to be executed."""

    name: str
    """The name of the test."""

    kind: str
    """The kind of the test."""

    cmdline: str
    """The command line to execute."""


@dataclass
class TestResults:
    """Detailed results of a set of test executions."""

    name: str
    """The name of this test / set of tests."""

    tests_executed: List[str]
    """Tests that were executed."""

    tests_succeeded: List[str]
    """Tests that succeeded."""

    tests_failed: List[str]
    """Tests that failed."""

    tests_timed_out: List[str]
    """Tests that timed out."""

    def __add__(self, other):
        self.tests_executed.extend(other.tests_executed)
        self.tests_succeeded.extend(other.tests_succeeded)
        self.tests_failed.extend(other.tests_failed)
        self.tests_timed_out.extend(other.tests_timed_out)
        return self
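
    # __add__ mutates and returns the left operand, so both `a + b` and
    # `a += b` fold b's lists into a. A minimal aggregation sketch
    # (hypothetical variable names):
    #
    #   total = TestResults('all tests', [], [], [], [])
    #   total += unittest_results
    #   total += doctest_results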

    def __repr__(self) -> str:
        out = f'{self.name}: '
        out += f'{ansi.fg("green")}'
        out += f'{len(self.tests_succeeded)}/{len(self.tests_executed)} passed'
        out += f'{ansi.reset()}.\n'

        if len(self.tests_failed) > 0:
            out += f' ..{ansi.fg("red")}'
            out += f'{len(self.tests_failed)} tests failed'
            out += f'{ansi.reset()}:\n'
            for test in self.tests_failed:
                out += f'    {test}\n'

        if len(self.tests_timed_out) > 0:
            out += f' ..{ansi.fg("yellow")}'
            out += f'{len(self.tests_timed_out)} tests timed out'
            out += f'{ansi.reset()}:\n'
            for test in self.tests_timed_out:
                out += f'    {test}\n'
        return out
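
    # Example of a rendered report (ANSI colors elided; names illustrative):
    #
    #   Unittests: 10/12 passed.
    #    ..2 tests failed:
    #       string_utils_test.py
    #       datetime_utils_test.py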


class TestRunner(ABC, thread_utils.ThreadWithReturnValue):
    """A base class for something that runs a test."""

    def __init__(self, params: TestingParameters):
        """Create a TestRunner.

        Args:
            params: Test running parameters.
        """
        super().__init__(target=self.begin, args=[params])
        self.params = params
        self.test_results = TestResults(
            name=self.get_name(),
            tests_executed=[],
            tests_succeeded=[],
            tests_failed=[],
            tests_timed_out=[],
        )
        self.tests_started = 0
        self.lock = threading.Lock()

    @abstractmethod
    def get_name(self) -> str:
        """The name of this test collection."""
        pass

    def get_status(self) -> Tuple[int, TestResults]:
        """Ask the TestRunner for its status."""
        with self.lock:
            return (self.tests_started, self.test_results)
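
    # The main driver polls get_status() from its monitoring loop to draw a
    # progress bar while tests run, e.g. (sketch):
    #
    #   (num_started, results_so_far) = runner.get_status()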

    @abstractmethod
    def begin(self, params: TestingParameters) -> TestResults:
        """Start execution."""
        pass


class TemplatedTestRunner(TestRunner, ABC):
    """A TestRunner that has a recipe for executing its tests."""

    @abstractmethod
    def identify_tests(self) -> List[TestToRun]:
        """Return the list of TestToRun objects that should be executed."""
        pass

    @abstractmethod
    def run_test(self, test: TestToRun) -> TestResults:
        """Run a single test and return its TestResults."""
        pass

    def check_for_abort(self):
        """Periodically called to check whether we need to stop."""

        if self.params.halt_event.is_set():
            logger.debug('Thread %s saw halt event; exiting.', self.get_name())
            raise Exception("Kill myself!")
        if self.params.halt_on_error:
            if len(self.test_results.tests_failed) > 0:
                logger.error('Thread %s saw abnormal results; exiting.', self.get_name())
                raise Exception("Kill myself!")

    def persist_output(self, test: TestToRun, message: str, output: str) -> None:
        """Called to save the output of a test run."""

        dest = f'{test.name}-output.txt'
        with open(f'./test_output/{dest}', 'w') as wf:
            print(message, file=wf)
            print('-' * len(message), file=wf)
            print(output, file=wf)

    def execute_commandline(
        self,
        test: TestToRun,
        timeout: float = 120.0,
    ) -> TestResults:
        """Execute a particular commandline to run a test."""

        try:
            output = exec_utils.cmd(
                test.cmdline,
                timeout_seconds=timeout,
            )
            self.persist_output(test, f'{test.name} ({test.cmdline}) succeeded.', output)
            logger.debug('%s: %s (%s) succeeded', self.get_name(), test.name, test.cmdline)
            return TestResults(test.name, [test.name], [test.name], [], [])
        except subprocess.TimeoutExpired as e:
            msg = f'{self.get_name()}: {test.name} ({test.cmdline}) timed out after {e.timeout:.1f} seconds.'
            logger.error(msg)
            logger.debug(
                '%s: %s output when it timed out: %s', self.get_name(), test.name, e.output
            )
            self.persist_output(test, msg, e.output.decode('utf-8'))
            return TestResults(test.name, [test.name], [], [], [test.name])
        except subprocess.CalledProcessError as e:
            msg = f'{self.get_name()}: {test.name} ({test.cmdline}) failed; exit code {e.returncode}'
            logger.error(msg)
            logger.debug('%s: %s output when it failed: %s', self.get_name(), test.name, e.output)
            self.persist_output(test, msg, e.output.decode('utf-8'))
            return TestResults(test.name, [test.name], [], [test.name], [])

    @overrides
    def begin(self, params: TestingParameters) -> TestResults:
        logger.debug('Thread %s started.', self.get_name())
        interesting_tests = self.identify_tests()
        logger.debug('%s: Identified %d tests to be run.', self.get_name(), len(interesting_tests))

        # Note: because of @parallelize on run_test it actually returns a
        # SmartFuture with a TestResults inside of it. That's the reason
        # for this Any business.
        running: List[Any] = []
        for test_to_run in interesting_tests:
            running.append(self.run_test(test_to_run))
            logger.debug(
                '%s: Test %s started in the background.', self.get_name(), test_to_run.name
            )
            self.tests_started += 1

        for future in smart_future.wait_any(running):
            self.check_for_abort()
            result = future._resolve()
            logger.debug('Test %s finished.', result.name)
            self.test_results += result

        logger.debug('Thread %s finished.', self.get_name())
        return self.test_results
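
    # begin() is, in effect, a template method: subclasses supply only
    # identify_tests() and run_test(); parallel dispatch, abort checking,
    # and result aggregation all live here.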


class UnittestTestRunner(TemplatedTestRunner):
    """Run all known Unittests."""

    @overrides
    def get_name(self) -> str:
        return "Unittests"

    @overrides
    def identify_tests(self) -> List[TestToRun]:
        ret = []
        for test in file_utils.expand_globs('*_test.py'):
            basename = file_utils.without_path(test)
            if config.config['coverage']:
                ret.append(
                    TestToRun(
                        name=basename,
                        kind='unittest capturing coverage',
                        cmdline=f'coverage run --source {HOME}/lib {test} --unittests_ignore_perf 2>&1',
                    )
                )
                if test in PERF_SENSITIVE_TESTS:
                    ret.append(
                        TestToRun(
                            name=basename,
                            kind='unittest w/o coverage to record perf',
                            cmdline=f'{test} 2>&1',
                        )
                    )
            else:
                ret.append(TestToRun(name=basename, kind='unittest', cmdline=f'{test} 2>&1'))
        return ret

    @par.parallelize
    def run_test(self, test: TestToRun) -> TestResults:
        return self.execute_commandline(test)


class DoctestTestRunner(TemplatedTestRunner):
    """Run all known Doctests."""

    @overrides
    def get_name(self) -> str:
        return "Doctests"

    @overrides
    def identify_tests(self) -> List[TestToRun]:
        ret = []
        out = exec_utils.cmd('grep -lR "^ *import doctest" /home/scott/lib/python_modules/*')
        for test in out.split('\n'):
            if re.match(r'.*\.py$', test):
                if 'run_tests.py' not in test:
                    basename = file_utils.without_path(test)
                    if config.config['coverage']:
                        ret.append(
                            TestToRun(
                                name=basename,
                                kind='doctest capturing coverage',
                                cmdline=f'coverage run --source {HOME}/lib {test} 2>&1',
                            )
                        )
                        if test in PERF_SENSITIVE_TESTS:
                            ret.append(
                                TestToRun(
                                    name=basename,
                                    kind='doctest w/o coverage to record perf',
                                    cmdline=f'python3 {test} 2>&1',
                                )
                            )
                    else:
                        ret.append(
                            TestToRun(name=basename, kind='doctest', cmdline=f'python3 {test} 2>&1')
                        )
        return ret

    @par.parallelize
    def run_test(self, test: TestToRun) -> TestResults:
        return self.execute_commandline(test)


class IntegrationTestRunner(TemplatedTestRunner):
    """Run all known Integration tests."""

    @overrides
    def get_name(self) -> str:
        return "Integration Tests"

    @overrides
    def identify_tests(self) -> List[TestToRun]:
        ret = []
        for test in file_utils.expand_globs('*_itest.py'):
            basename = file_utils.without_path(test)
            if config.config['coverage']:
                ret.append(
                    TestToRun(
                        name=basename,
                        kind='integration test capturing coverage',
                        cmdline=f'coverage run --source {HOME}/lib {test} 2>&1',
                    )
                )
                if test in PERF_SENSITIVE_TESTS:
                    ret.append(
                        TestToRun(
                            name=basename,
                            kind='integration test w/o coverage to capture perf',
                            cmdline=f'{test} 2>&1',
                        )
                    )
            else:
                ret.append(
                    TestToRun(name=basename, kind='integration test', cmdline=f'{test} 2>&1')
                )
        return ret

    @par.parallelize
    def run_test(self, test: TestToRun) -> TestResults:
        return self.execute_commandline(test)
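
# Adding another collection is just a matter of subclassing
# TemplatedTestRunner and answering two questions: which tests exist, and
# how is one run? A minimal sketch (hypothetical glob and names):
#
#   class SmokeTestRunner(TemplatedTestRunner):
#       """Run all known smoke tests."""
#
#       @overrides
#       def get_name(self) -> str:
#           return "Smoke Tests"
#
#       @overrides
#       def identify_tests(self) -> List[TestToRun]:
#           return [
#               TestToRun(
#                   name=file_utils.without_path(test),
#                   kind='smoke test',
#                   cmdline=f'{test} 2>&1',
#               )
#               for test in file_utils.expand_globs('*_smoke.py')
#           ]
#
#       @par.parallelize
#       def run_test(self, test: TestToRun) -> TestResults:
#           return self.execute_commandline(test)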


def test_results_report(results: Dict[str, TestResults]) -> int:
    """Give a final report about the tests that were run."""
    total_problems = 0
    for result in results.values():
        print(result, end='')
        total_problems += len(result.tests_failed)
        total_problems += len(result.tests_timed_out)

    if total_problems > 0:
        print('Reminder: look in ./test_output to view test output logs')
    return total_problems
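
# A non-zero total flows back through main() as its return value; the
# bootstrap wrapper presumably turns that into the process exit code, which
# is what lets a git pre-commit hook block a commit on test failures.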


def code_coverage_report():
    """Give a final code coverage report."""
    text_utils.header('Code Coverage')
    exec_utils.cmd('coverage combine .coverage*')
    out = exec_utils.cmd('coverage report --omit=config-3.8.py,*_test.py,*_itest.py --sort=-cover')
    print(out)
    print(
        """To recall this report w/o re-running the tests:

    $ coverage report --omit=config-3.8.py,*_test.py,*_itest.py --sort=-cover

...from the 'tests' directory. Note that subsequent calls to
run_tests.py with --coverage will clobber previous results. See:

    https://coverage.readthedocs.io/en/6.2/
"""
    )


@bootstrap.initialize
def main() -> Optional[int]:
    halt_event = threading.Event()
    threads: List[TestRunner] = []

    params = TestingParameters(
        halt_on_error=True,
        halt_event=halt_event,
    )

    if config.config['coverage']:
        logger.debug('Clearing existing coverage data via "coverage erase".')
        exec_utils.cmd('coverage erase')

    if config.config['unittests'] or config.config['all']:
        threads.append(UnittestTestRunner(params))
    if config.config['doctests'] or config.config['all']:
        threads.append(DoctestTestRunner(params))
    if config.config['integration'] or config.config['all']:
        threads.append(IntegrationTestRunner(params))
    if not threads:
        print('ERROR: one of --unittests, --doctests or --integration is required.')
        return 1

    for thread in threads:
        thread.start()

    results: Dict[str, TestResults] = {}
    while len(results) != len(threads):
        started = 0
        done = 0
        failed = 0
        for thread in threads:
            (s, tr) = thread.get_status()
            started += s
            # Count this thread's finished tests; don't reuse the
            # cross-thread 'failed' accumulator here or we over-count.
            failed += len(tr.tests_failed) + len(tr.tests_timed_out)
            done += len(tr.tests_failed) + len(tr.tests_timed_out) + len(tr.tests_succeeded)
            if not thread.is_alive():
                tid = thread.name
                if tid not in results:
                    result = thread.join()
                    if result:
                        results[tid] = result
                        if len(result.tests_failed) > 0:
                            logger.error(
                                'Thread %s returned abnormal results; killing the others.', tid
                            )
                            halt_event.set()
                    else:
                        logger.error('Thread %s died without returning results.', tid)
                        results[tid] = TestResults(tid, [], [], [], [])
                        halt_event.set()

        if started > 0:
            percent_done = done / started * 100.0
        else:
            percent_done = 0.0
        if failed == 0:
            color = ansi.fg('green')
        else:
            color = ansi.fg('red')
        if percent_done < 100.0:
            print(
                color
                + text_utils.bar_graph_string(
                    done,
                    started,
                    text=text_utils.BarGraphText.FRACTION,
                )
                + ansi.reset(),
                end='\r',
            )
        time.sleep(0.1)  # Don't spin too hard while polling.

    print(f'{ansi.clear_line()}Final Report:')
    if config.config['coverage']:
        code_coverage_report()
    total_problems = test_results_report(results)
    return total_problems


if __name__ == '__main__':
    main()