"""A smart, fast test runner. Used in a git pre-commit hook."""
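
# Typical invocations (a sketch based on the flags defined below; run from
# the 'tests' directory):
#
#   ./run_tests.py --all --coverage     # unittests + doctests + integration, with coverage
#   ./run_tests.py -u -d                # just unittests and doctests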

import logging
import os
import re
import subprocess
import threading
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from overrides import overrides

from pyutils import ansi, bootstrap, config, dict_utils, exec_utils, text_utils
from pyutils.files import file_utils
from pyutils.parallelize import parallelize as par
from pyutils.parallelize import smart_future, thread_utils

logger = logging.getLogger(__name__)
args = config.add_commandline_args(
    f'Run Tests Driver ({__file__})', f'Args related to {__file__}'
)
args.add_argument('--unittests', '-u', action='store_true', help='Run unittests.')
args.add_argument('--doctests', '-d', action='store_true', help='Run doctests.')
args.add_argument(
    '--integration', '-i', action='store_true', help='Run integration tests.'
)
args.add_argument(
    '--all',
    '-a',
    action='store_true',
    help='Run unittests, doctests and integration tests. Equivalent to -u -d -i.',
)
args.add_argument(
    '--coverage',
    '-c',
    action='store_true',
    help='Run tests and capture code coverage data.',
)

HOME = os.environ['HOME']

# These tests will be run twice in --coverage mode: once to get code
# coverage and then again with no coverage enabled. This is because
# they pay attention to code performance which is adversely affected
# by coverage.
PERF_SENSITIVE_TESTS = set(['string_utils_test.py'])
TESTS_TO_SKIP = set(['zookeeper_test.py', 'zookeeper.py', 'run_tests.py'])
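
# For example, with --coverage, string_utils_test.py is scheduled twice by the
# identify_tests() methods below: once under coverage (as 'string_utils_test.py')
# and once without (as 'string_utils_test.py_no_coverage') so that its
# performance checks see realistic timings.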


@dataclass
class TestingParameters:
    halt_on_error: bool
    """Should we stop as soon as one error has occurred?"""

    halt_event: threading.Event
    """An event that, when set, indicates to stop ASAP."""


@dataclass
class TestToRun:
    name: str
    """The name of the test."""

    kind: str
    """The kind of the test."""

    cmdline: str
    """The command line to execute."""


@dataclass
class TestResults:
    name: str
    """The name of this test / set of tests."""

    tests_executed: Dict[str, float]
    """Tests that were executed (test name -> start timestamp)."""

    tests_succeeded: List[str]
    """Tests that succeeded."""

    tests_failed: List[str]
    """Tests that failed."""

    tests_timed_out: List[str]
    """Tests that timed out."""

    def __add__(self, other):
        merged = dict_utils.coalesce(
            [self.tests_executed, other.tests_executed],
            aggregation_function=dict_utils.raise_on_duplicated_keys,
        )
        self.tests_executed = merged
        self.tests_succeeded.extend(other.tests_succeeded)
        self.tests_failed.extend(other.tests_failed)
        self.tests_timed_out.extend(other.tests_timed_out)
        return self
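
    # Example of the merge semantics above (an illustrative sketch; the test
    # names and timestamps are made up):
    #
    #   a = TestResults('x', {'t1': 1000.0}, ['t1'], [], [])
    #   b = TestResults('x', {'t2': 1001.0}, [], ['t2'], [])
    #   a += b    # a.tests_executed == {'t1': 1000.0, 't2': 1001.0} and
    #             # a.tests_failed == ['t2']. Duplicated test names raise.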

    def __repr__(self) -> str:
        out = f'{self.name}: '
        out += f'{ansi.fg("green")}'
        out += f'{len(self.tests_succeeded)}/{len(self.tests_executed)} passed'
        out += f'{ansi.reset()}.\n'

        if len(self.tests_failed) > 0:
            out += f'  ..{ansi.fg("red")}'
            out += f'{len(self.tests_failed)} tests failed'
            out += f'{ansi.reset()}:\n'
            for test in self.tests_failed:
                out += f'    {test}\n'

        if len(self.tests_timed_out) > 0:
            out += f'  ..{ansi.fg("yellow")}'
            out += f'{len(self.tests_timed_out)} tests timed out'
            out += f'{ansi.reset()}:\n'
            for test in self.tests_timed_out:
                out += f'    {test}\n'
        return out


class TestRunner(ABC, thread_utils.ThreadWithReturnValue):
    """A base class for something that runs a test."""

    def __init__(self, params: TestingParameters):
        """Create a TestRunner.

        Args:
            params: Test running parameters.
        """
        super().__init__(target=self.begin, args=[params])
        self.params = params
        self.test_results = TestResults(
            name=self.get_name(),
            tests_executed={},
            tests_succeeded=[],
            tests_failed=[],
            tests_timed_out=[],
        )
        self.lock = threading.Lock()

    @abstractmethod
    def get_name(self) -> str:
        """The name of this test collection."""
        pass

    def get_status(self) -> TestResults:
        """Ask the TestRunner for its status."""
        with self.lock:
            return self.test_results

    @abstractmethod
    def begin(self, params: TestingParameters) -> TestResults:
        """Start execution."""
        pass
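

# How a TestRunner is driven (a sketch; see main() below for the real usage).
# thread_utils.ThreadWithReturnValue runs self.begin() on a worker thread and
# hands its TestResults back from join():
#
#   runner = UnittestTestRunner(params)
#   runner.start()                    # kicks off begin(params) in the background
#   interim = runner.get_status()     # poll progress while it runs
#   final = runner.join()             # TestResults, or None on an unhandled exception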


class TemplatedTestRunner(TestRunner, ABC):
    """A TestRunner that has a recipe for executing the tests."""

    @abstractmethod
    def identify_tests(self) -> List[TestToRun]:
        """Return the list of TestToRun objects that should be executed."""
        pass

    @abstractmethod
    def run_test(self, test: TestToRun) -> TestResults:
        """Run a single test and return its TestResults."""
        pass

    def check_for_abort(self) -> bool:
        """Periodically called to check whether we need to stop."""

        if self.params.halt_event.is_set():
            logger.debug('Thread %s saw halt event; exiting.', self.get_name())
            return True

        if self.params.halt_on_error and len(self.test_results.tests_failed) > 0:
            logger.error('Thread %s saw abnormal results; exiting.', self.get_name())
            return True
        return False

    def persist_output(self, test: TestToRun, message: str, output: str) -> None:
        """Called to save the output of a test run."""

        dest = f'{test.name}-output.txt'
        with open(f'./test_output/{dest}', 'w') as wf:
            print(message, file=wf)
            print('-' * len(message), file=wf)
            print(output, file=wf)
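
    # e.g. a run of string_utils_test.py gets its message and output written
    # to ./test_output/string_utils_test.py-output.txt.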

    def execute_commandline(
        self,
        test: TestToRun,
        *,
        timeout: float = 120.0,
    ) -> TestResults:
        """Execute a particular commandline to run a test."""

        try:
            output = exec_utils.cmd(
                test.cmdline,
                timeout_seconds=timeout,
            )
            if "***Test Failed***" in output:
                msg = f'{self.get_name()}: {test.name} ({test.cmdline}) failed; doctest failure message detected'
                logger.error(msg)
                self.persist_output(test, msg, output)
                return TestResults(test.name, {}, [], [test.name], [])

            self.persist_output(
                test, f'{test.name} ({test.cmdline}) succeeded.', output
            )
            logger.debug(
                '%s: %s (%s) succeeded', self.get_name(), test.name, test.cmdline
            )
            return TestResults(test.name, {}, [test.name], [], [])
        except subprocess.TimeoutExpired as e:
            msg = f'{self.get_name()}: {test.name} ({test.cmdline}) timed out after {e.timeout:.1f} seconds.'
            logger.error(msg)
            logger.debug(
                '%s: %s output when it timed out: %s',
                self.get_name(),
                test.name,
                e.output,
            )
            self.persist_output(test, msg, e.output.decode('utf-8'))
            return TestResults(test.name, {}, [], [], [test.name])
        except subprocess.CalledProcessError as e:
            msg = f'{self.get_name()}: {test.name} ({test.cmdline}) failed; exit code {e.returncode}'
            logger.error(msg)
            logger.debug(
                '%s: %s output when it failed: %s', self.get_name(), test.name, e.output
            )
            self.persist_output(test, msg, e.output.decode('utf-8'))
            return TestResults(test.name, {}, [], [test.name], [])
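
    # Reminder of TestResults' positional fields as used above:
    #   TestResults(name, tests_executed, tests_succeeded, tests_failed, tests_timed_out)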

    @overrides
    def begin(self, params: TestingParameters) -> TestResults:
        logger.debug('Thread %s started.', self.get_name())
        interesting_tests = self.identify_tests()
        logger.debug(
            '%s: Identified %d tests to be run.',
            self.get_name(),
            len(interesting_tests),
        )

        # Note: because of @par.parallelize on run_test it actually
        # returns a SmartFuture with a TestResults inside of it.
        # That's the reason for this Any business.
        running: List[Any] = []
        for test_to_run in interesting_tests:
            running.append(self.run_test(test_to_run))
            logger.debug(
                '%s: Test %s started in the background.',
                self.get_name(),
                test_to_run.name,
            )
            self.test_results.tests_executed[test_to_run.name] = time.time()

        for result in smart_future.wait_any(running, log_exceptions=False):
            logger.debug('Test %s finished.', result.name)

            # We sometimes run the same test more than once. Do not allow
            # one run's results to clobber the other's.
            self.test_results += result
            if self.check_for_abort():
                logger.debug(
                    '%s: check_for_abort told us to exit early.', self.get_name()
                )
                return self.test_results

        logger.debug('Thread %s finished running all tests', self.get_name())
        return self.test_results
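

# A minimal sketch of what a concrete runner looks like (hypothetical; the
# real ones follow below):
#
#   class SmokeTestRunner(TemplatedTestRunner):
#       @overrides
#       def get_name(self) -> str:
#           return "Smoke Tests"
#
#       @overrides
#       def identify_tests(self) -> List[TestToRun]:
#           return [TestToRun(name='smoke_test.py',
#                             kind='smoke test',
#                             cmdline='./smoke_test.py 2>&1')]
#
#       @par.parallelize
#       def run_test(self, test: TestToRun) -> TestResults:
#           return self.execute_commandline(test)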


class UnittestTestRunner(TemplatedTestRunner):
    """Run all known Unittests."""

    @overrides
    def get_name(self) -> str:
        return "Unittests"

    @overrides
    def identify_tests(self) -> List[TestToRun]:
        ret: List[TestToRun] = []
        for test in file_utils.get_matching_files_recursive(ROOT, '*_test.py'):
            basename = file_utils.without_path(test)
            if basename in TESTS_TO_SKIP:
                continue
            if config.config['coverage']:
                ret.append(
                    TestToRun(
                        name=basename,
                        kind='unittest capturing coverage',
                        cmdline=f'coverage run --source ../src {test} --unittests_ignore_perf 2>&1',
                    )
                )
                if basename in PERF_SENSITIVE_TESTS:
                    ret.append(
                        TestToRun(
                            name=f'{basename}_no_coverage',
                            kind='unittest w/o coverage to record perf',
                            cmdline=f'{test} 2>&1',
                        )
                    )
            else:
                ret.append(
                    TestToRun(
                        name=basename,
                        kind='unittest',
                        cmdline=f'{test} 2>&1',
                    )
                )
        return ret

    @par.parallelize
    def run_test(self, test: TestToRun) -> TestResults:
        return self.execute_commandline(test)


class DoctestTestRunner(TemplatedTestRunner):
    """Run all known Doctests."""

    @overrides
    def get_name(self) -> str:
        return "Doctests"

    @overrides
    def identify_tests(self) -> List[TestToRun]:
        ret: List[TestToRun] = []
        out = exec_utils.cmd(f'grep -lR "^ *import doctest" {ROOT}/*')
        for test in out.split('\n'):
            if re.match(r'.*\.py$', test):
                basename = file_utils.without_path(test)
                if basename in TESTS_TO_SKIP:
                    continue
                if config.config['coverage']:
                    ret.append(
                        TestToRun(
                            name=basename,
                            kind='doctest capturing coverage',
                            cmdline=f'coverage run --source ../src {test} 2>&1',
                        )
                    )
                    if basename in PERF_SENSITIVE_TESTS:
                        ret.append(
                            TestToRun(
                                name=f'{basename}_no_coverage',
                                kind='doctest w/o coverage to record perf',
                                cmdline=f'python3 {test} 2>&1',
                            )
                        )
                else:
                    ret.append(
                        TestToRun(
                            name=basename,
                            kind='doctest',
                            cmdline=f'python3 {test} 2>&1',
                        )
                    )
        return ret

    @par.parallelize
    def run_test(self, test: TestToRun) -> TestResults:
        return self.execute_commandline(test)


class IntegrationTestRunner(TemplatedTestRunner):
    """Run all known Integration tests."""

    @overrides
    def get_name(self) -> str:
        return "Integration Tests"

    @overrides
    def identify_tests(self) -> List[TestToRun]:
        ret: List[TestToRun] = []
        for test in file_utils.get_matching_files_recursive(ROOT, '*_itest.py'):
            basename = file_utils.without_path(test)
            if basename in TESTS_TO_SKIP:
                continue
            if config.config['coverage']:
                ret.append(
                    TestToRun(
                        name=basename,
                        kind='integration test capturing coverage',
                        cmdline=f'coverage run --source ../src {test} 2>&1',
                    )
                )
                if basename in PERF_SENSITIVE_TESTS:
                    ret.append(
                        TestToRun(
                            name=f'{basename}_no_coverage',
                            kind='integration test w/o coverage to capture perf',
                            cmdline=f'{test} 2>&1',
                        )
                    )
            else:
                ret.append(
                    TestToRun(
                        name=basename, kind='integration test', cmdline=f'{test} 2>&1'
                    )
                )
        return ret

    @par.parallelize
    def run_test(self, test: TestToRun) -> TestResults:
        return self.execute_commandline(test)


def test_results_report(results: Dict[str, Optional[TestResults]]) -> int:
    """Give a final report about the tests that were run."""
    total_problems = 0
    for result in results.values():
        if result is None:
            print('Unexpected unhandled exception in test runner!!!')
            total_problems += 1
        else:
            print(result, end='')
            total_problems += len(result.tests_failed)
            total_problems += len(result.tests_timed_out)

    if total_problems > 0:
        print('Reminder: look in ./test_output to view test output logs')
    return total_problems
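
# Note: main() returns this problem count. Assuming the @bootstrap.initialize
# wrapper uses main()'s return value as the process exit status, any problem
# yields a non-zero exit and makes the git pre-commit hook block the commit.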


def code_coverage_report():
    """Give a final code coverage report."""
    text_utils.header('Code Coverage')
    exec_utils.cmd('coverage combine .coverage*')
    out = exec_utils.cmd(
        'coverage report --omit=config-3.*.py,*_test.py,*_itest.py --sort=-cover'
    )
    print(out)
    print(
        """To recall this report w/o re-running the tests:

    $ coverage report --omit=config-3.*.py,*_test.py,*_itest.py --sort=-cover

...from the 'tests' directory. Note that subsequent calls to
run_tests.py with --coverage will clobber previous results. See:

    https://coverage.readthedocs.io/en/6.2/
"""
    )


@bootstrap.initialize
def main() -> Optional[int]:
    saw_flag = False
    halt_event = threading.Event()
    threads: List[TestRunner] = []

    params = TestingParameters(
        halt_on_error=True,
        halt_event=halt_event,
    )

    if config.config['coverage']:
        logger.debug('Clearing existing coverage data via "coverage erase".')
        exec_utils.cmd('coverage erase')

    if config.config['unittests'] or config.config['all']:
        saw_flag = True
        threads.append(UnittestTestRunner(params))
    if config.config['doctests'] or config.config['all']:
        saw_flag = True
        threads.append(DoctestTestRunner(params))
    if config.config['integration'] or config.config['all']:
        saw_flag = True
        threads.append(IntegrationTestRunner(params))

    if not saw_flag:
        print('ERROR: one of --unittests, --doctests or --integration is required.')
        return -1

    for thread in threads:
        thread.start()

    results: Dict[str, Optional[TestResults]] = {}
    start_time = time.time()
    last_update = start_time

    while len(results) != len(threads):
        started = 0
        done = 0
        failed = 0
        still_running = {}

        for thread in threads:
            tid = thread.name
            tr = thread.get_status()
            started += len(tr.tests_executed)
            failed += len(tr.tests_failed) + len(tr.tests_timed_out)
            done += failed + len(tr.tests_succeeded)
            running = set(tr.tests_executed.keys())
            running -= set(tr.tests_failed)
            running -= set(tr.tests_succeeded)
            running -= set(tr.tests_timed_out)
            running_with_start_time = {
                test: tr.tests_executed[test] for test in running
            }
            still_running[tid] = running_with_start_time

            # Maybe print an update about which tests are still running
            # (but only if the run is dragging on and we haven't updated
            # recently).
            now = time.time()
            if now - start_time > 5.0:
                if now - last_update > 3.0:
                    last_update = now
                    update = []
                    for _, running_dict in still_running.items():
                        for test_name, test_start_time in running_dict.items():
                            if now - test_start_time > 10.0:
                                update.append(f'{test_name}@{now-test_start_time:.1f}s')
                            else:
                                update.append(test_name)
                    print(f'\r{ansi.clear_line()}')
                    if len(update) < 5:
                        print(f'Still running: {",".join(update)}')
                    else:
                        print(f'Still running: {len(update)} tests.')

            # Harvest results from threads that have finished.
            if not thread.is_alive():
                if tid not in results:
                    result = thread.join()
                    if result:
                        results[tid] = result
                        if len(result.tests_failed) > 0:
                            logger.error(
                                'Thread %s returned abnormal results; killing the others.',
                                tid,
                            )
                            halt_event.set()
                    else:
                        logger.error(
                            'Thread %s took an unhandled exception... bug in run_tests.py?! Aborting.',
                            tid,
                        )
                        halt_event.set()
                        results[tid] = None

        if failed == 0:
            color = ansi.fg('green')
        else:
            color = ansi.fg('red')

        if started > 0:
            percent_done = done / started * 100.0
        else:
            percent_done = 0.0

        if percent_done < 100.0:
            print(
                text_utils.bar_graph_string(
                    done,
                    started,
                    text=text_utils.BarGraphText.FRACTION,
                    width=80,
                    fgcolor=color,
                ),
                end='\r',
                flush=True,
            )
        time.sleep(0.5)

    print(f'{ansi.clear_line()}Final Report:')
    if config.config['coverage']:
        code_coverage_report()
    total_problems = test_results_report(results)
    if total_problems > 0:
        logger.error(
            'Exiting with non-zero return code %d due to problems.', total_problems
        )
    return total_problems


if __name__ == '__main__':
    main()