From a0722abe80c416e0c174f3ff861566834402d43b Mon Sep 17 00:00:00 2001 From: Scott Gasch Date: Thu, 2 Jun 2022 08:23:18 -0700 Subject: [PATCH] Initial stab at a smarter doc/unit/integration/coverage test runner. --- config.py | 15 +- executors.py | 10 +- smart_future.py | 31 ++-- tests/run_tests.py | 372 +++++++++++++++++++++++++++++++++++++++++++++ thread_utils.py | 21 +++ 5 files changed, 436 insertions(+), 13 deletions(-) create mode 100755 tests/run_tests.py diff --git a/config.py b/config.py index 7bf812e..c7c686d 100644 --- a/config.py +++ b/config.py @@ -254,6 +254,19 @@ def _reorder_arg_action_groups_before_help(entry_module: Optional[str]): return reordered_action_groups +def print_usage() -> None: + """Prints the normal help usage message out.""" + ARGS.print_help() + + +def usage() -> str: + """ + Returns: + program usage help text as a string. + """ + return ARGS.format_usage() + + def _augment_sys_argv_from_environment_variables(): """Internal. Look at the system environment for variables that match arg names. This is done via some munging such that: @@ -269,7 +282,7 @@ def _augment_sys_argv_from_environment_variables(): """ - usage_message = ARGS.format_usage() + usage_message = usage() optional = False var = '' for x in usage_message.split(): diff --git a/executors.py b/executors.py index 9a73248..a9d25da 100644 --- a/executors.py +++ b/executors.py @@ -150,7 +150,10 @@ class ThreadExecutor(BaseExecutor): workers = max_workers elif 'executors_threadpool_size' in config.config: workers = config.config['executors_threadpool_size'] - logger.debug('Creating threadpool executor with %d workers', workers) + if workers is not None: + logger.debug('Creating threadpool executor with %d workers', workers) + else: + logger.debug('Creating a default sized threadpool executor') self._thread_pool_executor = fut.ThreadPoolExecutor( max_workers=workers, thread_name_prefix="thread_executor_helper" ) @@ -203,7 +206,10 @@ class ProcessExecutor(BaseExecutor): workers = max_workers elif 'executors_processpool_size' in config.config: workers = config.config['executors_processpool_size'] - logger.debug('Creating processpool executor with %d workers.', workers) + if workers is not None: + logger.debug('Creating processpool executor with %d workers.', workers) + else: + logger.debug('Creating a default sized processpool executor') self._process_executor = fut.ProcessPoolExecutor( max_workers=workers, ) diff --git a/smart_future.py b/smart_future.py index c0fce3a..86f1b1c 100644 --- a/smart_future.py +++ b/smart_future.py @@ -32,6 +32,7 @@ def wait_any( *, callback: Callable = None, log_exceptions: bool = True, + timeout: float = None, ): """Await the completion of any of a collection of SmartFutures and invoke callback each time one completes, repeatedly, until they are @@ -44,6 +45,8 @@ def wait_any( log_exceptions: Should we log (warning + exception) any underlying exceptions raised during future processing or silently ignore then? + timeout: invoke callback with a periodicity of timeout while + awaiting futures """ real_futures = [] @@ -55,18 +58,26 @@ def wait_any( smart_future_by_real_future[x.wrapped_future] = x while len(completed_futures) != len(real_futures): - newly_completed_futures = concurrent.futures.as_completed(real_futures) - for f in newly_completed_futures: + print("TOP...") + try: + newly_completed_futures = concurrent.futures.as_completed(real_futures, timeout=timeout) + for f in newly_completed_futures: + if callback is not None: + callback() + completed_futures.add(f) + if log_exceptions and not f.cancelled(): + exception = f.exception() + if exception is not None: + logger.warning( + 'Future 0x%x raised an unhandled exception and exited.', id(f) + ) + logger.exception(exception) + raise exception + yield smart_future_by_real_future[f] + except TimeoutError: + print(f"HERE!!! {len(completed_futures)} / {len(real_futures)}.") if callback is not None: callback() - completed_futures.add(f) - if log_exceptions and not f.cancelled(): - exception = f.exception() - if exception is not None: - logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f)) - logger.exception(exception) - raise exception - yield smart_future_by_real_future[f] if callback is not None: callback() diff --git a/tests/run_tests.py b/tests/run_tests.py new file mode 100755 index 0000000..2b2d238 --- /dev/null +++ b/tests/run_tests.py @@ -0,0 +1,372 @@ +#!/usr/bin/env python3 + +""" +A smart, fast test runner. +""" + +import logging +import os +import re +import subprocess +import threading +import time +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Any, Dict, List, Optional + +from overrides import overrides + +import bootstrap +import config +import exec_utils +import file_utils +import parallelize as par +import text_utils +import thread_utils + +logger = logging.getLogger(__name__) +args = config.add_commandline_args(f'({__file__})', 'Args related to __file__') +args.add_argument('--unittests', '-u', action='store_true', help='Run unittests.') +args.add_argument('--doctests', '-d', action='store_true', help='Run doctests.') +args.add_argument('--integration', '-i', action='store_true', help='Run integration tests.') +args.add_argument( + '--coverage', '-c', action='store_true', help='Run tests and capture code coverage data' +) + +HOME = os.environ['HOME'] + + +@dataclass +class TestingParameters: + halt_on_error: bool + halt_event: threading.Event + + +@dataclass +class TestResults: + name: str + num_tests_executed: int + num_tests_succeeded: int + num_tests_failed: int + normal_exit: bool + output: str + + +class TestRunner(ABC, thread_utils.ThreadWithReturnValue): + def __init__(self, params: TestingParameters): + super().__init__(self, target=self.begin, args=[params]) + self.params = params + self.test_results = TestResults( + name=f"All {self.get_name()} tests", + num_tests_executed=0, + num_tests_succeeded=0, + num_tests_failed=0, + normal_exit=True, + output="", + ) + + def aggregate_test_results(self, result: TestResults): + self.test_results.num_tests_executed += result.num_tests_executed + self.test_results.num_tests_succeeded += result.num_tests_succeeded + self.test_results.num_tests_failed += result.num_tests_failed + self.test_results.normal_exit = self.test_results.normal_exit and result.normal_exit + self.test_results.output += "\n\n\n" + result.output + + @abstractmethod + def get_name(self) -> str: + pass + + @abstractmethod + def begin(self, params: TestingParameters) -> TestResults: + pass + + +class TemplatedTestRunner(TestRunner, ABC): + @abstractmethod + def identify_tests(self) -> List[Any]: + pass + + @abstractmethod + def run_test(self, test: Any) -> TestResults: + pass + + def check_for_abort(self): + if self.params.halt_event.is_set(): + logger.debug('Thread %s saw halt event; exiting.', self.get_name()) + raise Exception("Kill myself!") + if not self.test_results.normal_exit: + if self.params.halt_on_error: + logger.error('Thread %s saw abnormal results; exiting.', self.get_name()) + raise Exception("Kill myself!") + + def status_report(self, running: List[Any], done: List[Any]): + total = len(running) + len(done) + logging.info( + '%s: %d/%d in flight; %d/%d completed.', + self.get_name(), + len(running), + total, + len(done), + total, + ) + + @overrides + def begin(self, params: TestingParameters) -> TestResults: + logger.debug('Thread %s started.', self.get_name()) + interesting_tests = self.identify_tests() + running: List[Any] = [] + done: List[Any] = [] + for test in interesting_tests: + running.append(self.run_test(test)) + + while len(running) > 0: + self.status_report(running, done) + self.check_for_abort() + newly_finished = [] + for fut in running: + if fut.is_ready(): + newly_finished.append(fut) + result = fut._resolve() + logger.debug('Test %s finished.', result.name) + self.aggregate_test_results(result) + + for fut in newly_finished: + running.remove(fut) + done.append(fut) + time.sleep(0.25) + + logger.debug('Thread %s finished.', self.get_name()) + return self.test_results + + +class UnittestTestRunner(TemplatedTestRunner): + @overrides + def get_name(self) -> str: + return "UnittestTestRunner" + + @overrides + def identify_tests(self) -> List[Any]: + return list(file_utils.expand_globs('*_test.py')) + + @par.parallelize + def run_test(self, test: Any) -> TestResults: + if config.config['coverage']: + cmdline = f'coverage run --source {HOME}/lib --append {test} --unittests_ignore_perf' + else: + cmdline = test + + try: + logger.debug('Running unittest %s (%s)', test, cmdline) + output = exec_utils.cmd( + cmdline, + timeout_seconds=120.0, + ) + except TimeoutError: + logger.error('Unittest %s timed out; ran for > 120.0 seconds', test) + return TestResults( + test, + 1, + 0, + 1, + False, + f"Unittest {test} timed out.", + ) + except subprocess.CalledProcessError: + logger.error('Unittest %s failed.', test) + return TestResults( + test, + 1, + 0, + 1, + False, + f"Unittest {test} failed.", + ) + return TestResults(test, 1, 1, 0, True, output) + + +class DoctestTestRunner(TemplatedTestRunner): + @overrides + def get_name(self) -> str: + return "DoctestTestRunner" + + @overrides + def identify_tests(self) -> List[Any]: + ret = [] + out = exec_utils.cmd('grep -lR "^ *import doctest" /home/scott/lib/python_modules/*') + for line in out.split('\n'): + if re.match(r'.*\.py$', line): + if 'run_tests.py' not in line: + ret.append(line) + return ret + + @par.parallelize + def run_test(self, test: Any) -> TestResults: + if config.config['coverage']: + cmdline = f'coverage run --source {HOME}/lib --append {test} 2>&1' + else: + cmdline = f'python3 {test}' + try: + logger.debug('Running doctest %s (%s).', test, cmdline) + output = exec_utils.cmd( + cmdline, + timeout_seconds=120.0, + ) + except TimeoutError: + logger.error('Doctest %s timed out; ran for > 120.0 seconds', test) + return TestResults( + test, + 1, + 0, + 1, + False, + f"Doctest {test} timed out.", + ) + except subprocess.CalledProcessError: + logger.error('Doctest %s failed.', test) + return TestResults( + test, + 1, + 0, + 1, + False, + f"Docttest {test} failed.", + ) + return TestResults( + test, + 1, + 1, + 0, + True, + "", + ) + + +class IntegrationTestRunner(TemplatedTestRunner): + @overrides + def get_name(self) -> str: + return "IntegrationTestRunner" + + @overrides + def identify_tests(self) -> List[Any]: + return list(file_utils.expand_globs('*_itest.py')) + + @par.parallelize + def run_test(self, test: Any) -> TestResults: + if config.config['coverage']: + cmdline = f'coverage run --source {HOME}/lib --append {test}' + else: + cmdline = test + try: + logger.debug('Running integration test %s (%s).', test, cmdline) + output = exec_utils.cmd( + cmdline, + timeout_seconds=240.0, + ) + except TimeoutError: + logger.error('Integration Test %s timed out; ran for > 240.0 seconds', test) + return TestResults( + test, + 1, + 0, + 1, + False, + f"Integration Test {test} timed out.", + ) + except subprocess.CalledProcessError: + logger.error('Integration Test %s failed.', test) + return TestResults( + test, + 1, + 0, + 1, + False, + f"Integration Test {test} failed.", + ) + return TestResults( + test, + 1, + 1, + 0, + True, + "", + ) + + +def test_results_report(results: Dict[str, TestResults]): + print(results) + + +def code_coverage_report(): + text_utils.header('Code Coverage') + out = exec_utils.cmd('coverage report --omit=config-3.8.py,*_test.py,*_itest.py --sort=-cover') + print(out) + print( + """ +To recall this report w/o re-running the tests: + + $ coverage report --omit=config-3.8.py,*_test.py,*_itest.py --sort=-cover + +...from the 'tests' directory. Note that subsequent calls to +run_tests.py with --coverage will klobber previous results. See: + + https://coverage.readthedocs.io/en/6.2/ + +""" + ) + + +@bootstrap.initialize +def main() -> Optional[int]: + saw_flag = False + halt_event = threading.Event() + threads: List[TestRunner] = [] + + halt_event.clear() + params = TestingParameters( + halt_on_error=True, + halt_event=halt_event, + ) + + if config.config['coverage']: + logger.debug('Clearing existing coverage data via "coverage erase".') + exec_utils.cmd('coverage erase') + + if config.config['unittests']: + saw_flag = True + threads.append(UnittestTestRunner(params)) + if config.config['doctests']: + saw_flag = True + threads.append(DoctestTestRunner(params)) + if config.config['integration']: + saw_flag = True + threads.append(IntegrationTestRunner(params)) + + if not saw_flag: + config.print_usage() + print('ERROR: one of --unittests, --doctests or --integration is required.') + return 1 + + for thread in threads: + thread.start() + + results: Dict[str, TestResults] = {} + while len(results) != len(threads): + for thread in threads: + if not thread.is_alive(): + tid = thread.name + if tid not in results: + result = thread.join() + if result: + results[tid] = result + if not result.normal_exit: + halt_event.set() + time.sleep(1.0) + + test_results_report(results) + if config.config['coverage']: + code_coverage_report() + return 0 + + +if __name__ == '__main__': + main() diff --git a/thread_utils.py b/thread_utils.py index 65f6037..df637e0 100644 --- a/thread_utils.py +++ b/thread_utils.py @@ -120,6 +120,27 @@ def background_thread( return wrapper(_funct) +class ThreadWithReturnValue(threading.Thread): + """A thread whose return value is plumbed back out as the return + value of :meth:`join`. + """ + + def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, Verbose=None): + threading.Thread.__init__( + self, group=None, target=target, name=None, args=args, kwargs=kwargs + ) + self._target = target + self._return = None + + def run(self): + if self._target is not None: + self._return = self._target(*self._args, **self._kwargs) + + def join(self, *args): + threading.Thread.join(self, *args) + return self._return + + def periodically_invoke( period_sec: float, stop_after: Optional[int], -- 2.47.1