3 from __future__ import annotations
6 import concurrent.futures as fut
9 from typing import Callable, List, Set, TypeVar
11 from overrides import overrides
15 # This module is commonly used by others in here and should avoid
16 # taking any unnecessary dependencies back on them.
17 from deferred_operand import DeferredOperand
19 logger = logging.getLogger(__name__)
25 futures: List[SmartFuture],
27 callback: Callable = None,
28 log_exceptions: bool = True,
31 smart_future_by_real_future = {}
32 completed_futures: Set[fut.Future] = set()
34 assert type(x) == SmartFuture
35 real_futures.append(x.wrapped_future)
36 smart_future_by_real_future[x.wrapped_future] = x
38 while len(completed_futures) != len(real_futures):
39 newly_completed_futures = concurrent.futures.as_completed(real_futures)
40 for f in newly_completed_futures:
41 if callback is not None:
43 completed_futures.add(f)
44 if log_exceptions and not f.cancelled():
45 exception = f.exception()
46 if exception is not None:
48 f'Future {id(f)} raised an unhandled exception and exited.'
50 logger.exception(exception)
52 yield smart_future_by_real_future[f]
53 if callback is not None:
59 futures: List[SmartFuture],
61 log_exceptions: bool = True,
65 assert type(x) == SmartFuture
66 real_futures.append(x.wrapped_future)
68 (done, not_done) = concurrent.futures.wait(
69 real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
72 for f in real_futures:
74 exception = f.exception()
75 if exception is not None:
77 f'Future {id(f)} raised an unhandled exception and exited.'
79 logger.exception(exception)
81 assert len(done) == len(real_futures)
82 assert len(not_done) == 0
class SmartFuture(DeferredOperand):
    """This is a SmartFuture, a class that wraps a normal Future and can
    then be used, mostly, like a normal (non-Future) identifier.

    Using a SmartFuture in expressions will block and wait until
    the result of the deferred operation is known.
    """

    def __init__(self, wrapped_future: fut.Future) -> None:
        """Wrap an existing future and assign this wrapper an id.

        Args:
            wrapped_future: the concurrent.futures.Future whose eventual
                result this object stands in for.
        """
        # isinstance (rather than an exact type comparison) so that
        # subclasses of concurrent.futures.Future are accepted too.
        assert isinstance(wrapped_future, fut.Future)
        self.wrapped_future = wrapped_future
        self.id = id_generator.get("smart_future_id")

    def get_id(self) -> int:
        """Return the id assigned to this SmartFuture at construction."""
        # NOTE(review): this body was not visible in the reviewed listing;
        # confirm it matches the original.
        return self.id

    def is_ready(self) -> bool:
        """Return True once the wrapped future is done.

        Per concurrent.futures.Future.done(), "done" covers both a
        finished and a cancelled future.
        """
        return self.wrapped_future.done()

    # You shouldn't have to call this directly; instead, have a look at
    # defining a method on the DeferredOperand base class.
    def _resolve(self, *, timeout=None) -> T:
        """Block until the wrapped future completes and return its result.

        Args:
            timeout: maximum number of seconds to wait, passed straight
                to Future.result(); None means wait without limit.

        Returns:
            The result produced by the wrapped future.

        Raises:
            Whatever Future.result() raises: a timeout error if the
            result is not available in time, a cancellation error if the
            future was cancelled, or the exception raised by the
            underlying call.
        """
        return self.wrapped_future.result(timeout)