X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=tests%2Frun_tests.py;h=3d587ef6f5ba8d79279cf8e08ddd1e620894474e;hb=8ba38a9764d74522530a4a1e95fa85064039fc1c;hp=06d4a9c839f71013674f907391c2b64eea610d67;hpb=7278ecc869330a341d194eecacaed4c00596971e;p=pyutils.git diff --git a/tests/run_tests.py b/tests/run_tests.py index 06d4a9c..3d587ef 100755 --- a/tests/run_tests.py +++ b/tests/run_tests.py @@ -4,6 +4,8 @@ A smart, fast test runner. Used in a git pre-commit hook. """ +from __future__ import annotations + import logging import os import re @@ -18,7 +20,6 @@ from overrides import overrides from pyutils import ansi, bootstrap, config, dict_utils, exec_utils, text_utils from pyutils.files import file_utils -from pyutils.parallelize import deferred_operand from pyutils.parallelize import parallelize as par from pyutils.parallelize import smart_future, thread_utils @@ -107,6 +108,40 @@ class TestResults: __radd__ = __add__ + @staticmethod + def empty_test_results(suite_name: str) -> TestResults: + return TestResults( + name=suite_name, + tests_executed={}, + tests_succeeded=[], + tests_failed=[], + tests_timed_out=[], + ) + + @staticmethod + def single_test_succeeded(name: str) -> TestResults: + return TestResults(name, {}, [name], [], []) + + @staticmethod + def single_test_failed(name: str) -> TestResults: + return TestResults( + name, + {}, + [], + [name], + [], + ) + + @staticmethod + def single_test_timed_out(name: str) -> TestResults: + return TestResults( + name, + {}, + [], + [], + [name], + ) + def __repr__(self) -> str: out = f'{self.name}: ' out += f'{ansi.fg("green")}' @@ -122,7 +157,7 @@ class TestResults: out += '\n' if len(self.tests_timed_out) > 0: - out += f' ..{ansi.fg("yellow")}' + out += f' ..{ansi.fg("lightning yellow")}' out += f'{len(self.tests_timed_out)} tests timed out' out += f'{ansi.reset()}:\n' for test in self.tests_failed: @@ -143,13 +178,7 @@ class TestRunner(ABC, thread_utils.ThreadWithReturnValue): """ super().__init__(self, target=self.begin, args=[params]) self.params = params - self.test_results = TestResults( - name=self.get_name(), - tests_executed={}, - tests_succeeded=[], - tests_failed=[], - tests_timed_out=[], - ) + self.test_results = TestResults.empty_test_results(self.get_name()) self.lock = threading.Lock() @abstractmethod @@ -182,7 +211,7 @@ class TemplatedTestRunner(TestRunner, ABC): pass def check_for_abort(self) -> bool: - """Periodically caled to check to see if we need to stop.""" + """Periodically called to check to see if we need to stop.""" if self.params.halt_event.is_set(): logger.debug('Thread %s saw halt event; exiting.', self.get_name()) @@ -210,32 +239,25 @@ class TemplatedTestRunner(TestRunner, ABC): ) -> TestResults: """Execute a particular commandline to run a test.""" + msg = f'{self.get_name()}: {test.name} ({test.cmdline}) ' try: output = exec_utils.cmd( test.cmdline, timeout_seconds=timeout, ) if "***Test Failed***" in output: - msg = f'{self.get_name()}: {test.name} ({test.cmdline}) failed; doctest failure message detected' + msg += 'failed; doctest failure message detected.' logger.error(msg) self.persist_output(test, msg, output) - return TestResults( - test.name, - {}, - [], - [test.name], - [], - ) + return TestResults.single_test_failed(test.name) + + msg += 'succeeded.' + self.persist_output(test, msg, output) + logger.debug(msg) + return TestResults.single_test_succeeded(test.name) - self.persist_output( - test, f'{test.name} ({test.cmdline}) succeeded.', output - ) - logger.debug( - '%s: %s (%s) succeeded', self.get_name(), test.name, test.cmdline - ) - return TestResults(test.name, {}, [test.name], [], []) except subprocess.TimeoutExpired as e: - msg = f'{self.get_name()}: {test.name} ({test.cmdline}) timed out after {e.timeout:.1f} seconds.' + msg += f'timed out after {e.timeout:.1f} seconds.' logger.error(msg) logger.debug( '%s: %s output when it timed out: %s', @@ -244,27 +266,16 @@ class TemplatedTestRunner(TestRunner, ABC): e.output, ) self.persist_output(test, msg, e.output.decode('utf-8')) - return TestResults( - test.name, - {}, - [], - [], - [test.name], - ) + return TestResults.single_test_timed_out(test.name) + except subprocess.CalledProcessError as e: - msg = f'{self.get_name()}: {test.name} ({test.cmdline}) failed; exit code {e.returncode}' + msg += f'failed with exit code {e.returncode}.' logger.error(msg) logger.debug( '%s: %s output when it failed: %s', self.get_name(), test.name, e.output ) self.persist_output(test, msg, e.output.decode('utf-8')) - return TestResults( - test.name, - {}, - [], - [test.name], - [], - ) + return TestResults.single_test_failed(test.name) @overrides def begin(self, params: TestingParameters) -> TestResults: @@ -289,13 +300,10 @@ class TemplatedTestRunner(TestRunner, ABC): ) self.test_results.tests_executed[test_to_run.name] = time.time() - for future in smart_future.wait_any(running, log_exceptions=False): - result = deferred_operand.DeferredOperand.resolve(future) + for result in smart_future.wait_any(running, log_exceptions=False): logger.debug('Test %s finished.', result.name) - - # We sometimes run the same test more than once. Do not allow - # one run's results to klobber the other's. self.test_results += result + if self.check_for_abort(): logger.debug( '%s: check_for_abort told us to exit early.', self.get_name() @@ -467,9 +475,9 @@ def code_coverage_report(): ) print(out) print( - """To recall this report w/o re-running the tests: + f"""To recall this report w/o re-running the tests: - $ coverage report --omit=config-3.*.py,*_test.py,*_itest.py --sort=-cover + $ {ansi.bold()}coverage report --omit=config-3.*.py,*_test.py,*_itest.py --sort=-cover{ansi.reset()} ...from the 'tests' directory. Note that subsequent calls to run_tests.py with --coverage will klobber previous results. See: @@ -482,9 +490,9 @@ run_tests.py with --coverage will klobber previous results. See: @bootstrap.initialize def main() -> Optional[int]: saw_flag = False - halt_event = threading.Event() threads: List[TestRunner] = [] + halt_event = threading.Event() halt_event.clear() params = TestingParameters( halt_on_error=True, @@ -494,7 +502,6 @@ def main() -> Optional[int]: if config.config['coverage']: logger.debug('Clearing existing coverage data via "coverage erase".') exec_utils.cmd('coverage erase') - if config.config['unittests'] or config.config['all']: saw_flag = True threads.append(UnittestTestRunner(params)) @@ -507,15 +514,14 @@ def main() -> Optional[int]: if not saw_flag: config.print_usage() - print('ERROR: one of --unittests, --doctests or --integration is required.') - return 1 + config.error('One of --unittests, --doctests or --integration is required.', 1) for thread in threads: thread.start() - results: Dict[str, Optional[TestResults]] = {} start_time = time.time() last_update = start_time + results: Dict[str, Optional[TestResults]] = {} still_running = {} while len(results) != len(threads): @@ -538,6 +544,7 @@ def main() -> Optional[int]: } still_running[tid] = running_with_start_time + # Maybe print tests that are still running. now = time.time() if now - start_time > 5.0: if now - last_update > 3.0: @@ -555,12 +562,13 @@ def main() -> Optional[int]: else: print(f'Still running: {len(update)} tests.') + # Maybe signal the other threads to stop too. if not thread.is_alive(): if tid not in results: result = thread.join() if result: results[tid] = result - if len(result.tests_failed) > 0: + if (len(result.tests_failed) + len(result.tests_timed_out)) > 0: logger.error( 'Thread %s returned abnormal results; killing the others.', tid, @@ -574,9 +582,8 @@ def main() -> Optional[int]: halt_event.set() results[tid] = None - if failed == 0: - color = ansi.fg('green') - else: + color = ansi.fg('green') + if failed > 0: color = ansi.fg('red') if started > 0: @@ -590,17 +597,19 @@ def main() -> Optional[int]: done, started, text=text_utils.BarGraphText.FRACTION, - width=80, + width=72, fgcolor=color, ), - end='\r', + end='', flush=True, ) - time.sleep(0.5) + print(f' {color}{now - start_time:.1f}s{ansi.reset()}', end='\r') + time.sleep(0.1) - print(f'{ansi.clear_line()}Final Report:') + print(f'{ansi.clear_line()}\n{ansi.underline()}Final Report:{ansi.reset()}') if config.config['coverage']: code_coverage_report() + print(f'Test suite runtime: {time.time() - start_time:.1f}s') total_problems = test_results_report(results) if total_problems > 0: logging.error(