3 # © Copyright 2021-2022, Scott Gasch
5 """A :class:`Future` that can be treated as a substutute for the result
6 that it contains and will not block until it is used. At that point,
7 if the underlying value is not yet available yet, it will block until
8 the internal result actually becomes available.
11 from __future__ import annotations
14 import concurrent.futures as fut
16 from typing import Callable, List, Set, TypeVar
18 from overrides import overrides
20 # This module is commonly used by others in here and should avoid
21 # taking any unnecessary dependencies back on them.
22 from pyutils import id_generator
23 from pyutils.parallelize.deferred_operand import DeferredOperand
25 logger = logging.getLogger(__name__)
31 futures: List[SmartFuture],
33 callback: Callable = None,
34 log_exceptions: bool = True,
35 timeout: float = None,
37 """Await the completion of any of a collection of SmartFutures and
38 invoke callback each time one completes, repeatedly, until they are
42 futures: A collection of SmartFutures to wait on
43 callback: An optional callback to invoke whenever one of the
45 log_exceptions: Should we log (warning + exception) any
46 underlying exceptions raised during future processing or
48 timeout: invoke callback with a periodicity of timeout while
53 smart_future_by_real_future = {}
54 completed_futures: Set[fut.Future] = set()
56 assert isinstance(x, SmartFuture)
57 real_futures.append(x.wrapped_future)
58 smart_future_by_real_future[x.wrapped_future] = x
60 while len(completed_futures) != len(real_futures):
62 newly_completed_futures = concurrent.futures.as_completed(
63 real_futures, timeout=timeout
65 for f in newly_completed_futures:
66 if callback is not None:
68 completed_futures.add(f)
69 if log_exceptions and not f.cancelled():
70 exception = f.exception()
71 if exception is not None:
73 'Future 0x%x raised an unhandled exception and exited.',
76 logger.exception(exception)
78 yield smart_future_by_real_future[f]
80 if callback is not None:
82 if callback is not None:
87 futures: List[SmartFuture],
89 log_exceptions: bool = True,
91 """Wait for all of the SmartFutures in the collection to finish before
95 futures: A collection of futures that we're waiting for
96 log_exceptions: Should we log (warning + exception) any
97 underlying exceptions raised during future processing or
103 assert isinstance(x, SmartFuture)
104 real_futures.append(x.wrapped_future)
106 (done, not_done) = concurrent.futures.wait(
107 real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
110 for f in real_futures:
111 if not f.cancelled():
112 exception = f.exception()
113 if exception is not None:
115 'Future 0x%x raised an unhandled exception and exited.', id(f)
117 logger.exception(exception)
119 assert len(done) == len(real_futures)
120 assert len(not_done) == 0
123 class SmartFuture(DeferredOperand):
124 """This is a SmartFuture, a class that wraps a normal :class:`Future`
125 and can then be used, mostly, like a normal (non-Future)
126 identifier of the type of that SmartFuture's result.
128 Using a FutureWrapper in expressions will block and wait until
129 the result of the deferred operation is known.
132 def __init__(self, wrapped_future: fut.Future) -> None:
133 assert isinstance(wrapped_future, fut.Future)
134 self.wrapped_future = wrapped_future
135 self.id = id_generator.get("smart_future_id")
137 def get_id(self) -> int:
140 def is_ready(self) -> bool:
141 return self.wrapped_future.done()
143 # You shouldn't have to call this; instead, have a look at defining a
144 # method on DeferredOperand base class.
146 def _resolve(self, timeout=None) -> T:
147 return self.wrapped_future.result(timeout)