"""A smart, fast test runner. Used in a git pre-commit hook."""

import logging
import os
import re
import subprocess
import threading
import time

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from overrides import overrides

from pyutils import ansi, bootstrap, config, exec_utils, text_utils
from pyutils.files import file_utils
from pyutils.parallelize import parallelize as par
from pyutils.parallelize import smart_future, thread_utils

logger = logging.getLogger(__name__)
args = config.add_commandline_args(
    f'Run Tests Driver ({__file__})', f'Args related to {__file__}'
)
args.add_argument('--unittests', '-u', action='store_true', help='Run unittests.')
args.add_argument('--doctests', '-d', action='store_true', help='Run doctests.')
args.add_argument(
    '--integration', '-i', action='store_true', help='Run integration tests.'
)
args.add_argument(
    '--all',
    action='store_true',
    help='Run unittests, doctests and integration tests. Equivalent to -u -d -i',
)
args.add_argument(
    '--coverage',
    action='store_true',
    help='Run tests and capture code coverage data',
)

HOME = os.environ['HOME']

# These tests will be run twice in --coverage mode: once to get code
# coverage and then again with coverage disabled. This is because
# they pay attention to code performance, which is adversely affected
# by the overhead of coverage instrumentation.
PERF_SENSITIVE_TESTS = set(['string_utils_test.py'])
TESTS_TO_SKIP = set(['zookeeper_test.py', 'zookeeper.py', 'run_tests.py'])
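# PERF_SENSITIVE_TESTS get an extra run without coverage when --coverage is on
# (see the comment above); TESTS_TO_SKIP are never scheduled by any of the
# identify_tests() implementations below.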


@dataclass
class TestingParameters:
    halt_on_error: bool
    """Should we stop as soon as one error has occurred?"""

    halt_event: threading.Event
    """An event that, when set, indicates to stop ASAP."""
70 """The name of the test"""
73 """The kind of the test"""
76 """The command line to execute"""
82 """The name of this test / set of tests."""
84 tests_executed: List[str]
85 """Tests that were executed."""
87 tests_succeeded: List[str]
88 """Tests that succeeded."""
90 tests_failed: List[str]
91 """Tests that failed."""
93 tests_timed_out: List[str]
94 """Tests that timed out."""

    def __add__(self, other):
        self.tests_executed.extend(other.tests_executed)
        self.tests_succeeded.extend(other.tests_succeeded)
        self.tests_failed.extend(other.tests_failed)
        self.tests_timed_out.extend(other.tests_timed_out)
        return self
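
    # __add__ mutates this TestResults in place and returns it, so per-test
    # results can be folded into a running total with += (see
    # TemplatedTestRunner.begin below).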

    def __repr__(self) -> str:
        out = f'{self.name}: '
        out += f'{ansi.fg("green")}'
        out += f'{len(self.tests_succeeded)}/{len(self.tests_executed)} passed'
        out += f'{ansi.reset()}.\n'

        if len(self.tests_failed) > 0:
            out += f' ..{ansi.fg("red")}'
            out += f'{len(self.tests_failed)} tests failed'
            out += f'{ansi.reset()}:\n'
            for test in self.tests_failed:

        if len(self.tests_timed_out) > 0:
            out += f' ..{ansi.fg("yellow")}'
            out += f'{len(self.tests_timed_out)} tests timed out'
            out += f'{ansi.reset()}:\n'
            for test in self.tests_timed_out:


class TestRunner(ABC, thread_utils.ThreadWithReturnValue):
    """A base class for something that runs a test."""

    def __init__(self, params: TestingParameters):
        """Create a TestRunner.

        Args:
            params: Test running parameters.
        """
        super().__init__(target=self.begin, args=[params])
        self.params = params
        self.test_results = TestResults(
            name=self.get_name(),
            tests_executed=[],
            tests_succeeded=[],
            tests_failed=[],
            tests_timed_out=[],
        )
        self.lock = threading.Lock()
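        # NOTE: this lock presumably guards test_results, which the main thread
        # polls via get_status() while this runner thread updates it.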

    def get_name(self) -> str:
        """The name of this test collection."""

    def get_status(self) -> TestResults:
        """Ask the TestRunner for its status."""
        return self.test_results

    def begin(self, params: TestingParameters) -> TestResults:
        """Start execution."""


class TemplatedTestRunner(TestRunner, ABC):
    """A TestRunner that has a recipe for executing the tests."""

    def identify_tests(self) -> List[TestToRun]:
        """Return the list of TestToRun objects that should be executed."""

    def run_test(self, test: TestToRun) -> TestResults:
        """Run a single test and return its TestResults."""

    def check_for_abort(self) -> bool:
        """Periodically called to check whether we need to stop."""
        if self.params.halt_event.is_set():
            logger.debug('Thread %s saw halt event; exiting.', self.get_name())
            return True
        if self.params.halt_on_error and len(self.test_results.tests_failed) > 0:
            logger.error('Thread %s saw abnormal results; exiting.', self.get_name())
            return True
        return False

    def persist_output(self, test: TestToRun, message: str, output: str) -> None:
        """Called to save the output of a test run."""
        dest = f'{test.name}-output.txt'
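        # NOTE: assumes the ./test_output/ directory already exists; open() will
        # not create it.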
        with open(f'./test_output/{dest}', 'w') as wf:
            print(message, file=wf)
            print('-' * len(message), file=wf)

    def execute_commandline(
        self,
        test: TestToRun,
        timeout: float = 120.0,
    ) -> TestResults:
        """Execute a particular commandline to run a test."""
        try:
            output = exec_utils.cmd(
                test.cmdline,
                timeout_seconds=timeout,
            )
            if "***Test Failed***" in output:
                msg = f'{self.get_name()}: {test.name} ({test.cmdline}) failed; doctest failure message detected'
                self.persist_output(test, msg, output)
                return TestResults(test.name, [], [], [test.name], [])
            self.persist_output(
                test, f'{test.name} ({test.cmdline}) succeeded.', output
            )
            logger.debug(
                '%s: %s (%s) succeeded', self.get_name(), test.name, test.cmdline
            )
            return TestResults(test.name, [], [test.name], [], [])
        except subprocess.TimeoutExpired as e:
            msg = f'{self.get_name()}: {test.name} ({test.cmdline}) timed out after {e.timeout:.1f} seconds.'
            logger.debug(
                '%s: %s output when it timed out: %s', self.get_name(), test.name, e.output
            )
            self.persist_output(test, msg, e.output.decode('utf-8'))
            return TestResults(test.name, [], [], [], [test.name])
        except subprocess.CalledProcessError as e:
            msg = f'{self.get_name()}: {test.name} ({test.cmdline}) failed; exit code {e.returncode}'
            logger.error(
                '%s: %s output when it failed: %s', self.get_name(), test.name, e.output
            )
            self.persist_output(test, msg, e.output.decode('utf-8'))
            return TestResults(test.name, [], [], [test.name], [])
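
    # Note: execute_commandline reports only the outcome of the one test it ran;
    # tests_executed is tracked separately by begin() when each test is launched,
    # which is why the executed list in these TestResults stays empty.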

    def begin(self, params: TestingParameters) -> TestResults:
        logger.debug('Thread %s started.', self.get_name())
        interesting_tests = self.identify_tests()
        logger.debug(
            '%s: Identified %d tests to be run.',
            self.get_name(),
            len(interesting_tests),
        )

        # Note: because of @parallelize on run_test it actually
        # returns a SmartFuture with a TestResults inside of it.
        # That's the reason for this Any business.
        running: List[Any] = []
        for test_to_run in interesting_tests:
            running.append(self.run_test(test_to_run))
            logger.debug(
                '%s: Test %s started in the background.',
                self.get_name(),
                test_to_run.name,
            )
            self.test_results.tests_executed.append(test_to_run.name)

        for future in smart_future.wait_any(running, log_exceptions=False):
            result = future._resolve()
            logger.debug('Test %s finished.', result.name)
            self.test_results += result
            if self.check_for_abort():
                logger.debug(
                    '%s: check_for_abort told us to exit early.', self.get_name()
                )
                return self.test_results

        logger.debug('Thread %s finished running all tests', self.get_name())
        return self.test_results
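

# The concrete runners below differ only in get_name() and identify_tests();
# per the comment in begin() above, their run_test() is decorated with
# @parallelize, so each call returns a SmartFuture instead of running inline.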


class UnittestTestRunner(TemplatedTestRunner):
    """Run all known Unittests."""

    def get_name(self) -> str:

    def identify_tests(self) -> List[TestToRun]:
        for test in file_utils.get_matching_files_recursive(ROOT, '*_test.py'):
            basename = file_utils.without_path(test)
            if basename in TESTS_TO_SKIP:
                continue
            if config.config['coverage']:
                    kind='unittest capturing coverage',
                    cmdline=f'coverage run --source ../src {test} --unittests_ignore_perf 2>&1',
                if basename in PERF_SENSITIVE_TESTS:
                        kind='unittest w/o coverage to record perf',
                        cmdline=f'{test} 2>&1',

                    kind='unittest',
                    cmdline=f'{test} 2>&1',

    def run_test(self, test: TestToRun) -> TestResults:
        return self.execute_commandline(test)


class DoctestTestRunner(TemplatedTestRunner):
    """Run all known Doctests."""

    def get_name(self) -> str:

    def identify_tests(self) -> List[TestToRun]:
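        # Any file under ROOT that imports the doctest module is assumed to
        # contain doctests worth running.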
        out = exec_utils.cmd(f'grep -lR "^ *import doctest" {ROOT}/*')
        for test in out.split('\n'):
            if re.match(r'.*\.py$', test):
                basename = file_utils.without_path(test)
                if basename in TESTS_TO_SKIP:
                    continue
                if config.config['coverage']:
                        kind='doctest capturing coverage',
                        cmdline=f'coverage run --source ../src {test} 2>&1',
                    if basename in PERF_SENSITIVE_TESTS:
                            kind='doctest w/o coverage to record perf',
                            cmdline=f'python3 {test} 2>&1',

                        cmdline=f'python3 {test} 2>&1',

    def run_test(self, test: TestToRun) -> TestResults:
        return self.execute_commandline(test)


class IntegrationTestRunner(TemplatedTestRunner):
    """Run all known integration tests."""

    def get_name(self) -> str:
        return "Integration Tests"

    def identify_tests(self) -> List[TestToRun]:
        for test in file_utils.get_matching_files_recursive(ROOT, '*_itest.py'):
            basename = file_utils.without_path(test)
            if basename in TESTS_TO_SKIP:
                continue
            if config.config['coverage']:
                    kind='integration test capturing coverage',
                    cmdline=f'coverage run --source ../src {test} 2>&1',
                if basename in PERF_SENSITIVE_TESTS:
                        kind='integration test w/o coverage to capture perf',
                        cmdline=f'{test} 2>&1',

                    name=basename, kind='integration test', cmdline=f'{test} 2>&1'

    def run_test(self, test: TestToRun) -> TestResults:
        return self.execute_commandline(test)


def test_results_report(results: Dict[str, Optional[TestResults]]) -> int:
    """Give a final report about the tests that were run."""
    total_problems = 0
    for result in results.values():
        if result is None:
            print('Unexpected unhandled exception in test runner!!!')
        else:
            print(result, end='')
            total_problems += len(result.tests_failed)
            total_problems += len(result.tests_timed_out)

    if total_problems > 0:
        print('Reminder: look in ./test_output to view test output logs')
    return total_problems


def code_coverage_report():
    """Give a final code coverage report."""
    text_utils.header('Code Coverage')
    exec_utils.cmd('coverage combine .coverage*')
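    # 'coverage combine' merges the per-process .coverage* data files left
    # behind by the parallel test runs into a single .coverage data file.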
    out = exec_utils.cmd(
        'coverage report --omit=config-3.*.py,*_test.py,*_itest.py --sort=-cover'
    )
    print(out)
    print(
        """To recall this report w/o re-running the tests:

    $ coverage report --omit=config-3.*.py,*_test.py,*_itest.py --sort=-cover

...from the 'tests' directory. Note that subsequent calls to
run_tests.py with --coverage will clobber previous results. See:

    https://coverage.readthedocs.io/en/6.2/"""
    )


@bootstrap.initialize
def main() -> Optional[int]:
    halt_event = threading.Event()
    threads: List[TestRunner] = []

    params = TestingParameters(
        halt_event=halt_event,
    )

    if config.config['coverage']:
        logger.debug('Clearing existing coverage data via "coverage erase".')
        exec_utils.cmd('coverage erase')

    if config.config['unittests'] or config.config['all']:
        threads.append(UnittestTestRunner(params))
    if config.config['doctests'] or config.config['all']:
        threads.append(DoctestTestRunner(params))
    if config.config['integration'] or config.config['all']:
        threads.append(IntegrationTestRunner(params))
    if len(threads) == 0:
        print('ERROR: one of --unittests, --doctests or --integration is required.')

    for thread in threads:
        thread.start()

    results: Dict[str, Optional[TestResults]] = {}
    start_time = time.time()
    last_update = start_time

    still_running = {}
    while len(results) != len(threads):
        started = 0
        done = 0
        failed = 0

        for thread in threads:
            tid = thread.name
            tr = thread.get_status()
            started += len(tr.tests_executed)
            failed += len(tr.tests_failed) + len(tr.tests_timed_out)
            done += len(tr.tests_failed) + len(tr.tests_timed_out) + len(tr.tests_succeeded)
            running = set(tr.tests_executed)
            running -= set(tr.tests_failed)
            running -= set(tr.tests_succeeded)
            running -= set(tr.tests_timed_out)
            still_running[tid] = running
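            # still_running now maps each thread to the tests it has started
            # but not yet finished (succeeded, failed, or timed out).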

            if time.time() - start_time > 5.0:
                if time.time() - last_update > 3.0:
                    last_update = time.time()
                    update = []
                    for _, running_set in still_running.items():
                        for test_name in running_set:
                            update.append(test_name)
                    print(f'\r{ansi.clear_line()}')

                    print(f'Still running: {" ".join(update)}')

                    print(f'Still running: {len(update)} tests.')

            if not thread.is_alive():
                if tid not in results:
                    result = thread.join()
                    if result:
                        results[tid] = result
                        if len(result.tests_failed) > 0:
                            logger.error('Thread %s returned abnormal results; killing the others.', tid)
                            halt_event.set()
                    else:
                        logger.error('Thread %s took an unhandled exception... bug in run_tests.py?! Aborting.', tid)

        if started > 0:
            percent_done = done / started * 100.0
        else:
            percent_done = 0.0

        color = ansi.fg('green')

        color = ansi.fg('red')

        if percent_done < 100.0:
            text_utils.bar_graph_string(
                text=text_utils.BarGraphText.FRACTION,
            )

    print(f'{ansi.clear_line()}Final Report:')
    if config.config['coverage']:
        code_coverage_report()
    total_problems = test_results_report(results)
    return total_problems
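
# main() returns the total problem count; bootstrap.initialize presumably turns
# a non-zero return value into a non-zero process exit status so the git
# pre-commit hook fails when any test fails or times out.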


if __name__ == '__main__':
    main()