import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
-from typing import Any, Dict, List, Optional, Tuple
+from typing import Any, Dict, List, Optional
from overrides import overrides
-from pyutils import ansi, bootstrap, config, exec_utils, text_utils
+from pyutils import ansi, bootstrap, config, dict_utils, exec_utils, text_utils
from pyutils.files import file_utils
from pyutils.parallelize import parallelize as par
from pyutils.parallelize import smart_future, thread_utils
logger = logging.getLogger(__name__)
-args = config.add_commandline_args(f'({__file__})', f'Args related to {__file__}')
+args = config.add_commandline_args(
+ f'Run Tests Driver ({__file__})', f'Args related to {__file__}'
+)
args.add_argument('--unittests', '-u', action='store_true', help='Run unittests.')
args.add_argument('--doctests', '-d', action='store_true', help='Run doctests.')
args.add_argument(
name: str
"""The name of this test / set of tests."""
- tests_executed: List[str]
+ tests_executed: Dict[str, float]
"""Tests that were executed."""
tests_succeeded: List[str]
"""Tests that timed out."""
def __add__(self, other):
- self.tests_executed.extend(other.tests_executed)
+ merged = dict_utils.coalesce(
+ [self.tests_executed, other.tests_executed],
+ aggregation_function=dict_utils.raise_on_duplicated_keys,
+ )
+ self.tests_executed = merged
self.tests_succeeded.extend(other.tests_succeeded)
self.tests_failed.extend(other.tests_failed)
self.tests_timed_out.extend(other.tests_timed_out)
out += '\n'
if len(self.tests_timed_out) > 0:
- out += f' ..{ansi.fg("yellow")}'
+ out += f' ..{ansi.fg("lightning yellow")}'
out += f'{len(self.tests_timed_out)} tests timed out'
out += f'{ansi.reset()}:\n'
for test in self.tests_failed:
self.params = params
self.test_results = TestResults(
name=self.get_name(),
- tests_executed=[],
+ tests_executed={},
tests_succeeded=[],
tests_failed=[],
tests_timed_out=[],
)
- self.tests_started = 0
self.lock = threading.Lock()
@abstractmethod
"""The name of this test collection."""
pass
- def get_status(self) -> Tuple[int, TestResults]:
+ def get_status(self) -> TestResults:
"""Ask the TestRunner for its status."""
with self.lock:
- return (self.tests_started, self.test_results)
+ return self.test_results
@abstractmethod
def begin(self, params: TestingParameters) -> TestResults:
self.persist_output(test, msg, output)
return TestResults(
test.name,
- [test.name],
+ {},
[],
[test.name],
[],
logger.debug(
'%s: %s (%s) succeeded', self.get_name(), test.name, test.cmdline
)
- return TestResults(test.name, [test.name], [test.name], [], [])
+ return TestResults(test.name, {}, [test.name], [], [])
except subprocess.TimeoutExpired as e:
msg = f'{self.get_name()}: {test.name} ({test.cmdline}) timed out after {e.timeout:.1f} seconds.'
logger.error(msg)
self.persist_output(test, msg, e.output.decode('utf-8'))
return TestResults(
test.name,
- [test.name],
+ {},
[],
[],
[test.name],
self.persist_output(test, msg, e.output.decode('utf-8'))
return TestResults(
test.name,
- [test.name],
+ {},
[],
[test.name],
[],
self.get_name(),
test_to_run.name,
)
- self.tests_started += 1
+ self.test_results.tests_executed[test_to_run.name] = time.time()
- for future in smart_future.wait_any(running, log_exceptions=False):
- result = future._resolve()
+ for result in smart_future.wait_any(running, log_exceptions=False):
logger.debug('Test %s finished.', result.name)
+
+ # We sometimes run the same test more than once. Do not allow
+ # one run's results to klobber the other's.
self.test_results += result
if self.check_for_abort():
logger.debug(
if basename in PERF_SENSATIVE_TESTS:
ret.append(
TestToRun(
- name=basename,
+ name=f'{basename}_no_coverage',
kind='unittest w/o coverage to record perf',
cmdline=f'{test} 2>&1',
)
if basename in PERF_SENSATIVE_TESTS:
ret.append(
TestToRun(
- name=basename,
+ name=f'{basename}_no_coverage',
kind='doctest w/o coverage to record perf',
cmdline=f'python3 {test} 2>&1',
)
if basename in PERF_SENSATIVE_TESTS:
ret.append(
TestToRun(
- name=basename,
+ name=f'{basename}_no_coverage',
kind='integration test w/o coverage to capture perf',
cmdline=f'{test} 2>&1',
)
thread.start()
results: Dict[str, Optional[TestResults]] = {}
+ start_time = time.time()
+ last_update = start_time
+ still_running = {}
+
while len(results) != len(threads):
started = 0
done = 0
failed = 0
for thread in threads:
- (s, tr) = thread.get_status()
- started += s
+ tid = thread.name
+ tr = thread.get_status()
+ started += len(tr.tests_executed)
failed += len(tr.tests_failed) + len(tr.tests_timed_out)
done += failed + len(tr.tests_succeeded)
+ running = set(tr.tests_executed.keys())
+ running -= set(tr.tests_failed)
+ running -= set(tr.tests_succeeded)
+ running -= set(tr.tests_timed_out)
+ running_with_start_time = {
+ test: tr.tests_executed[test] for test in running
+ }
+ still_running[tid] = running_with_start_time
+
+ now = time.time()
+ if now - start_time > 5.0:
+ if now - last_update > 3.0:
+ last_update = now
+ update = []
+ for _, running_dict in still_running.items():
+ for test_name, start_time in running_dict.items():
+ if now - start_time > 10.0:
+ update.append(f'{test_name}@{now-start_time:.1f}s')
+ else:
+ update.append(test_name)
+ print(f'\r{ansi.clear_line()}')
+ if len(update) < 4:
+ print(f'Still running: {",".join(update)}')
+ else:
+ print(f'Still running: {len(update)} tests.')
+
if not thread.is_alive():
- tid = thread.name
if tid not in results:
result = thread.join()
if result:
halt_event.set()
results[tid] = None
- if started > 0:
- percent_done = done / started
- else:
- percent_done = 0.0
-
if failed == 0:
color = ansi.fg('green')
else:
color = ansi.fg('red')
+ if started > 0:
+ percent_done = done / started * 100.0
+ else:
+ percent_done = 0.0
+
if percent_done < 100.0:
print(
text_utils.bar_graph_string(
done,
started,
text=text_utils.BarGraphText.FRACTION,
- width=80,
+ width=72,
fgcolor=color,
),
- end='\r',
+ end='',
flush=True,
)
- time.sleep(0.5)
+ print(f' {color}{now - start_time:.1f}s{ansi.reset()}', end='\r')
+ time.sleep(0.1)
print(f'{ansi.clear_line()}Final Report:')
if config.config['coverage']:
code_coverage_report()
total_problems = test_results_report(results)
+ if total_problems > 0:
+ logging.error(
+ 'Exiting with non-zero return code %d due to problems.', total_problems
+ )
return total_problems