X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=tests%2Frun_tests.py;h=ecd47dd4347e39e0ac98a492e8f0c327329bfb49;hb=e46158e49121b8a955bb07b73f5bcf9928b79c90;hp=ce9d63e93c349b4a9f6c08f213cd6d1294fc0955;hpb=9fd5e4976ff85670ab308e24aad9f033f92de812;p=python_utils.git diff --git a/tests/run_tests.py b/tests/run_tests.py index ce9d63e..ecd47dd 100755 --- a/tests/run_tests.py +++ b/tests/run_tests.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """ -A smart, fast test runner. +A smart, fast test runner. Used in a git pre-commit hook. """ import logging @@ -12,7 +12,7 @@ import threading import time from abc import ABC, abstractmethod from dataclasses import dataclass -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple from overrides import overrides @@ -22,38 +22,115 @@ import config import exec_utils import file_utils import parallelize as par +import smart_future import text_utils import thread_utils logger = logging.getLogger(__name__) -args = config.add_commandline_args(f'({__file__})', 'Args related to __file__') +args = config.add_commandline_args(f'({__file__})', f'Args related to {__file__}') args.add_argument('--unittests', '-u', action='store_true', help='Run unittests.') args.add_argument('--doctests', '-d', action='store_true', help='Run doctests.') args.add_argument('--integration', '-i', action='store_true', help='Run integration tests.') +args.add_argument( + '--all', + '-a', + action='store_true', + help='Run unittests, doctests and integration tests. Equivalient to -u -d -i', +) args.add_argument( '--coverage', '-c', action='store_true', help='Run tests and capture code coverage data' ) HOME = os.environ['HOME'] +# These tests will be run twice in --coverage mode: once to get code +# coverage and then again with not coverage enabeled. This is because +# they pay attention to code performance which is adversely affected +# by coverage. +PERF_SENSATIVE_TESTS = set(['/home/scott/lib/python_modules/tests/string_utils_test.py']) + @dataclass class TestingParameters: halt_on_error: bool + """Should we stop as soon as one error has occurred?""" + halt_event: threading.Event + """An event that, when set, indicates to stop ASAP.""" + + +@dataclass +class TestToRun: + name: str + """The name of the test""" + + kind: str + """The kind of the test""" + + cmdline: str + """The command line to execute""" @dataclass class TestResults: name: str + """The name of this test / set of tests.""" + tests_executed: List[str] + """Tests that were executed.""" + tests_succeeded: List[str] + """Tests that succeeded.""" + tests_failed: List[str] + """Tests that failed.""" + tests_timed_out: List[str] + """Tests that timed out.""" + + def __add__(self, other): + self.tests_executed.extend(other.tests_executed) + self.tests_succeeded.extend(other.tests_succeeded) + self.tests_failed.extend(other.tests_failed) + self.tests_timed_out.extend(other.tests_timed_out) + return self + + __radd__ = __add__ + + def __repr__(self) -> str: + out = f'{self.name}: ' + out += f'{ansi.fg("green")}' + out += f'{len(self.tests_succeeded)}/{len(self.tests_executed)} passed' + out += f'{ansi.reset()}.\n' + + if len(self.tests_failed) > 0: + out += f' ..{ansi.fg("red")}' + out += f'{len(self.tests_failed)} tests failed' + out += f'{ansi.reset()}:\n' + for test in self.tests_failed: + out += f' {test}\n' + out += '\n' + + if len(self.tests_timed_out) > 0: + out += f' ..{ansi.fg("yellow")}' + out += f'{len(self.tests_timed_out)} tests timed out' + out += f'{ansi.reset()}:\n' + for test in self.tests_failed: + out += f' {test}\n' + out += '\n' + return out class TestRunner(ABC, thread_utils.ThreadWithReturnValue): + """A Base class for something that runs a test.""" + def __init__(self, params: TestingParameters): + """Create a TestRunner. + + Args: + params: Test running paramters. + + """ super().__init__(self, target=self.begin, args=[params]) self.params = params self.test_results = TestResults( @@ -63,32 +140,41 @@ class TestRunner(ABC, thread_utils.ThreadWithReturnValue): tests_failed=[], tests_timed_out=[], ) - - def aggregate_test_results(self, result: TestResults): - self.test_results.tests_executed.extend(result.tests_executed) - self.test_results.tests_succeeded.extend(result.tests_succeeded) - self.test_results.tests_failed.extend(result.tests_failed) - self.test_results.tests_timed_out.extend(result.tests_timed_out) + self.tests_started = 0 + self.lock = threading.Lock() @abstractmethod def get_name(self) -> str: + """The name of this test collection.""" pass + def get_status(self) -> Tuple[int, TestResults]: + """Ask the TestRunner for its status.""" + with self.lock: + return (self.tests_started, self.test_results) + @abstractmethod def begin(self, params: TestingParameters) -> TestResults: + """Start execution.""" pass class TemplatedTestRunner(TestRunner, ABC): + """A TestRunner that has a recipe for executing the tests.""" + @abstractmethod - def identify_tests(self) -> List[Any]: + def identify_tests(self) -> List[TestToRun]: + """Return a list of tuples (test, cmdline) that should be executed.""" pass @abstractmethod - def run_test(self, test: Any) -> TestResults: + def run_test(self, test: TestToRun) -> TestResults: + """Run a single test and return its TestResults.""" pass def check_for_abort(self): + """Periodically caled to check to see if we need to stop.""" + if self.params.halt_event.is_set(): logger.debug('Thread %s saw halt event; exiting.', self.get_name()) raise Exception("Kill myself!") @@ -97,20 +183,10 @@ class TemplatedTestRunner(TestRunner, ABC): logger.error('Thread %s saw abnormal results; exiting.', self.get_name()) raise Exception("Kill myself!") - def status_report(self, running: List[Any], done: List[Any]): - total = len(running) + len(done) - logging.info( - '%s: %d/%d in flight; %d/%d completed.', - self.get_name(), - len(running), - total, - len(done), - total, - ) + def persist_output(self, test: TestToRun, message: str, output: str) -> None: + """Called to save the output of a test run.""" - def persist_output(self, test_name: str, message: str, output: str) -> None: - basename = file_utils.without_path(test_name) - dest = f'{basename}-output.txt' + dest = f'{test.name}-output.txt' with open(f'./test_output/{dest}', 'w') as wf: print(message, file=wf) print('-' * len(message), file=wf) @@ -118,45 +194,46 @@ class TemplatedTestRunner(TestRunner, ABC): def execute_commandline( self, - test_name: str, - cmdline: str, + test: TestToRun, *, timeout: float = 120.0, ) -> TestResults: + """Execute a particular commandline to run a test.""" try: - logger.debug('%s: Running %s (%s)', self.get_name(), test_name, cmdline) output = exec_utils.cmd( - cmdline, + test.cmdline, timeout_seconds=timeout, ) - self.persist_output(test_name, f'{test_name} ({cmdline}) succeeded.', output) - logger.debug('%s (%s) succeeded', test_name, cmdline) - return TestResults(test_name, [test_name], [test_name], [], []) + self.persist_output(test, f'{test.name} ({test.cmdline}) succeeded.', output) + logger.debug('%s: %s (%s) succeeded', self.get_name(), test.name, test.cmdline) + return TestResults(test.name, [test.name], [test.name], [], []) except subprocess.TimeoutExpired as e: - msg = f'{self.get_name()}: {test_name} ({cmdline}) timed out after {e.timeout:.1f} seconds.' + msg = f'{self.get_name()}: {test.name} ({test.cmdline}) timed out after {e.timeout:.1f} seconds.' logger.error(msg) logger.debug( - '%s: %s output when it timed out: %s', self.get_name(), test_name, e.output + '%s: %s output when it timed out: %s', self.get_name(), test.name, e.output ) - self.persist_output(test_name, msg, e.output) + self.persist_output(test, msg, e.output.decode('utf-8')) return TestResults( - test_name, - [test_name], + test.name, + [test.name], [], [], - [test_name], + [test.name], ) except subprocess.CalledProcessError as e: - msg = f'{self.get_name()}: {test_name} ({cmdline}) failed; exit code {e.returncode}' + msg = ( + f'{self.get_name()}: {test.name} ({test.cmdline}) failed; exit code {e.returncode}' + ) logger.error(msg) - logger.debug('%s: %s output when it failed: %s', self.get_name(), test_name, e.output) - self.persist_output(test_name, msg, e.output) + logger.debug('%s: %s output when it failed: %s', self.get_name(), test.name, e.output) + self.persist_output(test, msg, e.output.decode('utf-8')) return TestResults( - test_name, - [test_name], + test.name, + [test.name], [], - [test_name], + [test.name], [], ) @@ -164,111 +241,160 @@ class TemplatedTestRunner(TestRunner, ABC): def begin(self, params: TestingParameters) -> TestResults: logger.debug('Thread %s started.', self.get_name()) interesting_tests = self.identify_tests() + logger.debug('%s: Identified %d tests to be run.', self.get_name(), len(interesting_tests)) + + # Note: because of @parallelize on run_tests it actually + # returns a SmartFuture with a TestResult inside of it. + # That's the reason for this Any business. running: List[Any] = [] - done: List[Any] = [] - for test in interesting_tests: - running.append(self.run_test(test)) + for test_to_run in interesting_tests: + running.append(self.run_test(test_to_run)) + logger.debug( + '%s: Test %s started in the background.', self.get_name(), test_to_run.name + ) + self.tests_started += 1 - while len(running) > 0: - self.status_report(running, done) + for future in smart_future.wait_any(running): self.check_for_abort() - newly_finished = [] - for fut in running: - if fut.is_ready(): - newly_finished.append(fut) - result = fut._resolve() - logger.debug('Test %s finished.', result.name) - self.aggregate_test_results(result) - - for fut in newly_finished: - running.remove(fut) - done.append(fut) - time.sleep(1.0) + result = future._resolve() + logger.debug('Test %s finished.', result.name) + self.test_results += result logger.debug('Thread %s finished.', self.get_name()) return self.test_results class UnittestTestRunner(TemplatedTestRunner): + """Run all known Unittests.""" + @overrides def get_name(self) -> str: - return "UnittestTestRunner" + return "Unittests" @overrides - def identify_tests(self) -> List[Any]: - return list(file_utils.expand_globs('*_test.py')) + def identify_tests(self) -> List[TestToRun]: + ret = [] + for test in file_utils.expand_globs('*_test.py'): + basename = file_utils.without_path(test) + if config.config['coverage']: + ret.append( + TestToRun( + name=basename, + kind='unittest capturing coverage', + cmdline=f'coverage run --source {HOME}/lib {test} --unittests_ignore_perf 2>&1', + ) + ) + if test in PERF_SENSATIVE_TESTS: + ret.append( + TestToRun( + name=basename, + kind='unittest w/o coverage to record perf', + cmdline=f'{test} 2>&1', + ) + ) + else: + ret.append( + TestToRun( + name=basename, + kind='unittest', + cmdline=f'{test} 2>&1', + ) + ) + return ret @par.parallelize - def run_test(self, test: Any) -> TestResults: - if config.config['coverage']: - cmdline = f'coverage run --source {HOME}/lib {test} --unittests_ignore_perf' - else: - cmdline = test - return self.execute_commandline(test, cmdline) + def run_test(self, test: TestToRun) -> TestResults: + return self.execute_commandline(test) class DoctestTestRunner(TemplatedTestRunner): + """Run all known Doctests.""" + @overrides def get_name(self) -> str: - return "DoctestTestRunner" + return "Doctests" @overrides - def identify_tests(self) -> List[Any]: + def identify_tests(self) -> List[TestToRun]: ret = [] out = exec_utils.cmd('grep -lR "^ *import doctest" /home/scott/lib/python_modules/*') - for line in out.split('\n'): - if re.match(r'.*\.py$', line): - if 'run_tests.py' not in line: - ret.append(line) + for test in out.split('\n'): + if re.match(r'.*\.py$', test): + if 'run_tests.py' not in test: + basename = file_utils.without_path(test) + if config.config['coverage']: + ret.append( + TestToRun( + name=basename, + kind='doctest capturing coverage', + cmdline=f'coverage run --source {HOME}/lib {test} 2>&1', + ) + ) + if test in PERF_SENSATIVE_TESTS: + ret.append( + TestToRun( + name=basename, + kind='doctest w/o coverage to record perf', + cmdline=f'python3 {test} 2>&1', + ) + ) + else: + ret.append( + TestToRun(name=basename, kind='doctest', cmdline=f'python3 {test} 2>&1') + ) return ret @par.parallelize - def run_test(self, test: Any) -> TestResults: - if config.config['coverage']: - cmdline = f'coverage run --source {HOME}/lib {test} 2>&1' - else: - cmdline = f'python3 {test}' - return self.execute_commandline(test, cmdline) + def run_test(self, test: TestToRun) -> TestResults: + return self.execute_commandline(test) class IntegrationTestRunner(TemplatedTestRunner): + """Run all know Integration tests.""" + @overrides def get_name(self) -> str: - return "IntegrationTestRunner" + return "Integration Tests" @overrides - def identify_tests(self) -> List[Any]: - return list(file_utils.expand_globs('*_itest.py')) + def identify_tests(self) -> List[TestToRun]: + ret = [] + for test in file_utils.expand_globs('*_itest.py'): + basename = file_utils.without_path(test) + if config.config['coverage']: + ret.append( + TestToRun( + name=basename, + kind='integration test capturing coverage', + cmdline=f'coverage run --source {HOME}/lib {test} 2>&1', + ) + ) + if test in PERF_SENSATIVE_TESTS: + ret.append( + TestToRun( + name=basename, + kind='integration test w/o coverage to capture perf', + cmdline=f'{test} 2>&1', + ) + ) + else: + ret.append( + TestToRun(name=basename, kind='integration test', cmdline=f'{test} 2>&1') + ) + return ret @par.parallelize - def run_test(self, test: Any) -> TestResults: - if config.config['coverage']: - cmdline = f'coverage run --source {HOME}/lib {test}' - else: - cmdline = test - return self.execute_commandline(test, cmdline) + def run_test(self, test: TestToRun) -> TestResults: + return self.execute_commandline(test) def test_results_report(results: Dict[str, TestResults]) -> int: + """Give a final report about the tests that were run.""" total_problems = 0 - for type, result in results.items(): - print(f'{result.name}: ', end='') - print( - f'{ansi.fg("green")}{len(result.tests_succeeded)}/{len(result.tests_executed)} passed{ansi.reset()}.' - ) - if len(result.tests_failed) > 0: - print(f' ..{ansi.fg("red")}{len(result.tests_failed)} tests failed{ansi.reset()}:') - for test in result.tests_failed: - print(f' {test}') - total_problems += len(result.tests_failed) - - if len(result.tests_timed_out) > 0: - print( - f' ..{ansi.fg("yellow")}{len(result.tests_timed_out)} tests timed out{ansi.reset()}:' - ) - for test in result.tests_failed: - print(f' {test}') - total_problems += len(result.tests_timed_out) + for result in results.values(): + print(result, end='') + total_problems += len(result.tests_failed) + total_problems += len(result.tests_timed_out) if total_problems > 0: print('Reminder: look in ./test_output to view test output logs') @@ -276,15 +402,15 @@ def test_results_report(results: Dict[str, TestResults]) -> int: def code_coverage_report(): + """Give a final code coverage report.""" text_utils.header('Code Coverage') exec_utils.cmd('coverage combine .coverage*') - out = exec_utils.cmd('coverage report --omit=config-3.8.py,*_test.py,*_itest.py --sort=-cover') + out = exec_utils.cmd('coverage report --omit=config-3.*.py,*_test.py,*_itest.py --sort=-cover') print(out) print( - """ -To recall this report w/o re-running the tests: + """To recall this report w/o re-running the tests: - $ coverage report --omit=config-3.8.py,*_test.py,*_itest.py --sort=-cover + $ coverage report --omit=config-3.*.py,*_test.py,*_itest.py --sort=-cover ...from the 'tests' directory. Note that subsequent calls to run_tests.py with --coverage will klobber previous results. See: @@ -310,13 +436,13 @@ def main() -> Optional[int]: logger.debug('Clearing existing coverage data via "coverage erase".') exec_utils.cmd('coverage erase') - if config.config['unittests']: + if config.config['unittests'] or config.config['all']: saw_flag = True threads.append(UnittestTestRunner(params)) - if config.config['doctests']: + if config.config['doctests'] or config.config['all']: saw_flag = True threads.append(DoctestTestRunner(params)) - if config.config['integration']: + if config.config['integration'] or config.config['all']: saw_flag = True threads.append(IntegrationTestRunner(params)) @@ -330,7 +456,15 @@ def main() -> Optional[int]: results: Dict[str, TestResults] = {} while len(results) != len(threads): + started = 0 + done = 0 + failed = 0 + for thread in threads: + (s, tr) = thread.get_status() + started += s + failed += len(tr.tests_failed) + len(tr.tests_timed_out) + done += failed + len(tr.tests_succeeded) if not thread.is_alive(): tid = thread.name if tid not in results: @@ -342,8 +476,32 @@ def main() -> Optional[int]: 'Thread %s returned abnormal results; killing the others.', tid ) halt_event.set() - time.sleep(1.0) + if started > 0: + percent_done = done / started + else: + percent_done = 0.0 + + if failed == 0: + color = ansi.fg('green') + else: + color = ansi.fg('red') + + if percent_done < 100.0: + print( + text_utils.bar_graph_string( + done, + started, + text=text_utils.BarGraphText.FRACTION, + width=80, + fgcolor=color, + ), + end='\r', + flush=True, + ) + time.sleep(0.5) + + print(f'{ansi.clear_line()}Final Report:') if config.config['coverage']: code_coverage_report() total_problems = test_results_report(results)