3 # © Copyright 2021-2022, Scott Gasch
6 A future that can be treated as a substutute for the result that it
7 contains and will not block until it is used. At that point, if the
8 underlying value is not yet available yet, it will block until the
9 internal result actually becomes available.
13 from __future__ import annotations
15 import concurrent.futures as fut
17 from typing import Callable, List, Set, TypeVar
19 from overrides import overrides
23 # This module is commonly used by others in here and should avoid
24 # taking any unnecessary dependencies back on them.
25 from deferred_operand import DeferredOperand
27 logger = logging.getLogger(__name__)
33 futures: List[SmartFuture],
35 callback: Callable = None,
36 log_exceptions: bool = True,
38 """Await the completion of any of a collection of SmartFutures and
39 invoke callback each time one completes, repeatedly, until they are
43 futures: A collection of SmartFutures to wait on
44 callback: An optional callback to invoke whenever one of the
46 log_exceptions: Should we log (warning + exception) any
47 underlying exceptions raised during future processing or
52 smart_future_by_real_future = {}
53 completed_futures: Set[fut.Future] = set()
55 assert isinstance(x, SmartFuture)
56 real_futures.append(x.wrapped_future)
57 smart_future_by_real_future[x.wrapped_future] = x
59 while len(completed_futures) != len(real_futures):
60 newly_completed_futures = concurrent.futures.as_completed(real_futures)
61 for f in newly_completed_futures:
62 if callback is not None:
64 completed_futures.add(f)
65 if log_exceptions and not f.cancelled():
66 exception = f.exception()
67 if exception is not None:
68 logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
69 logger.exception(exception)
71 yield smart_future_by_real_future[f]
72 if callback is not None:
77 futures: List[SmartFuture],
79 log_exceptions: bool = True,
81 """Wait for all of the SmartFutures in the collection to finish before
85 futures: A collection of futures that we're waiting for
86 log_exceptions: Should we log (warning + exception) any
87 underlying exceptions raised during future processing or
93 assert isinstance(x, SmartFuture)
94 real_futures.append(x.wrapped_future)
96 (done, not_done) = concurrent.futures.wait(
97 real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
100 for f in real_futures:
101 if not f.cancelled():
102 exception = f.exception()
103 if exception is not None:
104 logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
105 logger.exception(exception)
107 assert len(done) == len(real_futures)
108 assert len(not_done) == 0
111 class SmartFuture(DeferredOperand):
112 """This is a SmartFuture, a class that wraps a normal :class:`Future`
113 and can then be used, mostly, like a normal (non-Future)
114 identifier of the type of that SmartFuture's result.
116 Using a FutureWrapper in expressions will block and wait until
117 the result of the deferred operation is known.
120 def __init__(self, wrapped_future: fut.Future) -> None:
121 assert isinstance(wrapped_future, fut.Future)
122 self.wrapped_future = wrapped_future
123 self.id = id_generator.get("smart_future_id")
125 def get_id(self) -> int:
128 def is_ready(self) -> bool:
129 return self.wrapped_future.done()
131 # You shouldn't have to call this; instead, have a look at defining a
132 # method on DeferredOperand base class.
134 def _resolve(self, timeout=None) -> T:
135 return self.wrapped_future.result(timeout)