import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
-from typing import Any, Dict, List, Optional, Tuple
+from typing import Any, Dict, List, Optional
from overrides import overrides
from pyutils.parallelize import smart_future, thread_utils
logger = logging.getLogger(__name__)
-args = config.add_commandline_args(f'({__file__})', f'Args related to {__file__}')
+args = config.add_commandline_args(
+ f'Run Tests Driver ({__file__})', f'Args related to {__file__}'
+)
args.add_argument('--unittests', '-u', action='store_true', help='Run unittests.')
args.add_argument('--doctests', '-d', action='store_true', help='Run doctests.')
args.add_argument(
tests_failed=[],
tests_timed_out=[],
)
- self.tests_started = 0
self.lock = threading.Lock()
@abstractmethod
"""The name of this test collection."""
pass
- def get_status(self) -> Tuple[int, TestResults]:
+ def get_status(self) -> TestResults:
"""Ask the TestRunner for its status."""
with self.lock:
- return (self.tests_started, self.test_results)
+ return self.test_results
@abstractmethod
def begin(self, params: TestingParameters) -> TestResults:
self.persist_output(test, msg, output)
return TestResults(
test.name,
- [test.name],
+ [],
[],
[test.name],
[],
logger.debug(
'%s: %s (%s) succeeded', self.get_name(), test.name, test.cmdline
)
- return TestResults(test.name, [test.name], [test.name], [], [])
+ return TestResults(test.name, [], [test.name], [], [])
except subprocess.TimeoutExpired as e:
msg = f'{self.get_name()}: {test.name} ({test.cmdline}) timed out after {e.timeout:.1f} seconds.'
logger.error(msg)
self.persist_output(test, msg, e.output.decode('utf-8'))
return TestResults(
test.name,
- [test.name],
+ [],
[],
[],
[test.name],
self.persist_output(test, msg, e.output.decode('utf-8'))
return TestResults(
test.name,
- [test.name],
+ [],
[],
[test.name],
[],
self.get_name(),
test_to_run.name,
)
- self.tests_started += 1
+ self.test_results.tests_executed.append(test_to_run.name)
for future in smart_future.wait_any(running, log_exceptions=False):
result = future._resolve()
thread.start()
results: Dict[str, Optional[TestResults]] = {}
+ start_time = time.time()
+ last_update = start_time
+ still_running = {}
+
while len(results) != len(threads):
started = 0
done = 0
failed = 0
for thread in threads:
- (s, tr) = thread.get_status()
- started += s
+ tid = thread.name
+ tr = thread.get_status()
+ started += len(tr.tests_executed)
failed += len(tr.tests_failed) + len(tr.tests_timed_out)
done += failed + len(tr.tests_succeeded)
+ running = set(tr.tests_executed)
+ running -= set(tr.tests_failed)
+ running -= set(tr.tests_succeeded)
+ running -= set(tr.tests_timed_out)
+ still_running[tid] = running
+
+ if time.time() - start_time > 5.0:
+ if time.time() - last_update > 3.0:
+ last_update = time.time()
+ update = []
+ for _, running_set in still_running.items():
+ for test_name in running_set:
+ update.append(test_name)
+ print(f'\r{ansi.clear_line()}')
+ if len(update) < 5:
+ print(f'Still running: {" ".join(update)}')
+ else:
+ print(f'Still running: {len(update)} tests.')
+
if not thread.is_alive():
- tid = thread.name
if tid not in results:
result = thread.join()
if result: