+ completed_futures.add(f)
+ if log_exceptions and not f.cancelled():
+ exception = f.exception()
+ if exception is not None:
+ logger.warning(
+ f'Future {id(f)} raised an unhandled exception and exited.'
+ )
+ logger.exception(exception)
+ raise exception
+ yield smart_future_by_real_future[f]
+ if callback is not None:
+ callback()
+ return
+
+
+def wait_all(
+ futures: List[SmartFuture],
+ *,
+ log_exceptions: bool = True,
+) -> None:
+ real_futures = []
+ for x in futures:
+ assert type(x) == SmartFuture
+ real_futures.append(x.wrapped_future)
+
+ (done, not_done) = concurrent.futures.wait(
+ real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
+ )
+ if log_exceptions:
+ for f in real_futures:
+ if not f.cancelled():
+ exception = f.exception()
+ if exception is not None:
+ logger.warning(
+ f'Future {id(f)} raised an unhandled exception and exited.'
+ )
+ logger.exception(exception)
+ raise exception
+ assert len(done) == len(real_futures)
+ assert len(not_done) == 0