"""A smart, fast test runner. Used in a git pre-commit hook."""
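
# Illustrative invocations (the flag names are defined below; the exact
# pre-commit hook wiring is not part of this file):
#
#   ./run_tests.py --all --coverage
#   ./run_tests.py --unittests --doctests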

import logging
import os
import re
import subprocess
import threading
import time

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from overrides import overrides

from pyutils import ansi, bootstrap, config, dict_utils, exec_utils, text_utils
from pyutils.files import file_utils
from pyutils.parallelize import deferred_operand
from pyutils.parallelize import parallelize as par
from pyutils.parallelize import smart_future, thread_utils

logger = logging.getLogger(__name__)
args = config.add_commandline_args(
    f'Run Tests Driver ({__file__})', f'Args related to {__file__}'
)
args.add_argument('--unittests', '-u', action='store_true', help='Run unittests.')
args.add_argument('--doctests', '-d', action='store_true', help='Run doctests.')
args.add_argument(
    '--integration', '-i', action='store_true', help='Run integration tests.'
)
args.add_argument(
    '--all',
    action='store_true',
    help='Run unittests, doctests and integration tests. Equivalent to -u -d -i.',
)
args.add_argument(
    '--coverage',
    action='store_true',
    help='Run tests and capture code coverage data.',
)

HOME = os.environ['HOME']

# NOTE (assumption): the original definition of ROOT is not shown here.
# It names the directory tree that is scanned for *_test.py / *_itest.py
# files and grepped for doctest-bearing sources; adjust as needed.
ROOT = '..'

# These tests will be run twice in --coverage mode: once to get code
# coverage and then again with coverage disabled. This is because they
# pay attention to code performance, which is adversely affected by
# coverage instrumentation.
PERF_SENSITIVE_TESTS = set(['string_utils_test.py'])
TESTS_TO_SKIP = set(['zookeeper_test.py', 'zookeeper.py', 'run_tests.py'])
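# Both sets above are matched against basenames (via
# file_utils.without_path), so entries apply to files with those names
# anywhere under the scanned tree.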


@dataclass
class TestingParameters:
    halt_on_error: bool
    """Should we stop as soon as one error has occurred?"""

    halt_event: threading.Event
    """An event that, when set, indicates to stop ASAP."""
71 """The name of the test"""
74 """The kind of the test"""
77 """The command line to execute"""
83 """The name of this test / set of tests."""
85 tests_executed: Dict[str, float]
86 """Tests that were executed."""
88 tests_succeeded: List[str]
89 """Tests that succeeded."""
91 tests_failed: List[str]
92 """Tests that failed."""
94 tests_timed_out: List[str]
95 """Tests that timed out."""

    def __add__(self, other):
        merged = dict_utils.coalesce(
            [self.tests_executed, other.tests_executed],
            aggregation_function=dict_utils.raise_on_duplicated_keys,
        )
        self.tests_executed = merged
        self.tests_succeeded.extend(other.tests_succeeded)
        self.tests_failed.extend(other.tests_failed)
        self.tests_timed_out.extend(other.tests_timed_out)
        return self
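
    # __add__ mutates self in place and returns it, so the
    # "self.test_results += result" in TemplatedTestRunner.begin below
    # folds each finished test's results into the running aggregate;
    # duplicated keys in tests_executed raise (rather than silently
    # overwrite) via dict_utils.raise_on_duplicated_keys.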

    def __repr__(self) -> str:
        out = f'{self.name}: '
        out += f'{ansi.fg("green")}'
        out += f'{len(self.tests_succeeded)}/{len(self.tests_executed)} passed'
        out += f'{ansi.reset()}.\n'

        if len(self.tests_failed) > 0:
            out += f' ..{ansi.fg("red")}'
            out += f'{len(self.tests_failed)} tests failed'
            out += f'{ansi.reset()}:\n'
            for test in self.tests_failed:
                out += f'    {test}\n'

        if len(self.tests_timed_out) > 0:
            out += f' ..{ansi.fg("yellow")}'
            out += f'{len(self.tests_timed_out)} tests timed out'
            out += f'{ansi.reset()}:\n'
            for test in self.tests_timed_out:
                out += f'    {test}\n'
        return out


class TestRunner(ABC, thread_utils.ThreadWithReturnValue):
    """A base class for something that runs a test."""

    def __init__(self, params: TestingParameters):
        """Create a TestRunner.

        Args:
            params: Test running parameters.
        """
        super().__init__(target=self.begin, args=[params])
        self.params = params
        self.test_results = TestResults(
            name=self.get_name(),
            tests_executed={},
            tests_succeeded=[],
            tests_failed=[],
            tests_timed_out=[],
        )
        self.lock = threading.Lock()

    @abstractmethod
    def get_name(self) -> str:
        """The name of this test collection."""

    def get_status(self) -> TestResults:
        """Ask the TestRunner for its status."""
        with self.lock:
            return self.test_results

    @abstractmethod
    def begin(self, params: TestingParameters) -> TestResults:
        """Start execution."""


class TemplatedTestRunner(TestRunner, ABC):
    """A TestRunner that has a recipe for executing the tests."""

    @abstractmethod
    def identify_tests(self) -> List[TestToRun]:
        """Return the list of TestToRun objects that should be executed."""

    @abstractmethod
    def run_test(self, test: TestToRun) -> TestResults:
        """Run a single test and return its TestResults."""

    def check_for_abort(self) -> bool:
        """Periodically called to check whether we need to stop."""
        if self.params.halt_event.is_set():
            logger.debug('Thread %s saw halt event; exiting.', self.get_name())
            return True

        if self.params.halt_on_error and len(self.test_results.tests_failed) > 0:
            logger.error('Thread %s saw abnormal results; exiting.', self.get_name())
            return True
        return False

    def persist_output(self, test: TestToRun, message: str, output: str) -> None:
        """Called to save the output of a test run."""
        dest = f'{test.name}-output.txt'
        with open(f'./test_output/{dest}', 'w') as wf:
            print(message, file=wf)
            print('-' * len(message), file=wf)
            print(output, file=wf)
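
    # Note: persist_output() assumes that a ./test_output/ directory
    # already exists relative to the working directory; the final report
    # points users there when a test fails.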

    def execute_commandline(
        self, test: TestToRun, timeout: float = 120.0
    ) -> TestResults:
        """Execute a particular commandline to run a test."""
        try:
            output = exec_utils.cmd(test.cmdline, timeout_seconds=timeout)
            if "***Test Failed***" in output:
                msg = f'{self.get_name()}: {test.name} ({test.cmdline}) failed; doctest failure message detected'
                logger.error(msg)
                self.persist_output(test, msg, output)
                return TestResults(test.name, {}, [], [test.name], [])
            self.persist_output(
                test, f'{test.name} ({test.cmdline}) succeeded.', output
            )
            logger.debug(
                '%s: %s (%s) succeeded', self.get_name(), test.name, test.cmdline
            )
            return TestResults(test.name, {}, [test.name], [], [])
        except subprocess.TimeoutExpired as e:
            msg = f'{self.get_name()}: {test.name} ({test.cmdline}) timed out after {e.timeout:.1f} seconds.'
            logger.error(msg)
            logger.error(
                '%s: %s output when it timed out: %s',
                self.get_name(), test.name, e.output,
            )
            self.persist_output(test, msg, e.output.decode('utf-8'))
            return TestResults(test.name, {}, [], [], [test.name])
        except subprocess.CalledProcessError as e:
            msg = f'{self.get_name()}: {test.name} ({test.cmdline}) failed; exit code {e.returncode}'
            logger.error(msg)
            logger.error(
                '%s: %s output when it failed: %s', self.get_name(), test.name, e.output
            )
            self.persist_output(test, msg, e.output.decode('utf-8'))
            return TestResults(test.name, {}, [], [test.name], [])
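
    # execute_commandline() maps each outcome onto a fresh single-test
    # TestResults: success goes in tests_succeeded; a doctest failure
    # message or a non-zero exit code goes in tests_failed; a subprocess
    # timeout goes in tests_timed_out. begin() below then merges these
    # into the aggregate via TestResults.__add__.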

    @overrides
    def begin(self, params: TestingParameters) -> TestResults:
        logger.debug('Thread %s started.', self.get_name())
        interesting_tests = self.identify_tests()
        logger.debug(
            '%s: Identified %d tests to be run.',
            self.get_name(),
            len(interesting_tests),
        )

        # Note: because of @par.parallelize on run_test it actually
        # returns a SmartFuture with a TestResults inside of it.
        # That's the reason for this Any business.
        running: List[Any] = []
        for test_to_run in interesting_tests:
            running.append(self.run_test(test_to_run))
            logger.debug(
                '%s: Test %s started in the background.',
                self.get_name(),
                test_to_run.name,
            )
            self.test_results.tests_executed[test_to_run.name] = time.time()

        for future in smart_future.wait_any(running, log_exceptions=False):
            result = deferred_operand.DeferredOperand.resolve(future)
            logger.debug('Test %s finished.', result.name)

            # We sometimes run the same test more than once. Do not allow
            # one run's results to clobber the other's.
            self.test_results += result
            if self.check_for_abort():
                logger.debug(
                    '%s: check_for_abort told us to exit early.', self.get_name()
                )
                return self.test_results

        logger.debug('Thread %s finished running all tests.', self.get_name())
        return self.test_results


class UnittestTestRunner(TemplatedTestRunner):
    """Run all known Unittests."""

    @overrides
    def get_name(self) -> str:
        return "Unittests"  # display name (assumed; original not shown)

    @overrides
    def identify_tests(self) -> List[TestToRun]:
        ret = []
        for test in file_utils.get_matching_files_recursive(ROOT, '*_test.py'):
            basename = file_utils.without_path(test)
            if basename in TESTS_TO_SKIP:
                continue
            if config.config['coverage']:
                ret.append(TestToRun(
                    name=basename,
                    kind='unittest capturing coverage',
                    cmdline=f'coverage run --source ../src {test} --unittests_ignore_perf 2>&1',
                ))
                if basename in PERF_SENSITIVE_TESTS:
                    ret.append(TestToRun(
                        name=f'{basename}_no_coverage',
                        kind='unittest w/o coverage to record perf',
                        cmdline=f'{test} 2>&1',
                    ))
            else:
                ret.append(TestToRun(
                    name=basename, kind='unittest', cmdline=f'{test} 2>&1'
                ))
        return ret

    @par.parallelize
    def run_test(self, test: TestToRun) -> TestResults:
        return self.execute_commandline(test)


class DoctestTestRunner(TemplatedTestRunner):
    """Run all known Doctests."""

    @overrides
    def get_name(self) -> str:
        return "Doctests"  # display name (assumed; original not shown)

    @overrides
    def identify_tests(self) -> List[TestToRun]:
        ret = []
        out = exec_utils.cmd(f'grep -lR "^ *import doctest" {ROOT}/*')
        for test in out.split('\n'):
            if re.match(r'.*\.py$', test):
                basename = file_utils.without_path(test)
                if basename in TESTS_TO_SKIP:
                    continue
                if config.config['coverage']:
                    ret.append(TestToRun(
                        name=basename,
                        kind='doctest capturing coverage',
                        cmdline=f'coverage run --source ../src {test} 2>&1',
                    ))
                    if basename in PERF_SENSITIVE_TESTS:
                        ret.append(TestToRun(
                            name=f'{basename}_no_coverage',
                            kind='doctest w/o coverage to record perf',
                            cmdline=f'python3 {test} 2>&1',
                        ))
                else:
                    ret.append(TestToRun(
                        name=basename, kind='doctest', cmdline=f'python3 {test} 2>&1'
                    ))
        return ret

    @par.parallelize
    def run_test(self, test: TestToRun) -> TestResults:
        return self.execute_commandline(test)


class IntegrationTestRunner(TemplatedTestRunner):
    """Run all known Integration tests."""

    @overrides
    def get_name(self) -> str:
        return "Integration Tests"

    @overrides
    def identify_tests(self) -> List[TestToRun]:
        ret = []
        for test in file_utils.get_matching_files_recursive(ROOT, '*_itest.py'):
            basename = file_utils.without_path(test)
            if basename in TESTS_TO_SKIP:
                continue
            if config.config['coverage']:
                ret.append(TestToRun(
                    name=basename,
                    kind='integration test capturing coverage',
                    cmdline=f'coverage run --source ../src {test} 2>&1',
                ))
                if basename in PERF_SENSITIVE_TESTS:
                    ret.append(TestToRun(
                        name=f'{basename}_no_coverage',
                        kind='integration test w/o coverage to capture perf',
                        cmdline=f'{test} 2>&1',
                    ))
            else:
                ret.append(TestToRun(
                    name=basename, kind='integration test', cmdline=f'{test} 2>&1'
                ))
        return ret

    @par.parallelize
    def run_test(self, test: TestToRun) -> TestResults:
        return self.execute_commandline(test)


def test_results_report(results: Dict[str, Optional[TestResults]]) -> int:
    """Give a final report about the tests that were run."""
    total_problems = 0
    for result in results.values():
        if result is None:
            print('Unexpected unhandled exception in test runner!!!')
            total_problems += 1
        else:
            print(result, end='')
            total_problems += len(result.tests_failed)
            total_problems += len(result.tests_timed_out)

    if total_problems > 0:
        print('Reminder: look in ./test_output to view test output logs.')
    return total_problems
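
# The count returned above (and, through it, main's return value) doubles
# as the process exit code, which is what makes this script usable as a
# git pre-commit hook: any failed or timed-out test blocks the commit.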


def code_coverage_report():
    """Give a final code coverage report."""
    text_utils.header('Code Coverage')
    exec_utils.cmd('coverage combine .coverage*')
    out = exec_utils.cmd(
        'coverage report --omit=config-3.*.py,*_test.py,*_itest.py --sort=-cover'
    )
    print(out)
    print(
        """To recall this report w/o re-running the tests:

    $ coverage report --omit=config-3.*.py,*_test.py,*_itest.py --sort=-cover

...from the 'tests' directory. Note that subsequent calls to
run_tests.py with --coverage will clobber previous results. See:

    https://coverage.readthedocs.io/en/6.2/
"""
    )


@bootstrap.initialize
def main() -> Optional[int]:
    saw_flag = False
    halt_event = threading.Event()
    threads: List[TestRunner] = []

    params = TestingParameters(
        halt_on_error=True,
        halt_event=halt_event,
    )

    if config.config['coverage']:
        logger.debug('Clearing existing coverage data via "coverage erase".')
        exec_utils.cmd('coverage erase')

    if config.config['unittests'] or config.config['all']:
        saw_flag = True
        threads.append(UnittestTestRunner(params))
    if config.config['doctests'] or config.config['all']:
        saw_flag = True
        threads.append(DoctestTestRunner(params))
    if config.config['integration'] or config.config['all']:
        saw_flag = True
        threads.append(IntegrationTestRunner(params))
    if not saw_flag:
        print('ERROR: one of --unittests, --doctests or --integration is required.')
        return 1

    for thread in threads:
        thread.start()

    results: Dict[str, Optional[TestResults]] = {}
    start_time = time.time()
    last_update = start_time
    still_running = {}

    while len(results) != len(threads):
        started = 0
        done = 0
        failed = 0

        for thread in threads:
            tid = thread.name
            tr = thread.get_status()
            started += len(tr.tests_executed)
            thread_failed = len(tr.tests_failed) + len(tr.tests_timed_out)
            failed += thread_failed
            done += thread_failed + len(tr.tests_succeeded)
            running = set(tr.tests_executed.keys())
            running -= set(tr.tests_failed)
            running -= set(tr.tests_succeeded)
            running -= set(tr.tests_timed_out)
            running_with_start_time = {
                test: tr.tests_executed[test] for test in running
            }
            still_running[tid] = running_with_start_time

            # Periodically tell the user what's still running (and for
            # how long, if it has been a while).
            now = time.time()
            if now - start_time > 5.0 and now - last_update > 3.0:
                last_update = now
                update = []
                for _, running_dict in still_running.items():
                    for test_name, test_start_time in running_dict.items():
                        if now - test_start_time > 10.0:
                            update.append(f'{test_name}@{now-test_start_time:.1f}s')
                        else:
                            update.append(test_name)
                print(f'\r{ansi.clear_line()}')
                if len(update) < 5:  # cutoff assumed; original not shown
                    print(f'Still running: {",".join(update)}')
                else:
                    print(f'Still running: {len(update)} tests.')

            # Collect results from any thread that has finished.
            if not thread.is_alive():
                if tid not in results:
                    result = thread.join()
                    if result:
                        results[tid] = result
                        if len(result.tests_failed) > 0:
                            logger.error(
                                'Thread %s returned abnormal results; killing the others.',
                                tid,
                            )
                            halt_event.set()
                    else:
                        logger.error(
                            'Thread %s took an unhandled exception... bug in run_tests.py?! Aborting.',
                            tid,
                        )
                        halt_event.set()
                        results[tid] = None

        if failed == 0:
            color = ansi.fg('green')
        else:
            color = ansi.fg('red')

        if started > 0:
            percent_done = done / started * 100.0
        else:
            percent_done = 0.0

        if percent_done < 100.0:
            print(
                text_utils.bar_graph_string(
                    done,
                    started,
                    text=text_utils.BarGraphText.FRACTION,
                    fgcolor=color,  # assumed keyword; original args not shown
                ),
                end='\r',
                flush=True,
            )
            time.sleep(0.1)  # don't hammer get_status() in a tight loop

    print(f'{ansi.clear_line()}Final Report:')
    if config.config['coverage']:
        code_coverage_report()
    total_problems = test_results_report(results)
    if total_problems > 0:
        logger.error(
            'Exiting with non-zero return code %d due to problems.', total_problems
        )
    return total_problems


if __name__ == '__main__':
    main()