*,
callback: Callable = None,
log_exceptions: bool = True,
+ timeout: float = None,
):
"""Await the completion of any of a collection of SmartFutures and
invoke callback each time one completes, repeatedly, until they are
log_exceptions: Should we log (warning + exception) any
underlying exceptions raised during future processing or
silently ignore then?
+ timeout: invoke callback with a periodicity of timeout while
+ awaiting futures
"""
real_futures = []
smart_future_by_real_future[x.wrapped_future] = x
while len(completed_futures) != len(real_futures):
- newly_completed_futures = concurrent.futures.as_completed(real_futures)
- for f in newly_completed_futures:
+ print("TOP...")
+ try:
+ newly_completed_futures = concurrent.futures.as_completed(real_futures, timeout=timeout)
+ for f in newly_completed_futures:
+ if callback is not None:
+ callback()
+ completed_futures.add(f)
+ if log_exceptions and not f.cancelled():
+ exception = f.exception()
+ if exception is not None:
+ logger.warning(
+ 'Future 0x%x raised an unhandled exception and exited.', id(f)
+ )
+ logger.exception(exception)
+ raise exception
+ yield smart_future_by_real_future[f]
+ except TimeoutError:
+ print(f"HERE!!! {len(completed_futures)} / {len(real_futures)}.")
if callback is not None:
callback()
- completed_futures.add(f)
- if log_exceptions and not f.cancelled():
- exception = f.exception()
- if exception is not None:
- logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
- logger.exception(exception)
- raise exception
- yield smart_future_by_real_future[f]
if callback is not None:
callback()