#!/usr/bin/env python3
"""
-A smart, fast test runner.
+A smart, fast test runner. Used in a git pre-commit hook.
"""
import logging
from overrides import overrides
+import ansi
import bootstrap
import config
import exec_utils
@dataclass
class TestingParameters:
halt_on_error: bool
+ """Should we stop as soon as one error has occurred?"""
+
halt_event: threading.Event
+ """An event that, when set, indicates to stop ASAP."""
@dataclass
class TestResults:
name: str
- num_tests_executed: int
- num_tests_succeeded: int
- num_tests_failed: int
- normal_exit: bool
- output: str
+ """The name of this test / set of tests."""
+
+ tests_executed: List[str]
+ """Tests that were executed."""
+
+ tests_succeeded: List[str]
+ """Tests that succeeded."""
+
+ tests_failed: List[str]
+ """Tests that failed."""
+
+ tests_timed_out: List[str]
+ """Tests that timed out."""
+
+ def __add__(self, other):
+ self.tests_executed.extend(other.tests_executed)
+ self.tests_succeeded.extend(other.tests_succeeded)
+ self.tests_failed.extend(other.tests_failed)
+ self.tests_timed_out.extend(other.tests_timed_out)
+ return self
+
+ __radd__ = __add__
+
+ def __repr__(self) -> str:
+ out = f'{self.name}: '
+ out += f'{ansi.fg("green")}'
+ out += f'{len(self.tests_succeeded)}/{len(self.tests_executed)} passed'
+ out += f'{ansi.reset()}.\n'
+
+ if len(self.tests_failed) > 0:
+ out += f' ..{ansi.fg("red")}'
+ out += f'{len(self.tests_failed)} tests failed'
+ out += f'{ansi.reset()}:\n'
+ for test in self.tests_failed:
+ out += f' {test}\n'
+ out += '\n'
+
+ if len(self.tests_timed_out) > 0:
+ out += f' ..{ansi.fg("yellow")}'
+ out += f'{len(self.tests_timed_out)} tests timed out'
+ out += f'{ansi.reset()}:\n'
+ for test in self.tests_failed:
+ out += f' {test}\n'
+ out += '\n'
+ return out
class TestRunner(ABC, thread_utils.ThreadWithReturnValue):
+ """A Base class for something that runs a test."""
+
def __init__(self, params: TestingParameters):
+ """Create a TestRunner.
+
+ Args:
+ params: Test running paramters.
+
+ """
super().__init__(self, target=self.begin, args=[params])
self.params = params
self.test_results = TestResults(
- name=f"All {self.get_name()} tests",
- num_tests_executed=0,
- num_tests_succeeded=0,
- num_tests_failed=0,
- normal_exit=True,
- output="",
+ name=self.get_name(),
+ tests_executed=[],
+ tests_succeeded=[],
+ tests_failed=[],
+ tests_timed_out=[],
)
- def aggregate_test_results(self, result: TestResults):
- self.test_results.num_tests_executed += result.num_tests_executed
- self.test_results.num_tests_succeeded += result.num_tests_succeeded
- self.test_results.num_tests_failed += result.num_tests_failed
- self.test_results.normal_exit = self.test_results.normal_exit and result.normal_exit
- self.test_results.output += "\n\n\n" + result.output
-
@abstractmethod
def get_name(self) -> str:
+ """The name of this test collection."""
pass
@abstractmethod
def begin(self, params: TestingParameters) -> TestResults:
+ """Start execution."""
pass
class TemplatedTestRunner(TestRunner, ABC):
+ """A TestRunner that has a recipe for executing the tests."""
+
@abstractmethod
- def identify_tests(self) -> List[Any]:
+ def identify_tests(self) -> List[str]:
+ """Return a list of tests that should be executed."""
pass
@abstractmethod
def run_test(self, test: Any) -> TestResults:
+ """Run a single test and return its TestResults."""
pass
def check_for_abort(self):
+ """Periodically caled to check to see if we need to stop."""
+
if self.params.halt_event.is_set():
logger.debug('Thread %s saw halt event; exiting.', self.get_name())
raise Exception("Kill myself!")
- if not self.test_results.normal_exit:
- if self.params.halt_on_error:
+ if self.params.halt_on_error:
+ if len(self.test_results.tests_failed) > 0:
logger.error('Thread %s saw abnormal results; exiting.', self.get_name())
raise Exception("Kill myself!")
def status_report(self, running: List[Any], done: List[Any]):
+ """Periodically called to report current status."""
+
total = len(running) + len(done)
logging.info(
'%s: %d/%d in flight; %d/%d completed.',
total,
)
+ def persist_output(self, test_name: str, message: str, output: str) -> None:
+ """Called to save the output of a test run."""
+
+ basename = file_utils.without_path(test_name)
+ dest = f'{basename}-output.txt'
+ with open(f'./test_output/{dest}', 'w') as wf:
+ print(message, file=wf)
+ print('-' * len(message), file=wf)
+ wf.write(output)
+
+ def execute_commandline(
+ self,
+ test_name: str,
+ cmdline: str,
+ *,
+ timeout: float = 120.0,
+ ) -> TestResults:
+ """Execute a particular commandline to run a test."""
+
+ try:
+ logger.debug('%s: Running %s (%s)', self.get_name(), test_name, cmdline)
+ output = exec_utils.cmd(
+ cmdline,
+ timeout_seconds=timeout,
+ )
+ self.persist_output(test_name, f'{test_name} ({cmdline}) succeeded.', output)
+ logger.debug('%s (%s) succeeded', test_name, cmdline)
+ return TestResults(test_name, [test_name], [test_name], [], [])
+ except subprocess.TimeoutExpired as e:
+ msg = f'{self.get_name()}: {test_name} ({cmdline}) timed out after {e.timeout:.1f} seconds.'
+ logger.error(msg)
+ logger.debug(
+ '%s: %s output when it timed out: %s', self.get_name(), test_name, e.output
+ )
+ self.persist_output(test_name, msg, e.output)
+ return TestResults(
+ test_name,
+ [test_name],
+ [],
+ [],
+ [test_name],
+ )
+ except subprocess.CalledProcessError as e:
+ msg = f'{self.get_name()}: {test_name} ({cmdline}) failed; exit code {e.returncode}'
+ logger.error(msg)
+ logger.debug('%s: %s output when it failed: %s', self.get_name(), test_name, e.output)
+ self.persist_output(test_name, msg, e.output)
+ return TestResults(
+ test_name,
+ [test_name],
+ [],
+ [test_name],
+ [],
+ )
+
@overrides
def begin(self, params: TestingParameters) -> TestResults:
logger.debug('Thread %s started.', self.get_name())
newly_finished.append(fut)
result = fut._resolve()
logger.debug('Test %s finished.', result.name)
- self.aggregate_test_results(result)
+ self.test_results += result
for fut in newly_finished:
running.remove(fut)
done.append(fut)
- time.sleep(0.25)
+ time.sleep(1.0)
logger.debug('Thread %s finished.', self.get_name())
return self.test_results
class UnittestTestRunner(TemplatedTestRunner):
+ """Run all known Unittests."""
+
@overrides
def get_name(self) -> str:
return "UnittestTestRunner"
@overrides
- def identify_tests(self) -> List[Any]:
+ def identify_tests(self) -> List[str]:
return list(file_utils.expand_globs('*_test.py'))
@par.parallelize
def run_test(self, test: Any) -> TestResults:
if config.config['coverage']:
- cmdline = f'coverage run --source {HOME}/lib --append {test} --unittests_ignore_perf'
+ cmdline = f'coverage run --source {HOME}/lib {test} --unittests_ignore_perf'
else:
cmdline = test
-
- try:
- logger.debug('Running unittest %s (%s)', test, cmdline)
- output = exec_utils.cmd(
- cmdline,
- timeout_seconds=120.0,
- )
- except TimeoutError:
- logger.error('Unittest %s timed out; ran for > 120.0 seconds', test)
- return TestResults(
- test,
- 1,
- 0,
- 1,
- False,
- f"Unittest {test} timed out.",
- )
- except subprocess.CalledProcessError:
- logger.error('Unittest %s failed.', test)
- return TestResults(
- test,
- 1,
- 0,
- 1,
- False,
- f"Unittest {test} failed.",
- )
- return TestResults(test, 1, 1, 0, True, output)
+ return self.execute_commandline(test, cmdline)
class DoctestTestRunner(TemplatedTestRunner):
+ """Run all known Doctests."""
+
@overrides
def get_name(self) -> str:
return "DoctestTestRunner"
@overrides
- def identify_tests(self) -> List[Any]:
+ def identify_tests(self) -> List[str]:
ret = []
out = exec_utils.cmd('grep -lR "^ *import doctest" /home/scott/lib/python_modules/*')
for line in out.split('\n'):
@par.parallelize
def run_test(self, test: Any) -> TestResults:
if config.config['coverage']:
- cmdline = f'coverage run --source {HOME}/lib --append {test} 2>&1'
+ cmdline = f'coverage run --source {HOME}/lib {test} 2>&1'
else:
cmdline = f'python3 {test}'
- try:
- logger.debug('Running doctest %s (%s).', test, cmdline)
- output = exec_utils.cmd(
- cmdline,
- timeout_seconds=120.0,
- )
- except TimeoutError:
- logger.error('Doctest %s timed out; ran for > 120.0 seconds', test)
- return TestResults(
- test,
- 1,
- 0,
- 1,
- False,
- f"Doctest {test} timed out.",
- )
- except subprocess.CalledProcessError:
- logger.error('Doctest %s failed.', test)
- return TestResults(
- test,
- 1,
- 0,
- 1,
- False,
- f"Docttest {test} failed.",
- )
- return TestResults(
- test,
- 1,
- 1,
- 0,
- True,
- "",
- )
+ return self.execute_commandline(test, cmdline)
class IntegrationTestRunner(TemplatedTestRunner):
+ """Run all know Integration tests."""
+
@overrides
def get_name(self) -> str:
return "IntegrationTestRunner"
@overrides
- def identify_tests(self) -> List[Any]:
+ def identify_tests(self) -> List[str]:
return list(file_utils.expand_globs('*_itest.py'))
@par.parallelize
def run_test(self, test: Any) -> TestResults:
if config.config['coverage']:
- cmdline = f'coverage run --source {HOME}/lib --append {test}'
+ cmdline = f'coverage run --source {HOME}/lib {test}'
else:
cmdline = test
- try:
- logger.debug('Running integration test %s (%s).', test, cmdline)
- output = exec_utils.cmd(
- cmdline,
- timeout_seconds=240.0,
- )
- except TimeoutError:
- logger.error('Integration Test %s timed out; ran for > 240.0 seconds', test)
- return TestResults(
- test,
- 1,
- 0,
- 1,
- False,
- f"Integration Test {test} timed out.",
- )
- except subprocess.CalledProcessError:
- logger.error('Integration Test %s failed.', test)
- return TestResults(
- test,
- 1,
- 0,
- 1,
- False,
- f"Integration Test {test} failed.",
- )
- return TestResults(
- test,
- 1,
- 1,
- 0,
- True,
- "",
- )
+ return self.execute_commandline(test, cmdline)
+
+def test_results_report(results: Dict[str, TestResults]) -> int:
+ """Give a final report about the tests that were run."""
+ total_problems = 0
+ for result in results.values():
+ print(result, end='')
+ total_problems += len(result.tests_failed)
+ total_problems += len(result.tests_timed_out)
-def test_results_report(results: Dict[str, TestResults]):
- print(results)
+ if total_problems > 0:
+ print('Reminder: look in ./test_output to view test output logs')
+ return total_problems
def code_coverage_report():
+ """Give a final code coverage report."""
text_utils.header('Code Coverage')
+ exec_utils.cmd('coverage combine .coverage*')
out = exec_utils.cmd('coverage report --omit=config-3.8.py,*_test.py,*_itest.py --sort=-cover')
print(out)
print(
run_tests.py with --coverage will klobber previous results. See:
https://coverage.readthedocs.io/en/6.2/
-
"""
)
result = thread.join()
if result:
results[tid] = result
- if not result.normal_exit:
+ if len(result.tests_failed) > 0:
+ logger.error(
+ 'Thread %s returned abnormal results; killing the others.', tid
+ )
halt_event.set()
time.sleep(1.0)
- test_results_report(results)
if config.config['coverage']:
code_coverage_report()
- return 0
+ total_problems = test_results_report(results)
+ return total_problems
if __name__ == '__main__':