# © Copyright 2021-2022, Scott Gasch
-"""A :class:Future that can be treated as a substutute for the result
+"""A :class:`Future` that can be treated as a substutute for the result
that it contains and will not block until it is used. At that point,
if the underlying value is not yet available yet, it will block until
the internal result actually becomes available.
*,
callback: Callable = None,
log_exceptions: bool = True,
+ timeout: float = None,
):
"""Await the completion of any of a collection of SmartFutures and
invoke callback each time one completes, repeatedly, until they are
log_exceptions: Should we log (warning + exception) any
underlying exceptions raised during future processing or
silently ignore then?
+ timeout: invoke callback with a periodicity of timeout while
+ awaiting futures
"""
real_futures = []
smart_future_by_real_future[x.wrapped_future] = x
while len(completed_futures) != len(real_futures):
- newly_completed_futures = concurrent.futures.as_completed(real_futures)
- for f in newly_completed_futures:
+ try:
+ newly_completed_futures = concurrent.futures.as_completed(real_futures, timeout=timeout)
+ for f in newly_completed_futures:
+ if callback is not None:
+ callback()
+ completed_futures.add(f)
+ if log_exceptions and not f.cancelled():
+ exception = f.exception()
+ if exception is not None:
+ logger.warning(
+ 'Future 0x%x raised an unhandled exception and exited.', id(f)
+ )
+ logger.exception(exception)
+ raise exception
+ yield smart_future_by_real_future[f]
+ except TimeoutError:
if callback is not None:
callback()
- completed_futures.add(f)
- if log_exceptions and not f.cancelled():
- exception = f.exception()
- if exception is not None:
- logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
- logger.exception(exception)
- raise exception
- yield smart_future_by_real_future[f]
if callback is not None:
callback()