3 # © Copyright 2021-2022, Scott Gasch
5 """A :class:`Future` that can be treated as a substutute for the result
6 that it contains and will not block until it is used. At that point,
7 if the underlying value is not yet available yet, it will block until
8 the internal result actually becomes available.
11 from __future__ import annotations
13 import concurrent.futures as fut
15 from typing import Callable, List, Set, TypeVar
17 from overrides import overrides
21 # This module is commonly used by others in here and should avoid
22 # taking any unnecessary dependencies back on them.
23 from deferred_operand import DeferredOperand
25 logger = logging.getLogger(__name__)
31 futures: List[SmartFuture],
33 callback: Callable = None,
34 log_exceptions: bool = True,
35 timeout: float = None,
37 """Await the completion of any of a collection of SmartFutures and
38 invoke callback each time one completes, repeatedly, until they are
42 futures: A collection of SmartFutures to wait on
43 callback: An optional callback to invoke whenever one of the
45 log_exceptions: Should we log (warning + exception) any
46 underlying exceptions raised during future processing or
48 timeout: invoke callback with a periodicity of timeout while
53 smart_future_by_real_future = {}
54 completed_futures: Set[fut.Future] = set()
56 assert isinstance(x, SmartFuture)
57 real_futures.append(x.wrapped_future)
58 smart_future_by_real_future[x.wrapped_future] = x
60 while len(completed_futures) != len(real_futures):
62 newly_completed_futures = concurrent.futures.as_completed(real_futures, timeout=timeout)
63 for f in newly_completed_futures:
64 if callback is not None:
66 completed_futures.add(f)
67 if log_exceptions and not f.cancelled():
68 exception = f.exception()
69 if exception is not None:
71 'Future 0x%x raised an unhandled exception and exited.', id(f)
73 logger.exception(exception)
75 yield smart_future_by_real_future[f]
77 if callback is not None:
79 if callback is not None:
84 futures: List[SmartFuture],
86 log_exceptions: bool = True,
88 """Wait for all of the SmartFutures in the collection to finish before
92 futures: A collection of futures that we're waiting for
93 log_exceptions: Should we log (warning + exception) any
94 underlying exceptions raised during future processing or
100 assert isinstance(x, SmartFuture)
101 real_futures.append(x.wrapped_future)
103 (done, not_done) = concurrent.futures.wait(
104 real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
107 for f in real_futures:
108 if not f.cancelled():
109 exception = f.exception()
110 if exception is not None:
111 logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
112 logger.exception(exception)
114 assert len(done) == len(real_futures)
115 assert len(not_done) == 0
118 class SmartFuture(DeferredOperand):
119 """This is a SmartFuture, a class that wraps a normal :class:`Future`
120 and can then be used, mostly, like a normal (non-Future)
121 identifier of the type of that SmartFuture's result.
123 Using a FutureWrapper in expressions will block and wait until
124 the result of the deferred operation is known.
127 def __init__(self, wrapped_future: fut.Future) -> None:
128 assert isinstance(wrapped_future, fut.Future)
129 self.wrapped_future = wrapped_future
130 self.id = id_generator.get("smart_future_id")
132 def get_id(self) -> int:
135 def is_ready(self) -> bool:
136 return self.wrapped_future.done()
138 # You shouldn't have to call this; instead, have a look at defining a
139 # method on DeferredOperand base class.
141 def _resolve(self, timeout=None) -> T:
142 return self.wrapped_future.result(timeout)