3 # © Copyright 2021-2022, Scott Gasch
5 """A :class:`Future` that can be treated as a substutute for the result
6 that it contains and will not block until it is used. At that point,
7 if the underlying value is not yet available yet, it will block until
8 the internal result actually becomes available.
11 from __future__ import annotations
13 import concurrent.futures as fut
15 from typing import Callable, List, Set, TypeVar
17 from overrides import overrides
21 # This module is commonly used by others in here and should avoid
22 # taking any unnecessary dependencies back on them.
23 from deferred_operand import DeferredOperand
25 logger = logging.getLogger(__name__)
31 futures: List[SmartFuture],
33 callback: Callable = None,
34 log_exceptions: bool = True,
36 """Await the completion of any of a collection of SmartFutures and
37 invoke callback each time one completes, repeatedly, until they are
41 futures: A collection of SmartFutures to wait on
42 callback: An optional callback to invoke whenever one of the
44 log_exceptions: Should we log (warning + exception) any
45 underlying exceptions raised during future processing or
50 smart_future_by_real_future = {}
51 completed_futures: Set[fut.Future] = set()
53 assert isinstance(x, SmartFuture)
54 real_futures.append(x.wrapped_future)
55 smart_future_by_real_future[x.wrapped_future] = x
57 while len(completed_futures) != len(real_futures):
58 newly_completed_futures = concurrent.futures.as_completed(real_futures)
59 for f in newly_completed_futures:
60 if callback is not None:
62 completed_futures.add(f)
63 if log_exceptions and not f.cancelled():
64 exception = f.exception()
65 if exception is not None:
66 logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
67 logger.exception(exception)
69 yield smart_future_by_real_future[f]
70 if callback is not None:
75 futures: List[SmartFuture],
77 log_exceptions: bool = True,
79 """Wait for all of the SmartFutures in the collection to finish before
83 futures: A collection of futures that we're waiting for
84 log_exceptions: Should we log (warning + exception) any
85 underlying exceptions raised during future processing or
91 assert isinstance(x, SmartFuture)
92 real_futures.append(x.wrapped_future)
94 (done, not_done) = concurrent.futures.wait(
95 real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
98 for f in real_futures:
100 exception = f.exception()
101 if exception is not None:
102 logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
103 logger.exception(exception)
105 assert len(done) == len(real_futures)
106 assert len(not_done) == 0
109 class SmartFuture(DeferredOperand):
110 """This is a SmartFuture, a class that wraps a normal :class:`Future`
111 and can then be used, mostly, like a normal (non-Future)
112 identifier of the type of that SmartFuture's result.
114 Using a FutureWrapper in expressions will block and wait until
115 the result of the deferred operation is known.
118 def __init__(self, wrapped_future: fut.Future) -> None:
119 assert isinstance(wrapped_future, fut.Future)
120 self.wrapped_future = wrapped_future
121 self.id = id_generator.get("smart_future_id")
123 def get_id(self) -> int:
126 def is_ready(self) -> bool:
127 return self.wrapped_future.done()
129 # You shouldn't have to call this; instead, have a look at defining a
130 # method on DeferredOperand base class.
132 def _resolve(self, timeout=None) -> T:
133 return self.wrapped_future.result(timeout)