3 # © Copyright 2021-2022, Scott Gasch
5 """A :class:`Future` that can be treated as a substitute for the result
6 that it contains and will not block until it is used. At that point,
7 if the underlying value is not yet available, it will block until
8 the internal result actually becomes available.
11 from __future__ import annotations
13 import concurrent.futures as fut
15 from typing import Callable, List, Set, TypeVar
17 from overrides import overrides
21 # This module is commonly used by others in here and should avoid
22 # taking any unnecessary dependencies back on them.
23 from deferred_operand import DeferredOperand
25 logger = logging.getLogger(__name__)
31 futures: List[SmartFuture],
33 callback: Callable = None,
34 log_exceptions: bool = True,
35 timeout: float = None,
37 """Await the completion of any of a collection of SmartFutures and
38 invoke callback each time one completes, repeatedly, until they are
42 futures: A collection of SmartFutures to wait on
43 callback: An optional callback to invoke whenever one of the
45 log_exceptions: Should we log (warning + exception) any
46 underlying exceptions raised during future processing or
48 timeout: invoke callback with a periodicity of timeout while
# Reverse map from each underlying future to the SmartFuture wrapping it, so
# that completed futures can be yielded back as the caller's own objects.
53 smart_future_by_real_future = {}
# Underlying futures already seen as completed; drives the loop exit test.
54 completed_futures: Set[fut.Future] = set()
# Every input must be a SmartFuture; its wrapped future is what gets awaited.
56 assert isinstance(x, SmartFuture)
57 real_futures.append(x.wrapped_future)
58 smart_future_by_real_future[x.wrapped_future] = x
# Keep going until every underlying future has been recorded as completed.
# NOTE(review): completed_futures is a set while real_futures is appended to
# (presumably a list); if the same wrapped future is passed twice the two
# lengths can never match -- confirm callers always pass unique futures.
60 while len(completed_futures) != len(real_futures):
# as_completed() yields futures as they finish and raises a timeout error if
# `timeout` seconds elapse before all of them are done.
# NOTE(review): this uses the name `concurrent`, but the only import visible
# in this excerpt is `import concurrent.futures as fut`, which binds just
# `fut` -- confirm `concurrent` itself is imported.
63 newly_completed_futures = concurrent.futures.as_completed(real_futures, timeout=timeout)
64 for f in newly_completed_futures:
# Guard for the optional callback (the guarded statement is not visible in
# this excerpt; presumably the callback invocation -- confirm).
65 if callback is not None:
67 completed_futures.add(f)
# Only look at the stored exception when logging is requested and the future
# was not cancelled; Future.exception() raises on a cancelled future.
68 if log_exceptions and not f.cancelled():
69 exception = f.exception()
70 if exception is not None:
72 'Future 0x%x raised an unhandled exception and exited.', id(f)
74 logger.exception(exception)
# Hand back the SmartFuture that corresponds to the completed real future.
76 yield smart_future_by_real_future[f]
# NOTE(review): a bare print() with a "HERE!!!" marker looks like leftover
# debug output going to stdout; consider the module `logger` -- confirm intent.
78 print(f"HERE!!! {len(completed_futures)} / {len(real_futures)}.")
79 if callback is not None:
81 if callback is not None:
86 futures: List[SmartFuture],
88 log_exceptions: bool = True,
90 """Wait for all of the SmartFutures in the collection to finish before
94 futures: A collection of futures that we're waiting for
95 log_exceptions: Should we log (warning + exception) any
96 underlying exceptions raised during future processing or
# Every input must be a SmartFuture; collect the underlying futures to wait on.
102 assert isinstance(x, SmartFuture)
103 real_futures.append(x.wrapped_future)
# Block, with no timeout, until every underlying future has finished or been
# cancelled (ALL_COMPLETED). wait() returns two sets: done and not-done.
# NOTE(review): this uses the name `concurrent`, but the only import visible
# in this excerpt is `import concurrent.futures as fut`, which binds just
# `fut` -- confirm `concurrent` itself is imported.
105 (done, not_done) = concurrent.futures.wait(
106 real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
# For each non-cancelled future that stored an exception, log a warning that
# identifies the future by id() and then log the exception itself.
# NOTE(review): no check of `log_exceptions` is visible around this loop in
# this excerpt -- confirm the documented flag actually gates the logging.
109 for f in real_futures:
110 if not f.cancelled():
111 exception = f.exception()
112 if exception is not None:
113 logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
114 logger.exception(exception)
# Sanity checks: with timeout=None and ALL_COMPLETED nothing should be pending.
# NOTE(review): `done` is a set, so if the same wrapped future was passed twice
# the first assert fails -- confirm callers always pass unique futures.
116 assert len(done) == len(real_futures)
117 assert len(not_done) == 0
120 class SmartFuture(DeferredOperand):
121 """This is a SmartFuture, a class that wraps a normal :class:`Future`
122 and can then be used, mostly, like a normal (non-Future)
123 identifier of the type of that SmartFuture's result.
125 Using a SmartFuture in expressions will block and wait until
126 the result of the deferred operation is known.
# Wrap an existing concurrent.futures.Future; anything else trips the assert.
129 def __init__(self, wrapped_future: fut.Future) -> None:
130 assert isinstance(wrapped_future, fut.Future)
131 self.wrapped_future = wrapped_future
# NOTE(review): `id_generator` is not imported in the visible part of this
# file -- confirm it is in scope.
132 self.id = id_generator.get("smart_future_id")
# NOTE(review): the body of get_id is not visible in this excerpt; presumably
# it returns self.id -- confirm.
134 def get_id(self) -> int:
# True once the wrapped future is done; Future.done() also reports True for a
# future that was cancelled.
137 def is_ready(self) -> bool:
138 return self.wrapped_future.done()
140 # You shouldn't have to call this; instead, have a look at defining a
141 # method on the DeferredOperand base class.
# Blocks until the wrapped future's result is available, or until `timeout`
# seconds elapse when a timeout is given, and returns that result.
# Future.result() re-raises any exception raised by the underlying call.
# NOTE(review): `T` is not defined in the visible excerpt (only TypeVar is
# imported) -- confirm it is declared at module level.
143 def _resolve(self, timeout=None) -> T:
144 return self.wrapped_future.result(timeout)