3 from __future__ import annotations
5 import concurrent.futures as fut
8 from typing import Callable, List, TypeVar
10 from overrides import overrides
12 # This module is commonly used by others in here and should avoid
13 # taking any unnecessary dependencies back on them.
14 from deferred_operand import DeferredOperand
17 logger = logging.getLogger(__name__)
23 futures: List[SmartFuture],
25 callback: Callable = None,
26 log_exceptions: bool = True,
29 smart_future_by_real_future = {}
30 completed_futures = set()
32 real_futures.append(_.wrapped_future)
33 smart_future_by_real_future[_.wrapped_future] = _
34 while len(completed_futures) != len(real_futures):
35 newly_completed_futures = concurrent.futures.as_completed(real_futures)
36 for f in newly_completed_futures:
37 if callback is not None:
39 completed_futures.add(f)
40 if log_exceptions and not f.cancelled():
41 exception = f.exception()
42 if exception is not None:
44 f'Future {id(f)} raised an unhandled exception and exited.'
46 logger.exception(exception)
48 yield smart_future_by_real_future[f]
49 if callback is not None:
55 futures: List[SmartFuture],
57 log_exceptions: bool = True,
59 real_futures = [x.wrapped_future for x in futures]
60 (done, not_done) = concurrent.futures.wait(
61 real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
64 for f in real_futures:
66 exception = f.exception()
67 if exception is not None:
69 f'Future {id(f)} raised an unhandled exception and exited.'
71 logger.exception(exception)
73 assert len(done) == len(real_futures)
74 assert len(not_done) == 0
class SmartFuture(DeferredOperand):
    """This is a SmartFuture, a class that wraps a normal Future and can
    then be used, mostly, like a normal (non-Future) identifier.

    Using a SmartFuture in expressions will block and wait until
    the result of the deferred operation is known.
    """

    def __init__(self, wrapped_future: fut.Future) -> None:
        """Wrap ``wrapped_future`` and give this instance an id.

        Args:
            wrapped_future: the underlying future whose result this
                object stands in for.
        """
        self.wrapped_future = wrapped_future
        # Id obtained from id_generator under the "smart_future_id" name.
        self.id = id_generator.get("smart_future_id")

    def get_id(self) -> int:
        """Return the id assigned to this instance in ``__init__``."""
        # NOTE(review): this method's body was not visible in the reviewed
        # excerpt; returning self.id is inferred from __init__ -- confirm.
        return self.id

    def is_ready(self) -> bool:
        """Return whether the wrapped future is done.

        This is ``Future.done()``: True when the call finished running or
        was successfully cancelled.
        """
        return self.wrapped_future.done()

    # You shouldn't have to call this; instead, have a look at defining a
    # method on DeferredOperand base class.
    @overrides
    def _resolve(self, *, timeout=None) -> T:
        """Block until the wrapped future completes and return its result.

        Args:
            timeout: seconds to wait, forwarded to ``Future.result``;
                None means wait without a time limit.

        Returns:
            The value produced by the wrapped future.

        Raises:
            Whatever ``Future.result`` raises: a timeout error when
            ``timeout`` elapses, a cancellation error if the future was
            cancelled, or the exception raised by the wrapped call.
        """
        return self.wrapped_future.result(timeout)