3 # © Copyright 2021-2022, Scott Gasch
5 """A future that can be treated like the result that it contains and
6 will not block until it is used. At that point, if the underlying
7 value is not yet available, it will block until it becomes
12 from __future__ import annotations
14 import concurrent.futures as fut
16 from typing import Callable, List, Set, TypeVar
18 from overrides import overrides
22 # This module is commonly used by others in here and should avoid
23 # taking any unnecessary dependencies back on them.
24 from deferred_operand import DeferredOperand
26 logger = logging.getLogger(__name__)
32 futures: List[SmartFuture],
34 callback: Callable = None,
35 log_exceptions: bool = True,
38 smart_future_by_real_future = {}
39 completed_futures: Set[fut.Future] = set()
41 assert isinstance(x, SmartFuture)
42 real_futures.append(x.wrapped_future)
43 smart_future_by_real_future[x.wrapped_future] = x
45 while len(completed_futures) != len(real_futures):
46 newly_completed_futures = concurrent.futures.as_completed(real_futures)
47 for f in newly_completed_futures:
48 if callback is not None:
50 completed_futures.add(f)
51 if log_exceptions and not f.cancelled():
52 exception = f.exception()
53 if exception is not None:
54 logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
55 logger.exception(exception)
57 yield smart_future_by_real_future[f]
58 if callback is not None:
63 futures: List[SmartFuture],
65 log_exceptions: bool = True,
69 assert isinstance(x, SmartFuture)
70 real_futures.append(x.wrapped_future)
72 (done, not_done) = concurrent.futures.wait(
73 real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
76 for f in real_futures:
78 exception = f.exception()
79 if exception is not None:
80 logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
81 logger.exception(exception)
83 assert len(done) == len(real_futures)
84 assert len(not_done) == 0
87 class SmartFuture(DeferredOperand):
88 """This is a SmartFuture, a class that wraps a normal Future and can
89 then be used, mostly, like a normal (non-Future) identifier.
91 Using a FutureWrapper in expressions will block and wait until
92 the result of the deferred operation is known.
95 def __init__(self, wrapped_future: fut.Future) -> None:
96 assert isinstance(wrapped_future, fut.Future)
97 self.wrapped_future = wrapped_future
98 self.id = id_generator.get("smart_future_id")
100 def get_id(self) -> int:
103 def is_ready(self) -> bool:
104 return self.wrapped_future.done()
106 # You shouldn't have to call this; instead, have a look at defining a
107 # method on DeferredOperand base class.
109 def _resolve(self, timeout=None) -> T:
110 return self.wrapped_future.result(timeout)