#!/usr/bin/env python3

"""A smart, fast test runner. Used in a git pre-commit hook."""
import logging
import os
import re
import subprocess
import threading
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

from overrides import overrides

from pyutils import ansi, bootstrap, config, exec_utils, text_utils
from pyutils.files import file_utils
from pyutils.parallelize import parallelize as par
from pyutils.parallelize import smart_future, thread_utils
logger = logging.getLogger(__name__)
args = config.add_commandline_args(f'({__file__})', f'Args related to {__file__}')
args.add_argument('--unittests', '-u', action='store_true', help='Run unittests.')
args.add_argument('--doctests', '-d', action='store_true', help='Run doctests.')
args.add_argument(
    '--integration', '-i', action='store_true', help='Run integration tests.'
)
args.add_argument(
    '--all',
    '-a',
    action='store_true',
    help='Run unittests, doctests and integration tests. Equivalent to -u -d -i',
)
args.add_argument(
    '--coverage',
    '-c',
    action='store_true',
    help='Run tests and capture code coverage data',
)
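
# Illustrative invocations (a sketch; flag spellings per the definitions
# above, and the script is assumed to be run from the tests/ directory):
#
#   ./run_tests.py --unittests
#   ./run_tests.py --doctests --integration
#   ./run_tests.py --all --coverage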
HOME = os.environ['HOME']

# Where to look for tests. NOTE: the original definition of ROOT was not
# preserved here; '.' (the tests directory itself) is an assumption.
ROOT = '.'
# These tests will be run twice in --coverage mode: once to get code
# coverage and then again with no coverage enabled. This is because
# they pay attention to code performance, which is adversely affected
# by coverage instrumentation.
PERF_SENSITIVE_TESTS = set(['string_utils_test.py'])
TESTS_TO_SKIP = set(['zookeeper_test.py', 'zookeeper.py', 'run_tests.py'])
@dataclass
class TestingParameters:
    halt_on_error: bool
    """Should we stop as soon as one error has occurred?"""

    halt_event: threading.Event
    """An event that, when set, indicates to stop ASAP."""
68 """The name of the test"""
71 """The kind of the test"""
74 """The command line to execute"""
80 """The name of this test / set of tests."""
82 tests_executed: List[str]
83 """Tests that were executed."""
85 tests_succeeded: List[str]
86 """Tests that succeeded."""
88 tests_failed: List[str]
89 """Tests that failed."""
91 tests_timed_out: List[str]
92 """Tests that timed out."""
94 def __add__(self, other):
95 self.tests_executed.extend(other.tests_executed)
96 self.tests_succeeded.extend(other.tests_succeeded)
97 self.tests_failed.extend(other.tests_failed)
98 self.tests_timed_out.extend(other.tests_timed_out)
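
    # A hedged usage sketch (the names below are illustrative, not from the
    # original file): __add__ mutates and returns self, so += merges one
    # runner's results into a running total.
    #
    #   total = TestResults('all', [], [], [], [])
    #   total += TestResults('t1', ['t1'], ['t1'], [], [])
    #   total += TestResults('t2', ['t2'], [], ['t2'], [])
    #   assert len(total.tests_failed) == 1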
    def __repr__(self) -> str:
        out = f'{self.name}: '
        out += f'{ansi.fg("green")}'
        out += f'{len(self.tests_succeeded)}/{len(self.tests_executed)} passed'
        out += f'{ansi.reset()}.\n'

        if len(self.tests_failed) > 0:
            out += f'  ..{ansi.fg("red")}'
            out += f'{len(self.tests_failed)} tests failed'
            out += f'{ansi.reset()}:\n'
            for test in self.tests_failed:
                out += f'    {test}\n'

        if len(self.tests_timed_out) > 0:
            out += f'  ..{ansi.fg("yellow")}'
            out += f'{len(self.tests_timed_out)} tests timed out'
            out += f'{ansi.reset()}:\n'
            for test in self.tests_timed_out:
                out += f'    {test}\n'
        return out
class TestRunner(ABC, thread_utils.ThreadWithReturnValue):
    """A base class for something that runs a test."""

    def __init__(self, params: TestingParameters):
        """Create a TestRunner.

        Args:
            params: Test running parameters.
        """
        super().__init__(target=self.begin, args=[params])
        self.params = params
        self.test_results = TestResults(
            name=self.get_name(),
            tests_executed=[],
            tests_succeeded=[],
            tests_failed=[],
            tests_timed_out=[],
        )
        self.tests_started = 0
        self.lock = threading.Lock()
    @abstractmethod
    def get_name(self) -> str:
        """The name of this test collection."""
        pass

    def get_status(self) -> Tuple[int, TestResults]:
        """Ask the TestRunner for its status."""
        with self.lock:
            return (self.tests_started, self.test_results)

    @abstractmethod
    def begin(self, params: TestingParameters) -> TestResults:
        """Start execution."""
        pass
class TemplatedTestRunner(TestRunner, ABC):
    """A TestRunner that has a recipe for executing the tests."""

    @abstractmethod
    def identify_tests(self) -> List[TestToRun]:
        """Return the list of TestToRun objects that should be executed."""
        pass

    @abstractmethod
    def run_test(self, test: TestToRun) -> TestResults:
        """Run a single test and return its TestResults."""
        pass
    def check_for_abort(self) -> bool:
        """Periodically called to check whether we need to stop."""

        if self.params.halt_event.is_set():
            logger.debug('Thread %s saw halt event; exiting.', self.get_name())
            return True

        if self.params.halt_on_error and len(self.test_results.tests_failed) > 0:
            logger.error('Thread %s saw abnormal results; exiting.', self.get_name())
            return True
        return False
    def persist_output(self, test: TestToRun, message: str, output: str) -> None:
        """Called to save the output of a test run."""

        dest = f'{test.name}-output.txt'
        with open(f'./test_output/{dest}', 'w') as wf:
            print(message, file=wf)
            print('-' * len(message), file=wf)
            print(output, file=wf)
    def execute_commandline(
        self,
        test: TestToRun,
        timeout: float = 120.0,
    ) -> TestResults:
        """Execute a particular commandline to run a test."""

        try:
            output = exec_utils.cmd(
                test.cmdline,
                timeout_seconds=timeout,
            )
            if "***Test Failed***" in output:
                msg = f'{self.get_name()}: {test.name} ({test.cmdline}) failed; doctest failure message detected'
                logger.error(msg)
                self.persist_output(test, msg, output)
                return TestResults(test.name, [test.name], [], [test.name], [])

            self.persist_output(
                test, f'{test.name} ({test.cmdline}) succeeded.', output
            )
            logger.debug(
                '%s: %s (%s) succeeded', self.get_name(), test.name, test.cmdline
            )
            return TestResults(test.name, [test.name], [test.name], [], [])
        except subprocess.TimeoutExpired as e:
            msg = f'{self.get_name()}: {test.name} ({test.cmdline}) timed out after {e.timeout:.1f} seconds.'
            logger.error(msg)
            logger.debug(
                '%s: %s output when it timed out: %s',
                self.get_name(), test.name, e.output.decode('utf-8'),
            )
            self.persist_output(test, msg, e.output.decode('utf-8'))
            return TestResults(test.name, [test.name], [], [], [test.name])
        except subprocess.CalledProcessError as e:
            msg = f'{self.get_name()}: {test.name} ({test.cmdline}) failed; exit code {e.returncode}'
            logger.error(msg)
            logger.debug(
                '%s: %s output when it failed: %s', self.get_name(), test.name, e.output
            )
            self.persist_output(test, msg, e.output.decode('utf-8'))
            return TestResults(test.name, [test.name], [], [test.name], [])
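
    # Outcome mapping, summarizing the branches above: a clean exit records
    # the test under tests_succeeded; a "***Test Failed***" marker in the
    # output or a non-zero exit code records it under tests_failed; a
    # subprocess.TimeoutExpired records it under tests_timed_out. Every
    # branch persists the test's output beneath ./test_output/.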
    @overrides
    def begin(self, params: TestingParameters) -> TestResults:
        logger.debug('Thread %s started.', self.get_name())
        interesting_tests = self.identify_tests()
        logger.debug(
            '%s: Identified %d tests to be run.',
            self.get_name(),
            len(interesting_tests),
        )

        # Note: because of @parallelize on run_test it actually returns a
        # SmartFuture with a TestResults inside of it.  That's the reason
        # for this Any business.
        running: List[Any] = []
        for test_to_run in interesting_tests:
            running.append(self.run_test(test_to_run))
            logger.debug(
                '%s: Test %s started in the background.',
                self.get_name(),
                test_to_run.name,
            )
            self.tests_started += 1

        for future in smart_future.wait_any(running, log_exceptions=False):
            result = future._resolve()
            logger.debug('Test %s finished.', result.name)
            self.test_results += result
            if self.check_for_abort():
                logger.debug(
                    '%s: check_for_abort told us to exit early.', self.get_name()
                )
                return self.test_results

        logger.debug('Thread %s finished running all tests', self.get_name())
        return self.test_results
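
# A minimal sketch of the @parallelize / SmartFuture pattern that begin()
# relies on. Hedged: `doubled` is a made-up example function, not part of
# this file; semantics per pyutils.parallelize and smart_future.
#
#   @par.parallelize
#   def doubled(x: int) -> int:
#       return x * 2
#
#   futures = [doubled(n) for n in range(4)]   # each call returns a SmartFuture
#   for f in smart_future.wait_any(futures):   # yields futures as they finish
#       print(f)                               # resolves to the value when used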
class UnittestTestRunner(TemplatedTestRunner):
    """Run all known Unittests."""

    @overrides
    def get_name(self) -> str:
        return "Unittests"

    @overrides
    def identify_tests(self) -> List[TestToRun]:
        ret = []
        for test in file_utils.get_matching_files_recursive(ROOT, '*_test.py'):
            basename = file_utils.without_path(test)
            if basename in TESTS_TO_SKIP:
                continue
            if config.config['coverage']:
                ret.append(TestToRun(
                    name=basename,
                    kind='unittest capturing coverage',
                    cmdline=f'coverage run --source ../src {test} --unittests_ignore_perf 2>&1',
                ))
                if basename in PERF_SENSITIVE_TESTS:
                    ret.append(TestToRun(
                        name=basename,
                        kind='unittest w/o coverage to record perf',
                        cmdline=f'{test} 2>&1',
                    ))
            else:
                ret.append(TestToRun(name=basename, kind='unittest', cmdline=f'{test} 2>&1'))
        return ret

    @par.parallelize
    def run_test(self, test: TestToRun) -> TestResults:
        return self.execute_commandline(test)
class DoctestTestRunner(TemplatedTestRunner):
    """Run all known Doctests."""

    @overrides
    def get_name(self) -> str:
        return "Doctests"

    @overrides
    def identify_tests(self) -> List[TestToRun]:
        ret = []
        out = exec_utils.cmd(f'grep -lR "^ *import doctest" {ROOT}/*')
        for test in out.split('\n'):
            if re.match(r'.*\.py$', test):
                basename = file_utils.without_path(test)
                if basename in TESTS_TO_SKIP:
                    continue
                if config.config['coverage']:
                    ret.append(TestToRun(
                        name=basename,
                        kind='doctest capturing coverage',
                        cmdline=f'coverage run --source ../src {test} 2>&1',
                    ))
                    if basename in PERF_SENSITIVE_TESTS:
                        ret.append(TestToRun(
                            name=basename,
                            kind='doctest w/o coverage to record perf',
                            cmdline=f'python3 {test} 2>&1',
                        ))
                else:
                    ret.append(TestToRun(
                        name=basename, kind='doctest', cmdline=f'python3 {test} 2>&1'
                    ))
        return ret

    @par.parallelize
    def run_test(self, test: TestToRun) -> TestResults:
        return self.execute_commandline(test)
class IntegrationTestRunner(TemplatedTestRunner):
    """Run all known Integration tests."""

    @overrides
    def get_name(self) -> str:
        return "Integration Tests"

    @overrides
    def identify_tests(self) -> List[TestToRun]:
        ret = []
        for test in file_utils.get_matching_files_recursive(ROOT, '*_itest.py'):
            basename = file_utils.without_path(test)
            if basename in TESTS_TO_SKIP:
                continue
            if config.config['coverage']:
                ret.append(TestToRun(
                    name=basename,
                    kind='integration test capturing coverage',
                    cmdline=f'coverage run --source ../src {test} 2>&1',
                ))
                if basename in PERF_SENSITIVE_TESTS:
                    ret.append(TestToRun(
                        name=basename,
                        kind='integration test w/o coverage to capture perf',
                        cmdline=f'{test} 2>&1',
                    ))
            else:
                ret.append(TestToRun(
                    name=basename, kind='integration test', cmdline=f'{test} 2>&1'
                ))
        return ret

    @par.parallelize
    def run_test(self, test: TestToRun) -> TestResults:
        return self.execute_commandline(test)
def test_results_report(results: Dict[str, Optional[TestResults]]) -> int:
    """Give a final report about the tests that were run."""
    total_problems = 0
    for result in results.values():
        if result is None:
            print('Unexpected unhandled exception in test runner!!!')
            total_problems += 1
        else:
            print(result, end='')
            total_problems += len(result.tests_failed)
            total_problems += len(result.tests_timed_out)

    if total_problems > 0:
        print('Reminder: look in ./test_output to view test output logs')
    return total_problems
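
# Hedged usage sketch (the dict shape mirrors what main() builds below; a
# None value means a runner thread died with an unhandled exception):
#
#   problems = test_results_report({
#       'Unittests': TestResults('Unittests', ['a_test.py'], ['a_test.py'], [], []),
#       'Doctests': None,
#   })
#   # problems == 1; a non-zero count is what makes the pre-commit hook fail.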
def code_coverage_report():
    """Give a final code coverage report."""
    text_utils.header('Code Coverage')
    exec_utils.cmd('coverage combine .coverage*')
    out = exec_utils.cmd(
        'coverage report --omit=config-3.*.py,*_test.py,*_itest.py --sort=-cover'
    )
    print(out)
    print(
        """To recall this report w/o re-running the tests:

    $ coverage report --omit=config-3.*.py,*_test.py,*_itest.py --sort=-cover

...from the 'tests' directory. Note that subsequent calls to
run_tests.py with --coverage will clobber previous results. See:

    https://coverage.readthedocs.io/en/6.2/
"""
    )
@bootstrap.initialize
def main() -> Optional[int]:
    halt_event = threading.Event()
    threads: List[TestRunner] = []

    params = TestingParameters(
        halt_on_error=True,
        halt_event=halt_event,
    )

    if config.config['coverage']:
        logger.debug('Clearing existing coverage data via "coverage erase".')
        exec_utils.cmd('coverage erase')

    if config.config['unittests'] or config.config['all']:
        threads.append(UnittestTestRunner(params))
    if config.config['doctests'] or config.config['all']:
        threads.append(DoctestTestRunner(params))
    if config.config['integration'] or config.config['all']:
        threads.append(IntegrationTestRunner(params))
    if len(threads) == 0:
        print('ERROR: one of --unittests, --doctests or --integration is required.')
        return 1

    for thread in threads:
        thread.start()

    results: Dict[str, Optional[TestResults]] = {}
    while len(results) != len(threads):
        started = 0
        done = 0
        failed = 0
        for thread in threads:
            (s, tr) = thread.get_status()
            started += s
            failed += len(tr.tests_failed) + len(tr.tests_timed_out)
            done += len(tr.tests_failed) + len(tr.tests_timed_out) + len(tr.tests_succeeded)
            if not thread.is_alive():
                tid = thread.name
                if tid not in results:
                    result = thread.join()
                    if result:
                        results[tid] = result
                        if len(result.tests_failed) > 0:
                            logger.error(
                                'Thread %s returned abnormal results; killing the others.',
                                tid,
                            )
                            halt_event.set()
                    else:
                        logger.error(
                            'Thread %s took an unhandled exception... bug in run_tests.py?! Aborting.',
                            tid,
                        )
                        halt_event.set()
                        results[tid] = None

        if started > 0:
            percent_done = done / started * 100.0
        else:
            percent_done = 0.0
        if failed == 0:
            color = ansi.fg('green')
        else:
            color = ansi.fg('red')
        if percent_done < 100.0:
            print(
                text_utils.bar_graph_string(
                    done,
                    started,
                    text=text_utils.BarGraphText.FRACTION,
                    fgcolor=color,
                ),
                end='\r',
                flush=True,
            )
        time.sleep(0.1)

    print(f'{ansi.clear_line()}Final Report:')
    if config.config['coverage']:
        code_coverage_report()
    total_problems = test_results_report(results)
    return total_problems
if __name__ == '__main__':
    main()