-def wait_any(futures: List[SmartFuture], *, callback: Callable = None):
- finished: Mapping[int, bool] = {}
- x = 0
- while True:
- future = futures[x]
- if not finished.get(future.get_id(), False):
- if future.is_ready():
- finished[future.get_id()] = True
- yield future
- else:
+def wait_any(
+ futures: List[SmartFuture],
+ *,
+ callback: Callable = None,
+ log_exceptions: bool = True,
+ timeout: float = None,
+):
+ """Await the completion of any of a collection of SmartFutures and
+ invoke callback each time one completes, repeatedly, until they are
+ all finished.
+
+ Args:
+ futures: A collection of SmartFutures to wait on
+ callback: An optional callback to invoke whenever one of the
+ futures completes
+ log_exceptions: Should we log (warning + exception) any
+ underlying exceptions raised during future processing or
+ silently ignore then?
+ timeout: invoke callback with a periodicity of timeout while
+ awaiting futures
+ """
+
+ real_futures = []
+ smart_future_by_real_future = {}
+ completed_futures: Set[fut.Future] = set()
+ for x in futures:
+ assert isinstance(x, SmartFuture)
+ real_futures.append(x.wrapped_future)
+ smart_future_by_real_future[x.wrapped_future] = x
+
+ while len(completed_futures) != len(real_futures):
+ try:
+ newly_completed_futures = concurrent.futures.as_completed(real_futures, timeout=timeout)
+ for f in newly_completed_futures: