A smart, fast test runner. Used in a git pre-commit hook.
"""
+from __future__ import annotations
+
import logging
import os
import re
__radd__ = __add__
+ @staticmethod
+ def empty_test_results(suite_name: str) -> TestResults:
+ return TestResults(
+ name=suite_name,
+ tests_executed={},
+ tests_succeeded=[],
+ tests_failed=[],
+ tests_timed_out=[],
+ )
+
+ @staticmethod
+ def single_test_succeeded(name: str) -> TestResults:
+ return TestResults(name, {}, [name], [], [])
+
+ @staticmethod
+ def single_test_failed(name: str) -> TestResults:
+ return TestResults(
+ name,
+ {},
+ [],
+ [name],
+ [],
+ )
+
+ @staticmethod
+ def single_test_timed_out(name: str) -> TestResults:
+ return TestResults(
+ name,
+ {},
+ [],
+ [],
+ [name],
+ )
+
def __repr__(self) -> str:
out = f'{self.name}: '
out += f'{ansi.fg("green")}'
"""
super().__init__(self, target=self.begin, args=[params])
self.params = params
- self.test_results = TestResults(
- name=self.get_name(),
- tests_executed={},
- tests_succeeded=[],
- tests_failed=[],
- tests_timed_out=[],
- )
+ self.test_results = TestResults.empty_test_results(self.get_name())
self.lock = threading.Lock()
@abstractmethod
pass
def check_for_abort(self) -> bool:
- """Periodically caled to check to see if we need to stop."""
+ """Periodically called to check to see if we need to stop."""
if self.params.halt_event.is_set():
logger.debug('Thread %s saw halt event; exiting.', self.get_name())
) -> TestResults:
"""Execute a particular commandline to run a test."""
+ msg = f'{self.get_name()}: {test.name} ({test.cmdline}) '
try:
output = exec_utils.cmd(
test.cmdline,
timeout_seconds=timeout,
)
if "***Test Failed***" in output:
- msg = f'{self.get_name()}: {test.name} ({test.cmdline}) failed; doctest failure message detected'
+ msg += 'failed; doctest failure message detected.'
logger.error(msg)
self.persist_output(test, msg, output)
- return TestResults(
- test.name,
- {},
- [],
- [test.name],
- [],
- )
+ return TestResults.single_test_failed(test.name)
+
+ msg += 'succeeded.'
+ self.persist_output(test, msg, output)
+ logger.debug(msg)
+ return TestResults.single_test_succeeded(test.name)
- self.persist_output(
- test, f'{test.name} ({test.cmdline}) succeeded.', output
- )
- logger.debug(
- '%s: %s (%s) succeeded', self.get_name(), test.name, test.cmdline
- )
- return TestResults(test.name, {}, [test.name], [], [])
except subprocess.TimeoutExpired as e:
- msg = f'{self.get_name()}: {test.name} ({test.cmdline}) timed out after {e.timeout:.1f} seconds.'
+ msg += f'timed out after {e.timeout:.1f} seconds.'
logger.error(msg)
logger.debug(
'%s: %s output when it timed out: %s',
e.output,
)
self.persist_output(test, msg, e.output.decode('utf-8'))
- return TestResults(
- test.name,
- {},
- [],
- [],
- [test.name],
- )
+ return TestResults.single_test_timed_out(test.name)
+
except subprocess.CalledProcessError as e:
- msg = f'{self.get_name()}: {test.name} ({test.cmdline}) failed; exit code {e.returncode}'
+ msg += f'failed with exit code {e.returncode}.'
logger.error(msg)
logger.debug(
'%s: %s output when it failed: %s', self.get_name(), test.name, e.output
)
self.persist_output(test, msg, e.output.decode('utf-8'))
- return TestResults(
- test.name,
- {},
- [],
- [test.name],
- [],
- )
+ return TestResults.single_test_failed(test.name)
@overrides
def begin(self, params: TestingParameters) -> TestResults:
for result in smart_future.wait_any(running, log_exceptions=False):
logger.debug('Test %s finished.', result.name)
-
- # We sometimes run the same test more than once. Do not allow
- # one run's results to klobber the other's.
self.test_results += result
+
if self.check_for_abort():
logger.debug(
'%s: check_for_abort told us to exit early.', self.get_name()
)
print(out)
print(
- """To recall this report w/o re-running the tests:
+ f"""To recall this report w/o re-running the tests:
- $ coverage report --omit=config-3.*.py,*_test.py,*_itest.py --sort=-cover
+ $ {ansi.bold()}coverage report --omit=config-3.*.py,*_test.py,*_itest.py --sort=-cover{ansi.reset()}
...from the 'tests' directory. Note that subsequent calls to
run_tests.py with --coverage will klobber previous results. See:
@bootstrap.initialize
def main() -> Optional[int]:
saw_flag = False
- halt_event = threading.Event()
threads: List[TestRunner] = []
+ halt_event = threading.Event()
halt_event.clear()
params = TestingParameters(
halt_on_error=True,
if config.config['coverage']:
logger.debug('Clearing existing coverage data via "coverage erase".')
exec_utils.cmd('coverage erase')
-
if config.config['unittests'] or config.config['all']:
saw_flag = True
threads.append(UnittestTestRunner(params))
if not saw_flag:
config.print_usage()
- print('ERROR: one of --unittests, --doctests or --integration is required.')
- return 1
+ config.error('One of --unittests, --doctests or --integration is required.', 1)
for thread in threads:
thread.start()
- results: Dict[str, Optional[TestResults]] = {}
start_time = time.time()
last_update = start_time
+ results: Dict[str, Optional[TestResults]] = {}
still_running = {}
while len(results) != len(threads):
}
still_running[tid] = running_with_start_time
+ # Maybe print tests that are still running.
now = time.time()
if now - start_time > 5.0:
if now - last_update > 3.0:
else:
print(f'Still running: {len(update)} tests.')
+ # Maybe signal the other threads to stop too.
if not thread.is_alive():
if tid not in results:
result = thread.join()
if result:
results[tid] = result
- if len(result.tests_failed) > 0:
+ if (len(result.tests_failed) + len(result.tests_timed_out)) > 0:
logger.error(
'Thread %s returned abnormal results; killing the others.',
tid,
halt_event.set()
results[tid] = None
- if failed == 0:
- color = ansi.fg('green')
- else:
+ color = ansi.fg('green')
+ if failed > 0:
color = ansi.fg('red')
if started > 0:
print(f' {color}{now - start_time:.1f}s{ansi.reset()}', end='\r')
time.sleep(0.1)
- print(f'{ansi.clear_line()}{ansi.underline()}Final Report:{ansi.reset()}')
+ print(f'{ansi.clear_line()}\n{ansi.underline()}Final Report:{ansi.reset()}')
if config.config['coverage']:
code_coverage_report()
print(f'Test suite runtime: {time.time() - start_time:.1f}s')