X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=tests%2Frun_tests.py;h=ecd47dd4347e39e0ac98a492e8f0c327329bfb49;hb=54b5493516c71cd8f5ac65a698c73c2adc629838;hp=2b2d2389f3f0c5a53fd7d160d6e21d406f6934c6;hpb=a0722abe80c416e0c174f3ff861566834402d43b;p=python_utils.git diff --git a/tests/run_tests.py b/tests/run_tests.py index 2b2d238..ecd47dd 100755 --- a/tests/run_tests.py +++ b/tests/run_tests.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """ -A smart, fast test runner. +A smart, fast test runner. Used in a git pre-commit hook. """ import logging @@ -12,305 +12,410 @@ import threading import time from abc import ABC, abstractmethod from dataclasses import dataclass -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple from overrides import overrides +import ansi import bootstrap import config import exec_utils import file_utils import parallelize as par +import smart_future import text_utils import thread_utils logger = logging.getLogger(__name__) -args = config.add_commandline_args(f'({__file__})', 'Args related to __file__') +args = config.add_commandline_args(f'({__file__})', f'Args related to {__file__}') args.add_argument('--unittests', '-u', action='store_true', help='Run unittests.') args.add_argument('--doctests', '-d', action='store_true', help='Run doctests.') args.add_argument('--integration', '-i', action='store_true', help='Run integration tests.') +args.add_argument( + '--all', + '-a', + action='store_true', + help='Run unittests, doctests and integration tests. Equivalient to -u -d -i', +) args.add_argument( '--coverage', '-c', action='store_true', help='Run tests and capture code coverage data' ) HOME = os.environ['HOME'] +# These tests will be run twice in --coverage mode: once to get code +# coverage and then again with not coverage enabeled. This is because +# they pay attention to code performance which is adversely affected +# by coverage. +PERF_SENSATIVE_TESTS = set(['/home/scott/lib/python_modules/tests/string_utils_test.py']) + @dataclass class TestingParameters: halt_on_error: bool + """Should we stop as soon as one error has occurred?""" + halt_event: threading.Event + """An event that, when set, indicates to stop ASAP.""" + + +@dataclass +class TestToRun: + name: str + """The name of the test""" + + kind: str + """The kind of the test""" + + cmdline: str + """The command line to execute""" @dataclass class TestResults: name: str - num_tests_executed: int - num_tests_succeeded: int - num_tests_failed: int - normal_exit: bool - output: str + """The name of this test / set of tests.""" + + tests_executed: List[str] + """Tests that were executed.""" + + tests_succeeded: List[str] + """Tests that succeeded.""" + + tests_failed: List[str] + """Tests that failed.""" + + tests_timed_out: List[str] + """Tests that timed out.""" + + def __add__(self, other): + self.tests_executed.extend(other.tests_executed) + self.tests_succeeded.extend(other.tests_succeeded) + self.tests_failed.extend(other.tests_failed) + self.tests_timed_out.extend(other.tests_timed_out) + return self + + __radd__ = __add__ + + def __repr__(self) -> str: + out = f'{self.name}: ' + out += f'{ansi.fg("green")}' + out += f'{len(self.tests_succeeded)}/{len(self.tests_executed)} passed' + out += f'{ansi.reset()}.\n' + + if len(self.tests_failed) > 0: + out += f' ..{ansi.fg("red")}' + out += f'{len(self.tests_failed)} tests failed' + out += f'{ansi.reset()}:\n' + for test in self.tests_failed: + out += f' {test}\n' + out += '\n' + + if len(self.tests_timed_out) > 0: + out += f' ..{ansi.fg("yellow")}' + out += f'{len(self.tests_timed_out)} tests timed out' + out += f'{ansi.reset()}:\n' + for test in self.tests_failed: + out += f' {test}\n' + out += '\n' + return out class TestRunner(ABC, thread_utils.ThreadWithReturnValue): + """A Base class for something that runs a test.""" + def __init__(self, params: TestingParameters): + """Create a TestRunner. + + Args: + params: Test running paramters. + + """ super().__init__(self, target=self.begin, args=[params]) self.params = params self.test_results = TestResults( - name=f"All {self.get_name()} tests", - num_tests_executed=0, - num_tests_succeeded=0, - num_tests_failed=0, - normal_exit=True, - output="", + name=self.get_name(), + tests_executed=[], + tests_succeeded=[], + tests_failed=[], + tests_timed_out=[], ) - - def aggregate_test_results(self, result: TestResults): - self.test_results.num_tests_executed += result.num_tests_executed - self.test_results.num_tests_succeeded += result.num_tests_succeeded - self.test_results.num_tests_failed += result.num_tests_failed - self.test_results.normal_exit = self.test_results.normal_exit and result.normal_exit - self.test_results.output += "\n\n\n" + result.output + self.tests_started = 0 + self.lock = threading.Lock() @abstractmethod def get_name(self) -> str: + """The name of this test collection.""" pass + def get_status(self) -> Tuple[int, TestResults]: + """Ask the TestRunner for its status.""" + with self.lock: + return (self.tests_started, self.test_results) + @abstractmethod def begin(self, params: TestingParameters) -> TestResults: + """Start execution.""" pass class TemplatedTestRunner(TestRunner, ABC): + """A TestRunner that has a recipe for executing the tests.""" + @abstractmethod - def identify_tests(self) -> List[Any]: + def identify_tests(self) -> List[TestToRun]: + """Return a list of tuples (test, cmdline) that should be executed.""" pass @abstractmethod - def run_test(self, test: Any) -> TestResults: + def run_test(self, test: TestToRun) -> TestResults: + """Run a single test and return its TestResults.""" pass def check_for_abort(self): + """Periodically caled to check to see if we need to stop.""" + if self.params.halt_event.is_set(): logger.debug('Thread %s saw halt event; exiting.', self.get_name()) raise Exception("Kill myself!") - if not self.test_results.normal_exit: - if self.params.halt_on_error: + if self.params.halt_on_error: + if len(self.test_results.tests_failed) > 0: logger.error('Thread %s saw abnormal results; exiting.', self.get_name()) raise Exception("Kill myself!") - def status_report(self, running: List[Any], done: List[Any]): - total = len(running) + len(done) - logging.info( - '%s: %d/%d in flight; %d/%d completed.', - self.get_name(), - len(running), - total, - len(done), - total, - ) + def persist_output(self, test: TestToRun, message: str, output: str) -> None: + """Called to save the output of a test run.""" + + dest = f'{test.name}-output.txt' + with open(f'./test_output/{dest}', 'w') as wf: + print(message, file=wf) + print('-' * len(message), file=wf) + wf.write(output) + + def execute_commandline( + self, + test: TestToRun, + *, + timeout: float = 120.0, + ) -> TestResults: + """Execute a particular commandline to run a test.""" + + try: + output = exec_utils.cmd( + test.cmdline, + timeout_seconds=timeout, + ) + self.persist_output(test, f'{test.name} ({test.cmdline}) succeeded.', output) + logger.debug('%s: %s (%s) succeeded', self.get_name(), test.name, test.cmdline) + return TestResults(test.name, [test.name], [test.name], [], []) + except subprocess.TimeoutExpired as e: + msg = f'{self.get_name()}: {test.name} ({test.cmdline}) timed out after {e.timeout:.1f} seconds.' + logger.error(msg) + logger.debug( + '%s: %s output when it timed out: %s', self.get_name(), test.name, e.output + ) + self.persist_output(test, msg, e.output.decode('utf-8')) + return TestResults( + test.name, + [test.name], + [], + [], + [test.name], + ) + except subprocess.CalledProcessError as e: + msg = ( + f'{self.get_name()}: {test.name} ({test.cmdline}) failed; exit code {e.returncode}' + ) + logger.error(msg) + logger.debug('%s: %s output when it failed: %s', self.get_name(), test.name, e.output) + self.persist_output(test, msg, e.output.decode('utf-8')) + return TestResults( + test.name, + [test.name], + [], + [test.name], + [], + ) @overrides def begin(self, params: TestingParameters) -> TestResults: logger.debug('Thread %s started.', self.get_name()) interesting_tests = self.identify_tests() + logger.debug('%s: Identified %d tests to be run.', self.get_name(), len(interesting_tests)) + + # Note: because of @parallelize on run_tests it actually + # returns a SmartFuture with a TestResult inside of it. + # That's the reason for this Any business. running: List[Any] = [] - done: List[Any] = [] - for test in interesting_tests: - running.append(self.run_test(test)) + for test_to_run in interesting_tests: + running.append(self.run_test(test_to_run)) + logger.debug( + '%s: Test %s started in the background.', self.get_name(), test_to_run.name + ) + self.tests_started += 1 - while len(running) > 0: - self.status_report(running, done) + for future in smart_future.wait_any(running): self.check_for_abort() - newly_finished = [] - for fut in running: - if fut.is_ready(): - newly_finished.append(fut) - result = fut._resolve() - logger.debug('Test %s finished.', result.name) - self.aggregate_test_results(result) - - for fut in newly_finished: - running.remove(fut) - done.append(fut) - time.sleep(0.25) + result = future._resolve() + logger.debug('Test %s finished.', result.name) + self.test_results += result logger.debug('Thread %s finished.', self.get_name()) return self.test_results class UnittestTestRunner(TemplatedTestRunner): + """Run all known Unittests.""" + @overrides def get_name(self) -> str: - return "UnittestTestRunner" + return "Unittests" @overrides - def identify_tests(self) -> List[Any]: - return list(file_utils.expand_globs('*_test.py')) + def identify_tests(self) -> List[TestToRun]: + ret = [] + for test in file_utils.expand_globs('*_test.py'): + basename = file_utils.without_path(test) + if config.config['coverage']: + ret.append( + TestToRun( + name=basename, + kind='unittest capturing coverage', + cmdline=f'coverage run --source {HOME}/lib {test} --unittests_ignore_perf 2>&1', + ) + ) + if test in PERF_SENSATIVE_TESTS: + ret.append( + TestToRun( + name=basename, + kind='unittest w/o coverage to record perf', + cmdline=f'{test} 2>&1', + ) + ) + else: + ret.append( + TestToRun( + name=basename, + kind='unittest', + cmdline=f'{test} 2>&1', + ) + ) + return ret @par.parallelize - def run_test(self, test: Any) -> TestResults: - if config.config['coverage']: - cmdline = f'coverage run --source {HOME}/lib --append {test} --unittests_ignore_perf' - else: - cmdline = test - - try: - logger.debug('Running unittest %s (%s)', test, cmdline) - output = exec_utils.cmd( - cmdline, - timeout_seconds=120.0, - ) - except TimeoutError: - logger.error('Unittest %s timed out; ran for > 120.0 seconds', test) - return TestResults( - test, - 1, - 0, - 1, - False, - f"Unittest {test} timed out.", - ) - except subprocess.CalledProcessError: - logger.error('Unittest %s failed.', test) - return TestResults( - test, - 1, - 0, - 1, - False, - f"Unittest {test} failed.", - ) - return TestResults(test, 1, 1, 0, True, output) + def run_test(self, test: TestToRun) -> TestResults: + return self.execute_commandline(test) class DoctestTestRunner(TemplatedTestRunner): + """Run all known Doctests.""" + @overrides def get_name(self) -> str: - return "DoctestTestRunner" + return "Doctests" @overrides - def identify_tests(self) -> List[Any]: + def identify_tests(self) -> List[TestToRun]: ret = [] out = exec_utils.cmd('grep -lR "^ *import doctest" /home/scott/lib/python_modules/*') - for line in out.split('\n'): - if re.match(r'.*\.py$', line): - if 'run_tests.py' not in line: - ret.append(line) + for test in out.split('\n'): + if re.match(r'.*\.py$', test): + if 'run_tests.py' not in test: + basename = file_utils.without_path(test) + if config.config['coverage']: + ret.append( + TestToRun( + name=basename, + kind='doctest capturing coverage', + cmdline=f'coverage run --source {HOME}/lib {test} 2>&1', + ) + ) + if test in PERF_SENSATIVE_TESTS: + ret.append( + TestToRun( + name=basename, + kind='doctest w/o coverage to record perf', + cmdline=f'python3 {test} 2>&1', + ) + ) + else: + ret.append( + TestToRun(name=basename, kind='doctest', cmdline=f'python3 {test} 2>&1') + ) return ret @par.parallelize - def run_test(self, test: Any) -> TestResults: - if config.config['coverage']: - cmdline = f'coverage run --source {HOME}/lib --append {test} 2>&1' - else: - cmdline = f'python3 {test}' - try: - logger.debug('Running doctest %s (%s).', test, cmdline) - output = exec_utils.cmd( - cmdline, - timeout_seconds=120.0, - ) - except TimeoutError: - logger.error('Doctest %s timed out; ran for > 120.0 seconds', test) - return TestResults( - test, - 1, - 0, - 1, - False, - f"Doctest {test} timed out.", - ) - except subprocess.CalledProcessError: - logger.error('Doctest %s failed.', test) - return TestResults( - test, - 1, - 0, - 1, - False, - f"Docttest {test} failed.", - ) - return TestResults( - test, - 1, - 1, - 0, - True, - "", - ) + def run_test(self, test: TestToRun) -> TestResults: + return self.execute_commandline(test) class IntegrationTestRunner(TemplatedTestRunner): + """Run all know Integration tests.""" + @overrides def get_name(self) -> str: - return "IntegrationTestRunner" + return "Integration Tests" @overrides - def identify_tests(self) -> List[Any]: - return list(file_utils.expand_globs('*_itest.py')) + def identify_tests(self) -> List[TestToRun]: + ret = [] + for test in file_utils.expand_globs('*_itest.py'): + basename = file_utils.without_path(test) + if config.config['coverage']: + ret.append( + TestToRun( + name=basename, + kind='integration test capturing coverage', + cmdline=f'coverage run --source {HOME}/lib {test} 2>&1', + ) + ) + if test in PERF_SENSATIVE_TESTS: + ret.append( + TestToRun( + name=basename, + kind='integration test w/o coverage to capture perf', + cmdline=f'{test} 2>&1', + ) + ) + else: + ret.append( + TestToRun(name=basename, kind='integration test', cmdline=f'{test} 2>&1') + ) + return ret @par.parallelize - def run_test(self, test: Any) -> TestResults: - if config.config['coverage']: - cmdline = f'coverage run --source {HOME}/lib --append {test}' - else: - cmdline = test - try: - logger.debug('Running integration test %s (%s).', test, cmdline) - output = exec_utils.cmd( - cmdline, - timeout_seconds=240.0, - ) - except TimeoutError: - logger.error('Integration Test %s timed out; ran for > 240.0 seconds', test) - return TestResults( - test, - 1, - 0, - 1, - False, - f"Integration Test {test} timed out.", - ) - except subprocess.CalledProcessError: - logger.error('Integration Test %s failed.', test) - return TestResults( - test, - 1, - 0, - 1, - False, - f"Integration Test {test} failed.", - ) - return TestResults( - test, - 1, - 1, - 0, - True, - "", - ) + def run_test(self, test: TestToRun) -> TestResults: + return self.execute_commandline(test) + +def test_results_report(results: Dict[str, TestResults]) -> int: + """Give a final report about the tests that were run.""" + total_problems = 0 + for result in results.values(): + print(result, end='') + total_problems += len(result.tests_failed) + total_problems += len(result.tests_timed_out) -def test_results_report(results: Dict[str, TestResults]): - print(results) + if total_problems > 0: + print('Reminder: look in ./test_output to view test output logs') + return total_problems def code_coverage_report(): + """Give a final code coverage report.""" text_utils.header('Code Coverage') - out = exec_utils.cmd('coverage report --omit=config-3.8.py,*_test.py,*_itest.py --sort=-cover') + exec_utils.cmd('coverage combine .coverage*') + out = exec_utils.cmd('coverage report --omit=config-3.*.py,*_test.py,*_itest.py --sort=-cover') print(out) print( - """ -To recall this report w/o re-running the tests: + """To recall this report w/o re-running the tests: - $ coverage report --omit=config-3.8.py,*_test.py,*_itest.py --sort=-cover + $ coverage report --omit=config-3.*.py,*_test.py,*_itest.py --sort=-cover ...from the 'tests' directory. Note that subsequent calls to run_tests.py with --coverage will klobber previous results. See: https://coverage.readthedocs.io/en/6.2/ - """ ) @@ -331,13 +436,13 @@ def main() -> Optional[int]: logger.debug('Clearing existing coverage data via "coverage erase".') exec_utils.cmd('coverage erase') - if config.config['unittests']: + if config.config['unittests'] or config.config['all']: saw_flag = True threads.append(UnittestTestRunner(params)) - if config.config['doctests']: + if config.config['doctests'] or config.config['all']: saw_flag = True threads.append(DoctestTestRunner(params)) - if config.config['integration']: + if config.config['integration'] or config.config['all']: saw_flag = True threads.append(IntegrationTestRunner(params)) @@ -351,21 +456,56 @@ def main() -> Optional[int]: results: Dict[str, TestResults] = {} while len(results) != len(threads): + started = 0 + done = 0 + failed = 0 + for thread in threads: + (s, tr) = thread.get_status() + started += s + failed += len(tr.tests_failed) + len(tr.tests_timed_out) + done += failed + len(tr.tests_succeeded) if not thread.is_alive(): tid = thread.name if tid not in results: result = thread.join() if result: results[tid] = result - if not result.normal_exit: + if len(result.tests_failed) > 0: + logger.error( + 'Thread %s returned abnormal results; killing the others.', tid + ) halt_event.set() - time.sleep(1.0) - test_results_report(results) + if started > 0: + percent_done = done / started + else: + percent_done = 0.0 + + if failed == 0: + color = ansi.fg('green') + else: + color = ansi.fg('red') + + if percent_done < 100.0: + print( + text_utils.bar_graph_string( + done, + started, + text=text_utils.BarGraphText.FRACTION, + width=80, + fgcolor=color, + ), + end='\r', + flush=True, + ) + time.sleep(0.5) + + print(f'{ansi.clear_line()}Final Report:') if config.config['coverage']: code_coverage_report() - return 0 + total_problems = test_results_report(results) + return total_problems if __name__ == '__main__':