#!/usr/bin/env python3
"""
-A smart, fast test runner.
+A smart, fast test runner. Used in a git pre-commit hook.
"""
import logging
import os
import re
import subprocess
import threading
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple
from overrides import overrides
import ansi
import config
import exec_utils
import file_utils
import parallelize as par
+import smart_future
import text_utils
import thread_utils
logger = logging.getLogger(__name__)
-args = config.add_commandline_args(f'({__file__})', 'Args related to __file__')
+args = config.add_commandline_args(f'({__file__})', f'Args related to {__file__}')
args.add_argument('--unittests', '-u', action='store_true', help='Run unittests.')
args.add_argument('--doctests', '-d', action='store_true', help='Run doctests.')
args.add_argument('--integration', '-i', action='store_true', help='Run integration tests.')
+args.add_argument(
+ '--all',
+ '-a',
+ action='store_true',
+    help='Run unittests, doctests, and integration tests. Equivalent to -u -d -i.',
+)
args.add_argument(
'--coverage', '-c', action='store_true', help='Run tests and capture code coverage data'
)
HOME = os.environ['HOME']
+# These tests will be run twice in --coverage mode: once to gather
+# code coverage data and a second time with coverage disabled. This
+# is because they make assertions about performance, which is
+# adversely affected by coverage instrumentation.
+PERF_SENSITIVE_TESTS = set(['/home/scott/lib/python_modules/tests/string_utils_test.py'])
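+# (Illustrative sketch only; the variable and assertion below are
+# hypothetical. A test in PERF_SENSITIVE_TESTS is expected to guard
+# its timing assertions behind the --unittests_ignore_perf flag that
+# the coverage cmdline passes, e.g.:
+#
+#     if not config.config['unittests_ignore_perf']:
+#         self.assertLess(elapsed_seconds, 1.0)
+# )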
+
@dataclass
class TestingParameters:
halt_on_error: bool
+ """Should we stop as soon as one error has occurred?"""
+
halt_event: threading.Event
+ """An event that, when set, indicates to stop ASAP."""
+
+
+@dataclass
+class TestToRun:
+ name: str
+ """The name of the test"""
+
+ kind: str
+ """The kind of the test"""
+
+ cmdline: str
+ """The command line to execute"""
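+    # For example, UnittestTestRunner.identify_tests() below yields
+    # entries like: TestToRun(name='string_utils_test.py',
+    #                         kind='unittest',
+    #                         cmdline='string_utils_test.py 2>&1')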
@dataclass
class TestResults:
name: str
+ """The name of this test / set of tests."""
+
tests_executed: List[str]
+ """Tests that were executed."""
+
tests_succeeded: List[str]
+ """Tests that succeeded."""
+
tests_failed: List[str]
+ """Tests that failed."""
+
tests_timed_out: List[str]
+ """Tests that timed out."""
+
+ def __add__(self, other):
+ self.tests_executed.extend(other.tests_executed)
+ self.tests_succeeded.extend(other.tests_succeeded)
+ self.tests_failed.extend(other.tests_failed)
+ self.tests_timed_out.extend(other.tests_timed_out)
+ return self
+
+ __radd__ = __add__
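+    # Note: __add__ folds other's lists into self and returns self, so
+    # both "a + b" and "a += b" aggregate in place; aliasing __radd__
+    # lets a TestResults sit on either side of the +.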
+
+ def __repr__(self) -> str:
+ out = f'{self.name}: '
+ out += f'{ansi.fg("green")}'
+ out += f'{len(self.tests_succeeded)}/{len(self.tests_executed)} passed'
+ out += f'{ansi.reset()}.\n'
+
+ if len(self.tests_failed) > 0:
+ out += f' ..{ansi.fg("red")}'
+ out += f'{len(self.tests_failed)} tests failed'
+ out += f'{ansi.reset()}:\n'
+ for test in self.tests_failed:
+ out += f' {test}\n'
+ out += '\n'
+
+ if len(self.tests_timed_out) > 0:
+ out += f' ..{ansi.fg("yellow")}'
+ out += f'{len(self.tests_timed_out)} tests timed out'
+ out += f'{ansi.reset()}:\n'
+            for test in self.tests_timed_out:
+ out += f' {test}\n'
+ out += '\n'
+ return out
class TestRunner(ABC, thread_utils.ThreadWithReturnValue):
+    """A base class for something that runs a test."""
+
def __init__(self, params: TestingParameters):
+ """Create a TestRunner.
+
+ Args:
+            params: Test running parameters.
+
+ """
super().__init__(self, target=self.begin, args=[params])
self.params = params
        self.test_results = TestResults(
            name=self.get_name(),
            tests_executed=[],
            tests_succeeded=[],
            tests_failed=[],
            tests_timed_out=[],
        )
-
- def aggregate_test_results(self, result: TestResults):
- self.test_results.tests_executed.extend(result.tests_executed)
- self.test_results.tests_succeeded.extend(result.tests_succeeded)
- self.test_results.tests_failed.extend(result.tests_failed)
- self.test_results.tests_timed_out.extend(result.tests_timed_out)
+ self.tests_started = 0
+ self.lock = threading.Lock()
@abstractmethod
def get_name(self) -> str:
+ """The name of this test collection."""
pass
+ def get_status(self) -> Tuple[int, TestResults]:
+ """Ask the TestRunner for its status."""
+ with self.lock:
+ return (self.tests_started, self.test_results)
+
@abstractmethod
def begin(self, params: TestingParameters) -> TestResults:
+ """Start execution."""
pass
class TemplatedTestRunner(TestRunner, ABC):
+    """A TestRunner that provides a Template Method recipe for running
+    tests; subclasses supply identify_tests() and run_test()."""
+
@abstractmethod
- def identify_tests(self) -> List[Any]:
+ def identify_tests(self) -> List[TestToRun]:
+        """Return the list of TestToRun objects that should be executed."""
pass
@abstractmethod
- def run_test(self, test: Any) -> TestResults:
+ def run_test(self, test: TestToRun) -> TestResults:
+ """Run a single test and return its TestResults."""
pass
def check_for_abort(self):
+        """Called periodically to check whether we need to stop."""
+
        if self.params.halt_event.is_set():
            logger.debug('Thread %s saw halt event; exiting.', self.get_name())
            raise Exception("Kill myself!")
        if self.params.halt_on_error and len(self.test_results.tests_failed) > 0:
            logger.error('Thread %s saw abnormal results; exiting.', self.get_name())
            raise Exception("Kill myself!")
- def status_report(self, running: List[Any], done: List[Any]):
- total = len(running) + len(done)
- logging.info(
- '%s: %d/%d in flight; %d/%d completed.',
- self.get_name(),
- len(running),
- total,
- len(done),
- total,
- )
+ def persist_output(self, test: TestToRun, message: str, output: str) -> None:
+ """Called to save the output of a test run."""
- def persist_output(self, test_name: str, message: str, output: str) -> None:
- basename = file_utils.without_path(test_name)
- dest = f'{basename}-output.txt'
+ dest = f'{test.name}-output.txt'
        with open(f'./test_output/{dest}', 'w') as wf:
            print(message, file=wf)
            print('-' * len(message), file=wf)
            wf.write(output)
def execute_commandline(
self,
- test_name: str,
- cmdline: str,
+ test: TestToRun,
*,
timeout: float = 120.0,
) -> TestResults:
+ """Execute a particular commandline to run a test."""
try:
- logger.debug('%s: Running %s (%s)', self.get_name(), test_name, cmdline)
output = exec_utils.cmd(
- cmdline,
+ test.cmdline,
timeout_seconds=timeout,
)
- self.persist_output(test_name, f'{test_name} ({cmdline}) succeeded.', output)
- logger.debug('%s (%s) succeeded', test_name, cmdline)
- return TestResults(test_name, [test_name], [test_name], [], [])
+ self.persist_output(test, f'{test.name} ({test.cmdline}) succeeded.', output)
+ logger.debug('%s: %s (%s) succeeded', self.get_name(), test.name, test.cmdline)
+ return TestResults(test.name, [test.name], [test.name], [], [])
except subprocess.TimeoutExpired as e:
- msg = f'{self.get_name()}: {test_name} ({cmdline}) timed out after {e.timeout:.1f} seconds.'
+ msg = f'{self.get_name()}: {test.name} ({test.cmdline}) timed out after {e.timeout:.1f} seconds.'
logger.error(msg)
logger.debug(
- '%s: %s output when it timed out: %s', self.get_name(), test_name, e.output
+ '%s: %s output when it timed out: %s', self.get_name(), test.name, e.output
)
- self.persist_output(test_name, msg, e.output)
+ self.persist_output(test, msg, e.output.decode('utf-8'))
return TestResults(
- test_name,
- [test_name],
+ test.name,
+ [test.name],
[],
[],
- [test_name],
+ [test.name],
)
except subprocess.CalledProcessError as e:
- msg = f'{self.get_name()}: {test_name} ({cmdline}) failed; exit code {e.returncode}'
+ msg = (
+ f'{self.get_name()}: {test.name} ({test.cmdline}) failed; exit code {e.returncode}'
+ )
logger.error(msg)
- logger.debug('%s: %s output when it failed: %s', self.get_name(), test_name, e.output)
- self.persist_output(test_name, msg, e.output)
+ logger.debug('%s: %s output when it failed: %s', self.get_name(), test.name, e.output)
+ self.persist_output(test, msg, e.output.decode('utf-8'))
return TestResults(
- test_name,
- [test_name],
+ test.name,
+ [test.name],
[],
- [test_name],
+ [test.name],
[],
)
def begin(self, params: TestingParameters) -> TestResults:
logger.debug('Thread %s started.', self.get_name())
interesting_tests = self.identify_tests()
+ logger.debug('%s: Identified %d tests to be run.', self.get_name(), len(interesting_tests))
+
+        # Note: because of the @parallelize decorator on run_test(), it
+        # actually returns a SmartFuture with a TestResults inside it.
+        # That's the reason for the List[Any] here.
running: List[Any] = []
- done: List[Any] = []
- for test in interesting_tests:
- running.append(self.run_test(test))
+ for test_to_run in interesting_tests:
+ running.append(self.run_test(test_to_run))
+ logger.debug(
+ '%s: Test %s started in the background.', self.get_name(), test_to_run.name
+ )
+            with self.lock:
+                self.tests_started += 1
- while len(running) > 0:
- self.status_report(running, done)
+ for future in smart_future.wait_any(running):
self.check_for_abort()
- newly_finished = []
- for fut in running:
- if fut.is_ready():
- newly_finished.append(fut)
- result = fut._resolve()
- logger.debug('Test %s finished.', result.name)
- self.aggregate_test_results(result)
-
- for fut in newly_finished:
- running.remove(fut)
- done.append(fut)
- time.sleep(1.0)
+ result = future._resolve()
+ logger.debug('Test %s finished.', result.name)
+ self.test_results += result
logger.debug('Thread %s finished.', self.get_name())
return self.test_results
class UnittestTestRunner(TemplatedTestRunner):
+ """Run all known Unittests."""
+
@overrides
def get_name(self) -> str:
- return "UnittestTestRunner"
+ return "Unittests"
@overrides
- def identify_tests(self) -> List[Any]:
- return list(file_utils.expand_globs('*_test.py'))
+ def identify_tests(self) -> List[TestToRun]:
+ ret = []
+ for test in file_utils.expand_globs('*_test.py'):
+ basename = file_utils.without_path(test)
+ if config.config['coverage']:
+ ret.append(
+ TestToRun(
+ name=basename,
+ kind='unittest capturing coverage',
+ cmdline=f'coverage run --source {HOME}/lib {test} --unittests_ignore_perf 2>&1',
+ )
+ )
+                if test in PERF_SENSITIVE_TESTS:
+ ret.append(
+ TestToRun(
+ name=basename,
+ kind='unittest w/o coverage to record perf',
+ cmdline=f'{test} 2>&1',
+ )
+ )
+ else:
+ ret.append(
+ TestToRun(
+ name=basename,
+ kind='unittest',
+ cmdline=f'{test} 2>&1',
+ )
+ )
+ return ret
@par.parallelize
- def run_test(self, test: Any) -> TestResults:
- if config.config['coverage']:
- cmdline = f'coverage run --source {HOME}/lib {test} --unittests_ignore_perf'
- else:
- cmdline = test
- return self.execute_commandline(test, cmdline)
+ def run_test(self, test: TestToRun) -> TestResults:
+ return self.execute_commandline(test)
class DoctestTestRunner(TemplatedTestRunner):
+ """Run all known Doctests."""
+
@overrides
def get_name(self) -> str:
- return "DoctestTestRunner"
+ return "Doctests"
@overrides
- def identify_tests(self) -> List[Any]:
+ def identify_tests(self) -> List[TestToRun]:
ret = []
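+        # Heuristic: any module under the tree that imports doctest is
+        # assumed to contain doctests worth running.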
out = exec_utils.cmd('grep -lR "^ *import doctest" /home/scott/lib/python_modules/*')
- for line in out.split('\n'):
- if re.match(r'.*\.py$', line):
- if 'run_tests.py' not in line:
- ret.append(line)
+ for test in out.split('\n'):
+ if re.match(r'.*\.py$', test):
+ if 'run_tests.py' not in test:
+ basename = file_utils.without_path(test)
+ if config.config['coverage']:
+ ret.append(
+ TestToRun(
+ name=basename,
+ kind='doctest capturing coverage',
+ cmdline=f'coverage run --source {HOME}/lib {test} 2>&1',
+ )
+ )
+                        if test in PERF_SENSITIVE_TESTS:
+ ret.append(
+ TestToRun(
+ name=basename,
+ kind='doctest w/o coverage to record perf',
+ cmdline=f'python3 {test} 2>&1',
+ )
+ )
+ else:
+ ret.append(
+ TestToRun(name=basename, kind='doctest', cmdline=f'python3 {test} 2>&1')
+ )
return ret
@par.parallelize
- def run_test(self, test: Any) -> TestResults:
- if config.config['coverage']:
- cmdline = f'coverage run --source {HOME}/lib {test} 2>&1'
- else:
- cmdline = f'python3 {test}'
- return self.execute_commandline(test, cmdline)
+ def run_test(self, test: TestToRun) -> TestResults:
+ return self.execute_commandline(test)
class IntegrationTestRunner(TemplatedTestRunner):
+    """Run all known Integration tests."""
+
@overrides
def get_name(self) -> str:
- return "IntegrationTestRunner"
+ return "Integration Tests"
@overrides
- def identify_tests(self) -> List[Any]:
- return list(file_utils.expand_globs('*_itest.py'))
+ def identify_tests(self) -> List[TestToRun]:
+ ret = []
+ for test in file_utils.expand_globs('*_itest.py'):
+ basename = file_utils.without_path(test)
+ if config.config['coverage']:
+ ret.append(
+ TestToRun(
+ name=basename,
+ kind='integration test capturing coverage',
+ cmdline=f'coverage run --source {HOME}/lib {test} 2>&1',
+ )
+ )
+                if test in PERF_SENSITIVE_TESTS:
+ ret.append(
+ TestToRun(
+ name=basename,
+                        kind='integration test w/o coverage to record perf',
+ cmdline=f'{test} 2>&1',
+ )
+ )
+ else:
+ ret.append(
+ TestToRun(name=basename, kind='integration test', cmdline=f'{test} 2>&1')
+ )
+ return ret
@par.parallelize
- def run_test(self, test: Any) -> TestResults:
- if config.config['coverage']:
- cmdline = f'coverage run --source {HOME}/lib {test}'
- else:
- cmdline = test
- return self.execute_commandline(test, cmdline)
+ def run_test(self, test: TestToRun) -> TestResults:
+ return self.execute_commandline(test)
def test_results_report(results: Dict[str, TestResults]) -> int:
+ """Give a final report about the tests that were run."""
total_problems = 0
- for type, result in results.items():
- print(f'{result.name}: ', end='')
- print(
- f'{ansi.fg("green")}{len(result.tests_succeeded)}/{len(result.tests_executed)} passed{ansi.reset()}.'
- )
- if len(result.tests_failed) > 0:
- print(f' ..{ansi.fg("red")}{len(result.tests_failed)} tests failed{ansi.reset()}:')
- for test in result.tests_failed:
- print(f' {test}')
- total_problems += len(result.tests_failed)
-
- if len(result.tests_timed_out) > 0:
- print(
- f' ..{ansi.fg("yellow")}{len(result.tests_timed_out)} tests timed out{ansi.reset()}:'
- )
- for test in result.tests_failed:
- print(f' {test}')
- total_problems += len(result.tests_timed_out)
+ for result in results.values():
+ print(result, end='')
+ total_problems += len(result.tests_failed)
+ total_problems += len(result.tests_timed_out)
    if total_problems > 0:
        print('Reminder: look in ./test_output to view test output logs')
    return total_problems
def code_coverage_report():
+ """Give a final code coverage report."""
text_utils.header('Code Coverage')
exec_utils.cmd('coverage combine .coverage*')
- out = exec_utils.cmd('coverage report --omit=config-3.8.py,*_test.py,*_itest.py --sort=-cover')
+ out = exec_utils.cmd('coverage report --omit=config-3.*.py,*_test.py,*_itest.py --sort=-cover')
print(out)
print(
- """
-To recall this report w/o re-running the tests:
+ """To recall this report w/o re-running the tests:
- $ coverage report --omit=config-3.8.py,*_test.py,*_itest.py --sort=-cover
+ $ coverage report --omit=config-3.*.py,*_test.py,*_itest.py --sort=-cover
...from the 'tests' directory. Note that subsequent calls to
run_tests.py with --coverage will clobber previous results.
"""
    )


def main() -> Optional[int]:
    saw_flag = False
    halt_event = threading.Event()
    threads: List[TestRunner] = []

    params = TestingParameters(
        halt_on_error=True,
        halt_event=halt_event,
    )

    if config.config['coverage']:
        logger.debug('Clearing existing coverage data via "coverage erase".')
        exec_utils.cmd('coverage erase')
- if config.config['unittests']:
+ if config.config['unittests'] or config.config['all']:
saw_flag = True
threads.append(UnittestTestRunner(params))
- if config.config['doctests']:
+ if config.config['doctests'] or config.config['all']:
saw_flag = True
threads.append(DoctestTestRunner(params))
- if config.config['integration']:
+ if config.config['integration'] or config.config['all']:
saw_flag = True
threads.append(IntegrationTestRunner(params))
    if not saw_flag:
        print('ERROR: one of --unittests, --doctests, --integration or --all is required.')
        return 1

    for thread in threads:
        thread.start()

    results: Dict[str, TestResults] = {}
while len(results) != len(threads):
+ started = 0
+ done = 0
+ failed = 0
+
for thread in threads:
+ (s, tr) = thread.get_status()
+ started += s
+ failed += len(tr.tests_failed) + len(tr.tests_timed_out)
+            done += len(tr.tests_failed) + len(tr.tests_timed_out) + len(tr.tests_succeeded)
if not thread.is_alive():
tid = thread.name
                if tid not in results:
                    result = thread.join()
                    if result:
                        results[tid] = result
                        if len(result.tests_failed) > 0:
                            logger.error(
                                'Thread %s returned abnormal results; killing the others.', tid
                            )
                            halt_event.set()
- time.sleep(1.0)
+        if started > 0:
+            percent_done = 100.0 * done / started
+ else:
+ percent_done = 0.0
+
+ if failed == 0:
+ color = ansi.fg('green')
+ else:
+ color = ansi.fg('red')
+
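+        # Repaint the single-line progress bar in place (note the
+        # end='\r') until every thread has reported its results.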
+ if percent_done < 100.0:
+ print(
+ text_utils.bar_graph_string(
+ done,
+ started,
+ text=text_utils.BarGraphText.FRACTION,
+ width=80,
+ fgcolor=color,
+ ),
+ end='\r',
+ flush=True,
+ )
+ time.sleep(0.5)
+
+ print(f'{ansi.clear_line()}Final Report:')
if config.config['coverage']:
code_coverage_report()
total_problems = test_results_report(results)