import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
-from typing import Any, Dict, List, Optional, Tuple
+from typing import Any, Dict, List, Optional
from overrides import overrides
-from pyutils import ansi, bootstrap, config, exec_utils, text_utils
+from pyutils import ansi, bootstrap, config, dict_utils, exec_utils, text_utils
from pyutils.files import file_utils
from pyutils.parallelize import parallelize as par
from pyutils.parallelize import smart_future, thread_utils
logger = logging.getLogger(__name__)
-args = config.add_commandline_args(f'({__file__})', f'Args related to {__file__}')
+args = config.add_commandline_args(
+ f'Run Tests Driver ({__file__})', f'Args related to {__file__}'
+)
args.add_argument('--unittests', '-u', action='store_true', help='Run unittests.')
args.add_argument('--doctests', '-d', action='store_true', help='Run doctests.')
args.add_argument(
# they pay attention to code performance which is adversely affected
# by coverage.
PERF_SENSATIVE_TESTS = set(['string_utils_test.py'])
-TESTS_TO_SKIP = set(['zookeeper_test.py', 'run_tests.py'])
+TESTS_TO_SKIP = set(['zookeeper_test.py', 'zookeeper.py', 'run_tests.py'])
ROOT = ".."
name: str
"""The name of this test / set of tests."""
- tests_executed: List[str]
+ tests_executed: Dict[str, float]
"""Tests that were executed."""
tests_succeeded: List[str]
"""Tests that timed out."""
def __add__(self, other):
- self.tests_executed.extend(other.tests_executed)
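+        # Merge the two start-time dicts; raise_on_duplicated_keys makes a
+        # repeated test name fail loudly instead of silently overwriting a result.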
+ merged = dict_utils.coalesce(
+ [self.tests_executed, other.tests_executed],
+ aggregation_function=dict_utils.raise_on_duplicated_keys,
+ )
+ self.tests_executed = merged
self.tests_succeeded.extend(other.tests_succeeded)
self.tests_failed.extend(other.tests_failed)
self.tests_timed_out.extend(other.tests_timed_out)
out += '\n'
if len(self.tests_timed_out) > 0:
- out += f' ..{ansi.fg("yellow")}'
+ out += f' ..{ansi.fg("lightning yellow")}'
out += f'{len(self.tests_timed_out)} tests timed out'
out += f'{ansi.reset()}:\n'
-            for test in self.tests_failed:
+            for test in self.tests_timed_out:
self.params = params
self.test_results = TestResults(
name=self.get_name(),
- tests_executed=[],
+ tests_executed={},
tests_succeeded=[],
tests_failed=[],
tests_timed_out=[],
)
- self.tests_started = 0
self.lock = threading.Lock()
@abstractmethod
"""The name of this test collection."""
pass
- def get_status(self) -> Tuple[int, TestResults]:
+ def get_status(self) -> TestResults:
"""Ask the TestRunner for its status."""
with self.lock:
- return (self.tests_started, self.test_results)
+ return self.test_results
@abstractmethod
def begin(self, params: TestingParameters) -> TestResults:
"""Run a single test and return its TestResults."""
pass
- def check_for_abort(self):
+ def check_for_abort(self) -> bool:
"""Periodically caled to check to see if we need to stop."""
if self.params.halt_event.is_set():
logger.debug('Thread %s saw halt event; exiting.', self.get_name())
- raise Exception("Kill myself!")
- if self.params.halt_on_error:
- if len(self.test_results.tests_failed) > 0:
- logger.error(
- 'Thread %s saw abnormal results; exiting.', self.get_name()
- )
- raise Exception("Kill myself!")
+ return True
+
+ if self.params.halt_on_error and len(self.test_results.tests_failed) > 0:
+ logger.error('Thread %s saw abnormal results; exiting.', self.get_name())
+ return True
+ return False
def persist_output(self, test: TestToRun, message: str, output: str) -> None:
"""Called to save the output of a test run."""
test.cmdline,
timeout_seconds=timeout,
)
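+            # A doctest run can exit 0 even when individual tests fail; doctest
+            # prints "***Test Failed***" in that case, so scan the output too.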
+ if "***Test Failed***" in output:
+ msg = f'{self.get_name()}: {test.name} ({test.cmdline}) failed; doctest failure message detected'
+ logger.error(msg)
+ self.persist_output(test, msg, output)
+ return TestResults(
+ test.name,
+ {},
+ [],
+ [test.name],
+ [],
+ )
+
self.persist_output(
test, f'{test.name} ({test.cmdline}) succeeded.', output
)
logger.debug(
'%s: %s (%s) succeeded', self.get_name(), test.name, test.cmdline
)
- return TestResults(test.name, [test.name], [test.name], [], [])
+ return TestResults(test.name, {}, [test.name], [], [])
except subprocess.TimeoutExpired as e:
msg = f'{self.get_name()}: {test.name} ({test.cmdline}) timed out after {e.timeout:.1f} seconds.'
logger.error(msg)
self.persist_output(test, msg, e.output.decode('utf-8'))
return TestResults(
test.name,
- [test.name],
+ {},
[],
[],
[test.name],
self.persist_output(test, msg, e.output.decode('utf-8'))
return TestResults(
test.name,
- [test.name],
+ {},
[],
[test.name],
[],
self.get_name(),
test_to_run.name,
)
- self.tests_started += 1
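+            # Record when each test started so the status display can show how
+            # long in-flight tests have been running.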
+ self.test_results.tests_executed[test_to_run.name] = time.time()
- for future in smart_future.wait_any(running):
- self.check_for_abort()
- result = future._resolve()
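+        # wait_any yields each test's results as soon as its future resolves, so
+        # we can merge them and notice failures without waiting for the slowest test.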
+ for result in smart_future.wait_any(running, log_exceptions=False):
logger.debug('Test %s finished.', result.name)
+
+            # We sometimes run the same test more than once.  Do not allow one
+            # run's results to clobber the other's.
self.test_results += result
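+            # check_for_abort returns True when a halt was requested or when
+            # --halt_on_error saw a failure; stop early and return what we have.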
+ if self.check_for_abort():
+ logger.debug(
+ '%s: check_for_abort told us to exit early.', self.get_name()
+ )
+ return self.test_results
- logger.debug('Thread %s finished.', self.get_name())
+ logger.debug('Thread %s finished running all tests', self.get_name())
return self.test_results
if basename in PERF_SENSATIVE_TESTS:
ret.append(
TestToRun(
- name=basename,
+ name=f'{basename}_no_coverage',
kind='unittest w/o coverage to record perf',
cmdline=f'{test} 2>&1',
)
if basename in PERF_SENSATIVE_TESTS:
ret.append(
TestToRun(
- name=basename,
+ name=f'{basename}_no_coverage',
kind='doctest w/o coverage to record perf',
cmdline=f'python3 {test} 2>&1',
)
if basename in PERF_SENSATIVE_TESTS:
ret.append(
TestToRun(
- name=basename,
+ name=f'{basename}_no_coverage',
kind='integration test w/o coverage to capture perf',
cmdline=f'{test} 2>&1',
)
return self.execute_commandline(test)
-def test_results_report(results: Dict[str, TestResults]) -> int:
+def test_results_report(results: Dict[str, Optional[TestResults]]) -> int:
"""Give a final report about the tests that were run."""
total_problems = 0
for result in results.values():
- print(result, end='')
- total_problems += len(result.tests_failed)
- total_problems += len(result.tests_timed_out)
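+        # A None entry means a runner thread died with an unhandled exception
+        # before it could produce any results.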
+ if result is None:
+ print('Unexpected unhandled exception in test runner!!!')
+ total_problems += 1
+ else:
+ print(result, end='')
+ total_problems += len(result.tests_failed)
+ total_problems += len(result.tests_timed_out)
if total_problems > 0:
print('Reminder: look in ./test_output to view test output logs')
for thread in threads:
thread.start()
- results: Dict[str, TestResults] = {}
+ results: Dict[str, Optional[TestResults]] = {}
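+    # Track when the whole run started, when the status line was last refreshed,
+    # and which tests each runner thread still has in flight.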
+ start_time = time.time()
+ last_update = start_time
+ still_running = {}
+
while len(results) != len(threads):
started = 0
done = 0
failed = 0
for thread in threads:
- (s, tr) = thread.get_status()
- started += s
+ tid = thread.name
+ tr = thread.get_status()
+ started += len(tr.tests_executed)
failed += len(tr.tests_failed) + len(tr.tests_timed_out)
done += failed + len(tr.tests_succeeded)
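+            # Anything started but not yet recorded as succeeded, failed, or
+            # timed out is still in flight.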
+ running = set(tr.tests_executed.keys())
+ running -= set(tr.tests_failed)
+ running -= set(tr.tests_succeeded)
+ running -= set(tr.tests_timed_out)
+ running_with_start_time = {
+ test: tr.tests_executed[test] for test in running
+ }
+ still_running[tid] = running_with_start_time
+
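+            # After a short grace period, periodically show which tests are
+            # still running (with elapsed time for the slow ones).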
+ now = time.time()
+ if now - start_time > 5.0:
+ if now - last_update > 3.0:
+ last_update = now
+ update = []
+                    for running_dict in still_running.values():
+                        for test_name, test_start_time in running_dict.items():
+                            if now - test_start_time > 10.0:
+                                update.append(f'{test_name}@{now-test_start_time:.1f}s')
+                            else:
+                                update.append(test_name)
+ print(f'\r{ansi.clear_line()}')
+ if len(update) < 4:
+ print(f'Still running: {",".join(update)}')
+ else:
+ print(f'Still running: {len(update)} tests.')
+
if not thread.is_alive():
- tid = thread.name
if tid not in results:
result = thread.join()
if result:
tid,
)
halt_event.set()
-
- if started > 0:
- percent_done = done / started
- else:
- percent_done = 0.0
+ else:
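+                        # join() produced no results: the runner thread itself
+                        # crashed, so record that and halt the whole run.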
+ logger.error(
+ 'Thread %s took an unhandled exception... bug in run_tests.py?! Aborting.',
+ tid,
+ )
+ halt_event.set()
+ results[tid] = None
if failed == 0:
color = ansi.fg('green')
else:
color = ansi.fg('red')
+ if started > 0:
+ percent_done = done / started * 100.0
+ else:
+ percent_done = 0.0
+
if percent_done < 100.0:
print(
text_utils.bar_graph_string(
done,
started,
text=text_utils.BarGraphText.FRACTION,
- width=80,
+ width=72,
fgcolor=color,
),
- end='\r',
+ end='',
flush=True,
)
- time.sleep(0.5)
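+            # Append the overall elapsed time and return the carriage so the
+            # next refresh overwrites this status line in place.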
+ print(f' {color}{now - start_time:.1f}s{ansi.reset()}', end='\r')
+ time.sleep(0.1)
print(f'{ansi.clear_line()}Final Report:')
if config.config['coverage']:
code_coverage_report()
total_problems = test_results_report(results)
+ if total_problems > 0:
+ logging.error(
+ 'Exiting with non-zero return code %d due to problems.', total_problems
+ )
return total_problems