A smart, fast test runner. Used in a git pre-commit hook.
"""
+from __future__ import annotations
+
import logging
import os
import re
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
-from typing import Any, Dict, List, Optional, Tuple
+from typing import Any, Dict, List, Optional
from overrides import overrides
-from pyutils import ansi, bootstrap, config, exec_utils, text_utils
+from pyutils import ansi, bootstrap, config, dict_utils, exec_utils, text_utils
from pyutils.files import file_utils
from pyutils.parallelize import parallelize as par
from pyutils.parallelize import smart_future, thread_utils
logger = logging.getLogger(__name__)
-args = config.add_commandline_args(f'({__file__})', f'Args related to {__file__}')
+args = config.add_commandline_args(
+ f'Run Tests Driver ({__file__})', f'Args related to {__file__}'
+)
args.add_argument('--unittests', '-u', action='store_true', help='Run unittests.')
args.add_argument('--doctests', '-d', action='store_true', help='Run doctests.')
args.add_argument(
# they pay attention to code performance which is adversely affected
# by coverage.
PERF_SENSATIVE_TESTS = set(['string_utils_test.py'])
-TESTS_TO_SKIP = set(['zookeeper_test.py', 'run_tests.py'])
+TESTS_TO_SKIP = set(['zookeeper_test.py', 'zookeeper.py', 'run_tests.py'])
ROOT = ".."
name: str
"""The name of this test / set of tests."""
- tests_executed: List[str]
+ tests_executed: Dict[str, float]
"""Tests that were executed."""
tests_succeeded: List[str]
"""Tests that timed out."""
def __add__(self, other):
- self.tests_executed.extend(other.tests_executed)
+ merged = dict_utils.coalesce(
+ [self.tests_executed, other.tests_executed],
+ aggregation_function=dict_utils.raise_on_duplicated_keys,
+ )
+ self.tests_executed = merged
self.tests_succeeded.extend(other.tests_succeeded)
self.tests_failed.extend(other.tests_failed)
self.tests_timed_out.extend(other.tests_timed_out)
__radd__ = __add__
+ @staticmethod
+ def empty_test_results(suite_name: str) -> TestResults:
+ return TestResults(
+ name=suite_name,
+ tests_executed={},
+ tests_succeeded=[],
+ tests_failed=[],
+ tests_timed_out=[],
+ )
+
+ @staticmethod
+ def single_test_succeeded(name: str) -> TestResults:
+ return TestResults(name, {}, [name], [], [])
+
+ @staticmethod
+ def single_test_failed(name: str) -> TestResults:
+ return TestResults(
+ name,
+ {},
+ [],
+ [name],
+ [],
+ )
+
+ @staticmethod
+ def single_test_timed_out(name: str) -> TestResults:
+ return TestResults(
+ name,
+ {},
+ [],
+ [],
+ [name],
+ )
+
def __repr__(self) -> str:
out = f'{self.name}: '
out += f'{ansi.fg("green")}'
out += '\n'
if len(self.tests_timed_out) > 0:
- out += f' ..{ansi.fg("yellow")}'
+ out += f' ..{ansi.fg("lightning yellow")}'
out += f'{len(self.tests_timed_out)} tests timed out'
out += f'{ansi.reset()}:\n'
for test in self.tests_failed:
"""
super().__init__(self, target=self.begin, args=[params])
self.params = params
- self.test_results = TestResults(
- name=self.get_name(),
- tests_executed=[],
- tests_succeeded=[],
- tests_failed=[],
- tests_timed_out=[],
- )
- self.tests_started = 0
+ self.test_results = TestResults.empty_test_results(self.get_name())
self.lock = threading.Lock()
@abstractmethod
"""The name of this test collection."""
pass
- def get_status(self) -> Tuple[int, TestResults]:
+ def get_status(self) -> TestResults:
"""Ask the TestRunner for its status."""
with self.lock:
- return (self.tests_started, self.test_results)
+ return self.test_results
@abstractmethod
def begin(self, params: TestingParameters) -> TestResults:
"""Run a single test and return its TestResults."""
pass
- def check_for_abort(self):
- """Periodically caled to check to see if we need to stop."""
+ def check_for_abort(self) -> bool:
+ """Periodically called to check to see if we need to stop."""
if self.params.halt_event.is_set():
logger.debug('Thread %s saw halt event; exiting.', self.get_name())
- raise Exception("Kill myself!")
- if self.params.halt_on_error:
- if len(self.test_results.tests_failed) > 0:
- logger.error(
- 'Thread %s saw abnormal results; exiting.', self.get_name()
- )
- raise Exception("Kill myself!")
+ return True
+
+ if self.params.halt_on_error and len(self.test_results.tests_failed) > 0:
+ logger.error('Thread %s saw abnormal results; exiting.', self.get_name())
+ return True
+ return False
def persist_output(self, test: TestToRun, message: str, output: str) -> None:
"""Called to save the output of a test run."""
) -> TestResults:
"""Execute a particular commandline to run a test."""
+ msg = f'{self.get_name()}: {test.name} ({test.cmdline}) '
try:
output = exec_utils.cmd(
test.cmdline,
timeout_seconds=timeout,
)
- self.persist_output(
- test, f'{test.name} ({test.cmdline}) succeeded.', output
- )
- logger.debug(
- '%s: %s (%s) succeeded', self.get_name(), test.name, test.cmdline
- )
- return TestResults(test.name, [test.name], [test.name], [], [])
+ if "***Test Failed***" in output:
+ msg += 'failed; doctest failure message detected.'
+ logger.error(msg)
+ self.persist_output(test, msg, output)
+ return TestResults.single_test_failed(test.name)
+
+ msg += 'succeeded.'
+ self.persist_output(test, msg, output)
+ logger.debug(msg)
+ return TestResults.single_test_succeeded(test.name)
+
except subprocess.TimeoutExpired as e:
- msg = f'{self.get_name()}: {test.name} ({test.cmdline}) timed out after {e.timeout:.1f} seconds.'
+ msg += f'timed out after {e.timeout:.1f} seconds.'
logger.error(msg)
logger.debug(
'%s: %s output when it timed out: %s',
e.output,
)
self.persist_output(test, msg, e.output.decode('utf-8'))
- return TestResults(
- test.name,
- [test.name],
- [],
- [],
- [test.name],
- )
+ return TestResults.single_test_timed_out(test.name)
+
except subprocess.CalledProcessError as e:
- msg = f'{self.get_name()}: {test.name} ({test.cmdline}) failed; exit code {e.returncode}'
+ msg += f'failed with exit code {e.returncode}.'
logger.error(msg)
logger.debug(
'%s: %s output when it failed: %s', self.get_name(), test.name, e.output
)
self.persist_output(test, msg, e.output.decode('utf-8'))
- return TestResults(
- test.name,
- [test.name],
- [],
- [test.name],
- [],
- )
+ return TestResults.single_test_failed(test.name)
@overrides
def begin(self, params: TestingParameters) -> TestResults:
self.get_name(),
test_to_run.name,
)
- self.tests_started += 1
+ self.test_results.tests_executed[test_to_run.name] = time.time()
- for future in smart_future.wait_any(running):
- self.check_for_abort()
- result = future._resolve()
+ for result in smart_future.wait_any(running, log_exceptions=False):
logger.debug('Test %s finished.', result.name)
self.test_results += result
- logger.debug('Thread %s finished.', self.get_name())
+ if self.check_for_abort():
+ logger.debug(
+ '%s: check_for_abort told us to exit early.', self.get_name()
+ )
+ return self.test_results
+
+ logger.debug('Thread %s finished running all tests', self.get_name())
return self.test_results
if basename in PERF_SENSATIVE_TESTS:
ret.append(
TestToRun(
- name=basename,
+ name=f'{basename}_no_coverage',
kind='unittest w/o coverage to record perf',
cmdline=f'{test} 2>&1',
)
if basename in PERF_SENSATIVE_TESTS:
ret.append(
TestToRun(
- name=basename,
+ name=f'{basename}_no_coverage',
kind='doctest w/o coverage to record perf',
cmdline=f'python3 {test} 2>&1',
)
if basename in PERF_SENSATIVE_TESTS:
ret.append(
TestToRun(
- name=basename,
+ name=f'{basename}_no_coverage',
kind='integration test w/o coverage to capture perf',
cmdline=f'{test} 2>&1',
)
return self.execute_commandline(test)
-def test_results_report(results: Dict[str, TestResults]) -> int:
+def test_results_report(results: Dict[str, Optional[TestResults]]) -> int:
"""Give a final report about the tests that were run."""
total_problems = 0
for result in results.values():
- print(result, end='')
- total_problems += len(result.tests_failed)
- total_problems += len(result.tests_timed_out)
+ if result is None:
+ print('Unexpected unhandled exception in test runner!!!')
+ total_problems += 1
+ else:
+ print(result, end='')
+ total_problems += len(result.tests_failed)
+ total_problems += len(result.tests_timed_out)
if total_problems > 0:
print('Reminder: look in ./test_output to view test output logs')
)
print(out)
print(
- """To recall this report w/o re-running the tests:
+ f"""To recall this report w/o re-running the tests:
- $ coverage report --omit=config-3.*.py,*_test.py,*_itest.py --sort=-cover
+ $ {ansi.bold()}coverage report --omit=config-3.*.py,*_test.py,*_itest.py --sort=-cover{ansi.reset()}
...from the 'tests' directory. Note that subsequent calls to
run_tests.py with --coverage will klobber previous results. See:
@bootstrap.initialize
def main() -> Optional[int]:
saw_flag = False
- halt_event = threading.Event()
threads: List[TestRunner] = []
+ halt_event = threading.Event()
halt_event.clear()
params = TestingParameters(
halt_on_error=True,
if config.config['coverage']:
logger.debug('Clearing existing coverage data via "coverage erase".')
exec_utils.cmd('coverage erase')
-
if config.config['unittests'] or config.config['all']:
saw_flag = True
threads.append(UnittestTestRunner(params))
if not saw_flag:
config.print_usage()
- print('ERROR: one of --unittests, --doctests or --integration is required.')
- return 1
+ config.error('One of --unittests, --doctests or --integration is required.', 1)
for thread in threads:
thread.start()
- results: Dict[str, TestResults] = {}
+ start_time = time.time()
+ last_update = start_time
+ results: Dict[str, Optional[TestResults]] = {}
+ still_running = {}
+
while len(results) != len(threads):
started = 0
done = 0
failed = 0
for thread in threads:
- (s, tr) = thread.get_status()
- started += s
+ tid = thread.name
+ tr = thread.get_status()
+ started += len(tr.tests_executed)
failed += len(tr.tests_failed) + len(tr.tests_timed_out)
done += failed + len(tr.tests_succeeded)
+ running = set(tr.tests_executed.keys())
+ running -= set(tr.tests_failed)
+ running -= set(tr.tests_succeeded)
+ running -= set(tr.tests_timed_out)
+ running_with_start_time = {
+ test: tr.tests_executed[test] for test in running
+ }
+ still_running[tid] = running_with_start_time
+
+ # Maybe print tests that are still running.
+ now = time.time()
+ if now - start_time > 5.0:
+ if now - last_update > 3.0:
+ last_update = now
+ update = []
+ for _, running_dict in still_running.items():
+                            for test_name, test_start_time in running_dict.items():
+                                if now - test_start_time > 10.0:
+                                    update.append(f'{test_name}@{now-test_start_time:.1f}s')
+ else:
+ update.append(test_name)
+ print(f'\r{ansi.clear_line()}')
+ if len(update) < 4:
+ print(f'Still running: {",".join(update)}')
+ else:
+ print(f'Still running: {len(update)} tests.')
+
+ # Maybe signal the other threads to stop too.
if not thread.is_alive():
- tid = thread.name
if tid not in results:
result = thread.join()
if result:
results[tid] = result
- if len(result.tests_failed) > 0:
+ if (len(result.tests_failed) + len(result.tests_timed_out)) > 0:
logger.error(
'Thread %s returned abnormal results; killing the others.',
tid,
)
halt_event.set()
+ else:
+ logger.error(
+ 'Thread %s took an unhandled exception... bug in run_tests.py?! Aborting.',
+ tid,
+ )
+ halt_event.set()
+ results[tid] = None
+
+ color = ansi.fg('green')
+ if failed > 0:
+ color = ansi.fg('red')
if started > 0:
- percent_done = done / started
+ percent_done = done / started * 100.0
else:
percent_done = 0.0
- if failed == 0:
- color = ansi.fg('green')
- else:
- color = ansi.fg('red')
-
if percent_done < 100.0:
print(
text_utils.bar_graph_string(
done,
started,
text=text_utils.BarGraphText.FRACTION,
- width=80,
+ width=72,
fgcolor=color,
),
- end='\r',
+ end='',
flush=True,
)
- time.sleep(0.5)
+ print(f' {color}{now - start_time:.1f}s{ansi.reset()}', end='\r')
+ time.sleep(0.1)
- print(f'{ansi.clear_line()}Final Report:')
+ print(f'{ansi.clear_line()}\n{ansi.underline()}Final Report:{ansi.reset()}')
if config.config['coverage']:
code_coverage_report()
+ print(f'Test suite runtime: {time.time() - start_time:.1f}s')
total_problems = test_results_report(results)
+ if total_problems > 0:
+        logger.error(
+ 'Exiting with non-zero return code %d due to problems.', total_problems
+ )
return total_problems