#!/usr/bin/env python3

"""A smart, fast test runner."""

import logging
import os
import re
import subprocess
import threading
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from overrides import overrides

import ansi
import bootstrap
import config
import exec_utils
import file_utils
import parallelize as par
import text_utils
import thread_utils

logger = logging.getLogger(__name__)
args = config.add_commandline_args(f'({__file__})', f'Args related to {__file__}')
args.add_argument('--unittests', '-u', action='store_true', help='Run unittests.')
args.add_argument('--doctests', '-d', action='store_true', help='Run doctests.')
args.add_argument('--integration', '-i', action='store_true', help='Run integration tests.')
args.add_argument(
    '--coverage', '-c', action='store_true', help='Run tests and capture code coverage data.'
)

HOME = os.environ['HOME']


@dataclass
class TestingParameters:
    """Parameters shared by every test runner thread."""

    halt_on_error: bool
    halt_event: threading.Event


@dataclass
class TestResults:
    """The essential results of a set of test runs."""

    name: str
    tests_executed: List[str]
    tests_succeeded: List[str]
    tests_failed: List[str]
    tests_timed_out: List[str]


class TestRunner(ABC, thread_utils.ThreadWithReturnValue):
    """A base class for test runner threads."""

    def __init__(self, params: TestingParameters):
        super().__init__(target=self.begin, args=[params])
        self.params = params
        self.test_results = TestResults(
            name=self.get_name(),
            tests_executed=[],
            tests_succeeded=[],
            tests_failed=[],
            tests_timed_out=[],
        )

    def aggregate_test_results(self, result: TestResults):
        """Fold a single test's results into the overall tally."""
        self.test_results.tests_executed.extend(result.tests_executed)
        self.test_results.tests_succeeded.extend(result.tests_succeeded)
        self.test_results.tests_failed.extend(result.tests_failed)
        self.test_results.tests_timed_out.extend(result.tests_timed_out)

    @abstractmethod
    def get_name(self) -> str:
        """Return the name of this test runner."""
        pass

    @abstractmethod
    def begin(self, params: TestingParameters) -> TestResults:
        """Run the tests and return results."""
        pass


class TemplatedTestRunner(TestRunner, ABC):
    """A TestRunner that provides a common framework for running tests."""

    @abstractmethod
    def identify_tests(self) -> List[Any]:
        """Return the list of tests that should be run."""
        pass

    @abstractmethod
    def run_test(self, test: Any) -> TestResults:
        """Run a single test and return its results."""
        pass

    def check_for_abort(self):
        """Raise to kill this thread if a peer failed or signaled a halt."""
        if self.params.halt_event.is_set():
            logger.debug('Thread %s saw halt event; exiting.', self.get_name())
            raise Exception("Kill myself!")
        if self.params.halt_on_error:
            if len(self.test_results.tests_failed) > 0:
                logger.error('Thread %s saw abnormal results; exiting.', self.get_name())
                raise Exception("Kill myself!")

    def status_report(self, running: List[Any], done: List[Any]):
        total = len(running) + len(done)
        logger.info(
            '%s: %d/%d in flight; %d/%d completed.',
            self.get_name(),
            len(running),
            total,
            len(done),
            total,
        )

    def persist_output(self, test_name: str, message: str, output: str) -> None:
        """Save the output of a test run under ./test_output for later review."""
        basename = file_utils.without_path(test_name)
        dest = f'{basename}-output.txt'
        # Ensure the output directory exists.
        os.makedirs('./test_output', exist_ok=True)
        with open(f'./test_output/{dest}', 'w') as wf:
            print(message, file=wf)
            print('-' * len(message), file=wf)
            wf.write(output)
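
    # The three subprocess outcomes in execute_commandline (below) map onto
    # the positional TestResults fields: a clean exit fills tests_succeeded,
    # subprocess.CalledProcessError fills tests_failed, and
    # subprocess.TimeoutExpired fills tests_timed_out; every run appears in
    # tests_executed regardless of outcome.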
    def execute_commandline(
        self,
        test_name: str,
        cmdline: str,
        *,
        timeout: float = 120.0,
    ) -> TestResults:
        """Run one test command line and record how it fared."""
        try:
            logger.debug('%s: Running %s (%s)', self.get_name(), test_name, cmdline)
            output = exec_utils.cmd(
                cmdline,
                timeout_seconds=timeout,
            )
            self.persist_output(test_name, f'{test_name} ({cmdline}) succeeded.', output)
            logger.debug('%s (%s) succeeded', test_name, cmdline)
            return TestResults(test_name, [test_name], [test_name], [], [])
        except subprocess.TimeoutExpired as e:
            msg = f'{self.get_name()}: {test_name} ({cmdline}) timed out after {e.timeout:.1f} seconds.'
            logger.error(msg)
            logger.debug(
                '%s: %s output when it timed out: %s', self.get_name(), test_name, e.output
            )
            self.persist_output(test_name, msg, e.output)
            return TestResults(
                test_name,
                [test_name],
                [],
                [],
                [test_name],
            )
        except subprocess.CalledProcessError as e:
            msg = f'{self.get_name()}: {test_name} ({cmdline}) failed; exit code {e.returncode}'
            logger.error(msg)
            logger.debug('%s: %s output when it failed: %s', self.get_name(), test_name, e.output)
            self.persist_output(test_name, msg, e.output)
            return TestResults(
                test_name,
                [test_name],
                [],
                [test_name],
                [],
            )

    @overrides
    def begin(self, params: TestingParameters) -> TestResults:
        logger.debug('Thread %s started.', self.get_name())
        interesting_tests = self.identify_tests()

        # Kick off every test in parallel; run_test is decorated with
        # @par.parallelize in subclasses, so each call returns a future.
        running: List[Any] = []
        done: List[Any] = []
        for test in interesting_tests:
            running.append(self.run_test(test))

        # Poll the futures until all of them have resolved.
        while len(running) > 0:
            self.status_report(running, done)
            self.check_for_abort()
            newly_finished = []
            for fut in running:
                if fut.is_ready():
                    newly_finished.append(fut)
                    result = fut._resolve()
                    logger.debug('Test %s finished.', result.name)
                    self.aggregate_test_results(result)
            for fut in newly_finished:
                running.remove(fut)
                done.append(fut)
            time.sleep(1.0)

        logger.debug('Thread %s finished.', self.get_name())
        return self.test_results


class UnittestTestRunner(TemplatedTestRunner):
    """Run all known unittests."""

    @overrides
    def get_name(self) -> str:
        return "UnittestTestRunner"

    @overrides
    def identify_tests(self) -> List[Any]:
        return list(file_utils.expand_globs('*_test.py'))

    @par.parallelize
    def run_test(self, test: Any) -> TestResults:
        if config.config['coverage']:
            cmdline = f'coverage run --source {HOME}/lib {test} --unittests_ignore_perf'
        else:
            cmdline = test
        return self.execute_commandline(test, cmdline)


class DoctestTestRunner(TemplatedTestRunner):
    """Run all known doctests."""

    @overrides
    def get_name(self) -> str:
        return "DoctestTestRunner"

    @overrides
    def identify_tests(self) -> List[Any]:
        ret = []
        out = exec_utils.cmd(f'grep -lR "^ *import doctest" {HOME}/lib/python_modules/*')
        for line in out.split('\n'):
            if re.match(r'.*\.py$', line):
                if 'run_tests.py' not in line:
                    ret.append(line)
        return ret

    @par.parallelize
    def run_test(self, test: Any) -> TestResults:
        if config.config['coverage']:
            cmdline = f'coverage run --source {HOME}/lib {test} 2>&1'
        else:
            cmdline = f'python3 {test}'
        return self.execute_commandline(test, cmdline)


class IntegrationTestRunner(TemplatedTestRunner):
    """Run all known integration tests."""

    @overrides
    def get_name(self) -> str:
        return "IntegrationTestRunner"

    @overrides
    def identify_tests(self) -> List[Any]:
        return list(file_utils.expand_globs('*_itest.py'))

    @par.parallelize
    def run_test(self, test: Any) -> TestResults:
        if config.config['coverage']:
            cmdline = f'coverage run --source {HOME}/lib {test}'
        else:
            cmdline = test
        return self.execute_commandline(test, cmdline)
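

# The helpers below summarize aggregated TestResults on stdout.  Sample
# test_results_report output (runner name and counts are hypothetical):
#
#   UnittestTestRunner: 5/6 passed.
#     ..1 tests failed:
#       something_test.py
#   Reminder: look in ./test_output to view test output logs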
def test_results_report(results: Dict[str, TestResults]) -> int:
    """Print a colorized summary of results; return the problem count."""
    total_problems = 0
    for result in results.values():
        print(f'{result.name}: ', end='')
        print(
            f'{ansi.fg("green")}{len(result.tests_succeeded)}/{len(result.tests_executed)} passed{ansi.reset()}.'
        )
        if len(result.tests_failed) > 0:
            print(f'  ..{ansi.fg("red")}{len(result.tests_failed)} tests failed{ansi.reset()}:')
            for test in result.tests_failed:
                print(f'    {test}')
            total_problems += len(result.tests_failed)

        if len(result.tests_timed_out) > 0:
            print(
                f'  ..{ansi.fg("yellow")}{len(result.tests_timed_out)} tests timed out{ansi.reset()}:'
            )
            for test in result.tests_timed_out:
                print(f'    {test}')
            total_problems += len(result.tests_timed_out)

    if total_problems > 0:
        print('Reminder: look in ./test_output to view test output logs')
    return total_problems


def code_coverage_report():
    """Invoke coverage to produce and print a code coverage report."""
    text_utils.header('Code Coverage')
    exec_utils.cmd('coverage combine .coverage*')
    out = exec_utils.cmd('coverage report --omit=config-3.8.py,*_test.py,*_itest.py --sort=-cover')
    print(out)
    print(
        """
To recall this report w/o re-running the tests:

    $ coverage report --omit=config-3.8.py,*_test.py,*_itest.py --sort=-cover

...from the 'tests' directory.  Note that subsequent calls to
run_tests.py with --coverage will clobber previous results.  See:

    https://coverage.readthedocs.io/en/6.2/
"""
    )


@bootstrap.initialize
def main() -> Optional[int]:
    saw_flag = False
    halt_event = threading.Event()
    threads: List[TestRunner] = []

    halt_event.clear()
    params = TestingParameters(
        halt_on_error=True,
        halt_event=halt_event,
    )

    if config.config['coverage']:
        logger.debug('Clearing existing coverage data via "coverage erase".')
        exec_utils.cmd('coverage erase')
    if config.config['unittests']:
        saw_flag = True
        threads.append(UnittestTestRunner(params))
    if config.config['doctests']:
        saw_flag = True
        threads.append(DoctestTestRunner(params))
    if config.config['integration']:
        saw_flag = True
        threads.append(IntegrationTestRunner(params))

    if not saw_flag:
        config.print_usage()
        print('ERROR: one of --unittests, --doctests or --integration is required.')
        return 1

    for thread in threads:
        thread.start()

    # Wait for every runner thread to finish; if one reports a failure,
    # set the halt event so its peers can stop early too.
    results: Dict[str, TestResults] = {}
    while len(results) != len(threads):
        for thread in threads:
            if not thread.is_alive():
                tid = thread.name
                if tid not in results:
                    result = thread.join()
                    if result:
                        results[tid] = result
                        if len(result.tests_failed) > 0:
                            logger.error(
                                'Thread %s returned abnormal results; killing the others.', tid
                            )
                            halt_event.set()
        time.sleep(1.0)

    if config.config['coverage']:
        code_coverage_report()
    total_problems = test_results_report(results)
    return total_problems


if __name__ == '__main__':
    main()
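
# Typical invocations, assuming this file is saved as run_tests.py (the
# name referenced above); flags are defined at the top of the file:
#
#   ./run_tests.py --unittests
#   ./run_tests.py --unittests --doctests --integration --coverage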