"""A smart, fast test runner. Used in a git pre-commit hook."""
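
# Example invocations (using the flags defined below, run from the tests
# directory):
#
#   ./run_tests.py --unittests
#   ./run_tests.py --all --coverage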

import logging
import os
import re
import subprocess
import threading
import time

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

from overrides import overrides

from pyutils import ansi, bootstrap, config, exec_utils, text_utils
from pyutils.files import file_utils
from pyutils.parallelize import parallelize as par
from pyutils.parallelize import smart_future, thread_utils

logger = logging.getLogger(__name__)
args = config.add_commandline_args(f'({__file__})', f'Args related to {__file__}')
args.add_argument('--unittests', '-u', action='store_true', help='Run unittests.')
args.add_argument('--doctests', '-d', action='store_true', help='Run doctests.')
args.add_argument(
    '--integration', '-i', action='store_true', help='Run integration tests.'
)
args.add_argument(
    '--all',
    '-a',
    action='store_true',
    help='Run unittests, doctests and integration tests. Equivalent to -u -d -i',
)
args.add_argument(
    '--coverage',
    '-c',
    action='store_true',
    help='Run tests and capture code coverage data',
)
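
# Parsed flag values are read back at runtime through pyutils' config
# dictionary; for example, the code below keys on config.config['coverage']
# and config.config['all'].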

HOME = os.environ['HOME']

# These tests will be run twice in --coverage mode: once to get code
# coverage and then again with no coverage enabled. This is because
# they pay attention to code performance which is adversely affected
# by coverage.
PERF_SENSITIVE_TESTS = set(['string_utils_test.py'])
TESTS_TO_SKIP = set(['zookeeper_test.py', 'run_tests.py'])


@dataclass
class TestingParameters:
    halt_on_error: bool
    """Should we stop as soon as one error has occurred?"""

    halt_event: threading.Event
    """An event that, when set, indicates to stop ASAP."""
68 """The name of the test"""
71 """The kind of the test"""
74 """The command line to execute"""
80 """The name of this test / set of tests."""
82 tests_executed: List[str]
83 """Tests that were executed."""
85 tests_succeeded: List[str]
86 """Tests that succeeded."""
88 tests_failed: List[str]
89 """Tests that failed."""
91 tests_timed_out: List[str]
92 """Tests that timed out."""

    def __add__(self, other):
        self.tests_executed.extend(other.tests_executed)
        self.tests_succeeded.extend(other.tests_succeeded)
        self.tests_failed.extend(other.tests_failed)
        self.tests_timed_out.extend(other.tests_timed_out)
        return self
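
    # Note: __add__ mutates the left-hand side and returns it, so "a + b" and
    # "a += b" both fold b's lists into a. A sketch of rolling up per-runner
    # results (per_runner_results is a hypothetical list of TestResults):
    #
    #   totals = TestResults('all tests', [], [], [], [])
    #   for r in per_runner_results:
    #       totals += r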

    def __repr__(self) -> str:
        out = f'{self.name}: '
        out += f'{ansi.fg("green")}'
        out += f'{len(self.tests_succeeded)}/{len(self.tests_executed)} passed'
        out += f'{ansi.reset()}.\n'

        if len(self.tests_failed) > 0:
            out += f' ..{ansi.fg("red")}'
            out += f'{len(self.tests_failed)} tests failed'
            out += f'{ansi.reset()}:\n'
            for test in self.tests_failed:
                out += f'    {test}\n'

        if len(self.tests_timed_out) > 0:
            out += f' ..{ansi.fg("yellow")}'
            out += f'{len(self.tests_timed_out)} tests timed out'
            out += f'{ansi.reset()}:\n'
            for test in self.tests_timed_out:
                out += f'    {test}\n'
        return out
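
    # A rendered report looks roughly like this (ANSI colors omitted,
    # hypothetical test names):
    #
    #   Unittests: 10/12 passed.
    #    ..2 tests failed:
    #       foo_test.py
    #       bar_test.py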


class TestRunner(ABC, thread_utils.ThreadWithReturnValue):
    """A base class for something that runs a test."""

    def __init__(self, params: TestingParameters):
        """Create a TestRunner.

        Args:
            params: Test running parameters.
        """
        super().__init__(target=self.begin, args=[params])
        self.params = params
        self.test_results = TestResults(
            name=self.get_name(),
            tests_executed=[],
            tests_succeeded=[],
            tests_failed=[],
            tests_timed_out=[],
        )
        self.tests_started = 0
        self.lock = threading.Lock()

    @abstractmethod
    def get_name(self) -> str:
        """The name of this test collection."""
        pass

    def get_status(self) -> Tuple[int, TestResults]:
        """Ask the TestRunner for its status."""
        with self.lock:
            return (self.tests_started, self.test_results)
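
    # The tuple is (number of tests started so far, results accumulated so
    # far); main() polls this from another thread to drive its progress bar,
    # hence the lock.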

    @abstractmethod
    def begin(self, params: TestingParameters) -> TestResults:
        """Start execution."""
        pass


class TemplatedTestRunner(TestRunner, ABC):
    """A TestRunner that has a recipe for executing the tests."""

    @abstractmethod
    def identify_tests(self) -> List[TestToRun]:
        """Return a list of TestToRun objects that should be executed."""
        pass

    @abstractmethod
    def run_test(self, test: TestToRun) -> TestResults:
        """Run a single test and return its TestResults."""
        pass

    def check_for_abort(self) -> None:
        """Periodically called to check whether we need to stop."""

        if self.params.halt_event.is_set():
            logger.debug('Thread %s saw halt event; exiting.', self.get_name())
            raise Exception("Kill myself!")
        if self.params.halt_on_error:
            if len(self.test_results.tests_failed) > 0:
                logger.error(
                    'Thread %s saw abnormal results; exiting.', self.get_name()
                )
                raise Exception("Kill myself!")

    def persist_output(self, test: TestToRun, message: str, output: str) -> None:
        """Called to save the output of a test run."""

        dest = f'{test.name}-output.txt'
        with open(f'./test_output/{dest}', 'w') as wf:
            print(message, file=wf)
            print('-' * len(message), file=wf)
            print(output, file=wf)

    def execute_commandline(
        self,
        test: TestToRun,
        *,
        timeout: float = 120.0,
    ) -> TestResults:
        """Execute a particular commandline to run a test."""

        try:
            output = exec_utils.cmd(
                test.cmdline,
                timeout_seconds=timeout,
            )
            self.persist_output(
                test, f'{test.name} ({test.cmdline}) succeeded.', output
            )
            logger.debug(
                '%s: %s (%s) succeeded', self.get_name(), test.name, test.cmdline
            )
            return TestResults(test.name, [test.name], [test.name], [], [])
        except subprocess.TimeoutExpired as e:
            msg = f'{self.get_name()}: {test.name} ({test.cmdline}) timed out after {e.timeout:.1f} seconds.'
            logger.error(msg)
            logger.debug(
                '%s: %s output when it timed out: %s',
                self.get_name(),
                test.name,
                e.output,
            )
            self.persist_output(test, msg, e.output.decode('utf-8'))
            return TestResults(test.name, [test.name], [], [], [test.name])
        except subprocess.CalledProcessError as e:
            msg = f'{self.get_name()}: {test.name} ({test.cmdline}) failed; exit code {e.returncode}'
            logger.error(msg)
            logger.debug(
                '%s: %s output when it failed: %s', self.get_name(), test.name, e.output
            )
            self.persist_output(test, msg, e.output.decode('utf-8'))
            return TestResults(test.name, [test.name], [], [test.name], [])
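
    # The three returns above encode one test's outcome positionally:
    #
    #   TestResults(name, tests_executed, tests_succeeded, tests_failed,
    #               tests_timed_out)
    #
    # i.e. singleton lists that callers fold together via TestResults.__add__.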

    @overrides
    def begin(self, params: TestingParameters) -> TestResults:
        logger.debug('Thread %s started.', self.get_name())
        interesting_tests = self.identify_tests()
        logger.debug(
            '%s: Identified %d tests to be run.',
            self.get_name(),
            len(interesting_tests),
        )

        # Note: because of @parallelize on run_test it actually
        # returns a SmartFuture with a TestResults inside of it.
        # That's the reason for this Any business.
        running: List[Any] = []
        for test_to_run in interesting_tests:
            running.append(self.run_test(test_to_run))
            logger.debug(
                '%s: Test %s started in the background.',
                self.get_name(),
                test_to_run.name,
            )
            self.tests_started += 1

        for future in smart_future.wait_any(running):
            self.check_for_abort()
            result = future._resolve()
            logger.debug('Test %s finished.', result.name)
            self.test_results += result

        logger.debug('Thread %s finished.', self.get_name())
        return self.test_results
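
    # The fan-out/fan-in above in miniature: a @par.parallelize-decorated
    # callable returns a SmartFuture immediately and runs in the background;
    # smart_future.wait_any() then yields each future as it completes, e.g.:
    #
    #   futures = [self.run_test(t) for t in tests]  # all running in parallel
    #   for f in smart_future.wait_any(futures):
    #       consume(f._resolve())                    # consume() is hypothetical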


class UnittestTestRunner(TemplatedTestRunner):
    """Run all known Unittests."""

    @overrides
    def get_name(self) -> str:
        return "Unittests"

    @overrides
    def identify_tests(self) -> List[TestToRun]:
        ret = []
        for test in file_utils.get_matching_files_recursive(ROOT, '*_test.py'):
            basename = file_utils.without_path(test)
            if basename in TESTS_TO_SKIP:
                continue
            if config.config['coverage']:
                ret.append(
                    TestToRun(
                        name=basename,
                        kind='unittest capturing coverage',
                        cmdline=f'coverage run --source ../src {test} --unittests_ignore_perf 2>&1',
                    )
                )
                if basename in PERF_SENSITIVE_TESTS:
                    ret.append(
                        TestToRun(
                            name=basename,
                            kind='unittest w/o coverage to record perf',
                            cmdline=f'{test} 2>&1',
                        )
                    )
            else:
                ret.append(
                    TestToRun(
                        name=basename,
                        kind='unittest',
                        cmdline=f'{test} 2>&1',
                    )
                )
        return ret

    @par.parallelize
    def run_test(self, test: TestToRun) -> TestResults:
        return self.execute_commandline(test)
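
    # For a hypothetical tests/foo_test.py, identify_tests() above emits, in
    # --coverage mode:
    #
    #   coverage run --source ../src tests/foo_test.py --unittests_ignore_perf 2>&1
    #
    # ...and, without coverage, simply:
    #
    #   tests/foo_test.py 2>&1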


class DoctestTestRunner(TemplatedTestRunner):
    """Run all known Doctests."""

    @overrides
    def get_name(self) -> str:
        return "Doctests"

    @overrides
    def identify_tests(self) -> List[TestToRun]:
        ret = []
        out = exec_utils.cmd(f'grep -lR "^ *import doctest" {ROOT}/*')
        for test in out.split('\n'):
            if re.match(r'.*\.py$', test):
                basename = file_utils.without_path(test)
                if basename in TESTS_TO_SKIP:
                    continue
                if config.config['coverage']:
                    ret.append(
                        TestToRun(
                            name=basename,
                            kind='doctest capturing coverage',
                            cmdline=f'coverage run --source ../src {test} 2>&1',
                        )
                    )
                    if basename in PERF_SENSITIVE_TESTS:
                        ret.append(
                            TestToRun(
                                name=basename,
                                kind='doctest w/o coverage to record perf',
                                cmdline=f'python3 {test} 2>&1',
                            )
                        )
                else:
                    ret.append(
                        TestToRun(
                            name=basename,
                            kind='doctest',
                            cmdline=f'python3 {test} 2>&1',
                        )
                    )
        return ret

    @par.parallelize
    def run_test(self, test: TestToRun) -> TestResults:
        return self.execute_commandline(test)
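
    # Doctest discovery is textual: any *.py file under ROOT whose source
    # matches "^ *import doctest" is assumed to contain doctests and is run
    # via "python3 <file>".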


class IntegrationTestRunner(TemplatedTestRunner):
    """Run all known Integration tests."""

    @overrides
    def get_name(self) -> str:
        return "Integration Tests"

    @overrides
    def identify_tests(self) -> List[TestToRun]:
        ret = []
        for test in file_utils.get_matching_files_recursive(ROOT, '*_itest.py'):
            basename = file_utils.without_path(test)
            if basename in TESTS_TO_SKIP:
                continue
            if config.config['coverage']:
                ret.append(
                    TestToRun(
                        name=basename,
                        kind='integration test capturing coverage',
                        cmdline=f'coverage run --source ../src {test} 2>&1',
                    )
                )
                if basename in PERF_SENSITIVE_TESTS:
                    ret.append(
                        TestToRun(
                            name=basename,
                            kind='integration test w/o coverage to capture perf',
                            cmdline=f'{test} 2>&1',
                        )
                    )
            else:
                ret.append(
                    TestToRun(
                        name=basename, kind='integration test', cmdline=f'{test} 2>&1'
                    )
                )
        return ret

    @par.parallelize
    def run_test(self, test: TestToRun) -> TestResults:
        return self.execute_commandline(test)


def test_results_report(results: Dict[str, TestResults]) -> int:
    """Give a final report about the tests that were run."""
    total_problems = 0
    for result in results.values():
        print(result, end='')
        total_problems += len(result.tests_failed)
        total_problems += len(result.tests_timed_out)

    if total_problems > 0:
        print('Reminder: look in ./test_output to view test output logs')
    return total_problems
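

# main() returns this problem count as its result, so a nonzero total can
# signal the git pre-commit hook mentioned in the module docstring to veto
# the commit.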


def code_coverage_report():
    """Give a final code coverage report."""
    text_utils.header('Code Coverage')
    exec_utils.cmd('coverage combine .coverage*')
    out = exec_utils.cmd(
        'coverage report --omit=config-3.*.py,*_test.py,*_itest.py --sort=-cover'
    )
    print(out)
    print(
        """To recall this report w/o re-running the tests:

    $ coverage report --omit=config-3.*.py,*_test.py,*_itest.py --sort=-cover

...from the 'tests' directory. Note that subsequent calls to
run_tests.py with --coverage will clobber previous results. See:

    https://coverage.readthedocs.io/en/6.2/
"""
    )


@bootstrap.initialize
def main() -> Optional[int]:
    saw_flag = False
    halt_event = threading.Event()
    threads: List[TestRunner] = []

    params = TestingParameters(
        halt_on_error=True,
        halt_event=halt_event,
    )

    if config.config['coverage']:
        logger.debug('Clearing existing coverage data via "coverage erase".')
        exec_utils.cmd('coverage erase')

    if config.config['unittests'] or config.config['all']:
        saw_flag = True
        threads.append(UnittestTestRunner(params))
    if config.config['doctests'] or config.config['all']:
        saw_flag = True
        threads.append(DoctestTestRunner(params))
    if config.config['integration'] or config.config['all']:
        saw_flag = True
        threads.append(IntegrationTestRunner(params))

    if not saw_flag:
        print('ERROR: one of --unittests, --doctests or --integration is required.')
        return 1

    for thread in threads:
        thread.start()

    results: Dict[str, TestResults] = {}
    while len(results) != len(threads):
        started = 0
        done = 0
        failed = 0

        for thread in threads:
            (s, tr) = thread.get_status()
            started += s
            failed += len(tr.tests_failed) + len(tr.tests_timed_out)
            done += len(tr.tests_failed) + len(tr.tests_timed_out) + len(tr.tests_succeeded)
            if not thread.is_alive():
                tid = thread.name
                if tid not in results:
                    result = thread.join()
                    if result:
                        results[tid] = result
                        if len(result.tests_failed) > 0:
                            logger.error(
                                'Thread %s returned abnormal results; killing the others.',
                                tid,
                            )
                            halt_event.set()

        if started > 0:
            percent_done = 100.0 * done / started
        else:
            percent_done = 0.0

        if failed == 0:
            color = ansi.fg('green')
        else:
            color = ansi.fg('red')

        if percent_done < 100.0:
            print(
                text_utils.bar_graph_string(
                    done,
                    started,
                    text=text_utils.BarGraphText.FRACTION,
                    fgcolor=color,
                ),
                end='\r',
                flush=True,
            )
            time.sleep(0.5)

    print(f'{ansi.clear_line()}Final Report:')
    if config.config['coverage']:
        code_coverage_report()
    total_problems = test_results_report(results)
    return total_problems


if __name__ == '__main__':
    main()