3 # © Copyright 2021-2022, Scott Gasch
6 A :class:`Future` that can be treated as a substutute for the result
7 that it contains and will not block until it is used. At that point,
8 if the underlying value is not yet available yet, it will block until
9 the internal result actually becomes available.
11 Results from :class:`parallelize.parallelize` are returned wrapped
12 in :class:`SmartFuture` instances.
14 Also contains some utilility code for waiting for one/many futures.
17 from __future__ import annotations
20 import concurrent.futures as fut
22 from typing import Callable, List, Set, TypeVar
24 from overrides import overrides
26 # This module is commonly used by others in here and should avoid
27 # taking any unnecessary dependencies back on them.
28 from pyutils import id_generator
29 from pyutils.parallelize.deferred_operand import DeferredOperand
31 logger = logging.getLogger(__name__)
37 futures: List[SmartFuture],
39 callback: Callable = None,
40 log_exceptions: bool = True,
41 timeout: float = None,
43 """Await the completion of any of a collection of SmartFutures and
44 invoke callback each time one completes, repeatedly, until they are
48 futures: A collection of SmartFutures to wait on
49 callback: An optional callback to invoke whenever one of the
51 log_exceptions: Should we log (warning + exception) any
52 underlying exceptions raised during future processing or
54 timeout: invoke callback with a periodicity of timeout while
58 A :class:`SmartFuture` from the futures list with a result
59 available without blocking.
63 smart_future_by_real_future = {}
64 completed_futures: Set[fut.Future] = set()
66 assert isinstance(x, SmartFuture)
67 real_futures.append(x.wrapped_future)
68 smart_future_by_real_future[x.wrapped_future] = x
70 while len(completed_futures) != len(real_futures):
72 newly_completed_futures = concurrent.futures.as_completed(
73 real_futures, timeout=timeout
75 for f in newly_completed_futures:
76 if callback is not None:
78 completed_futures.add(f)
79 if log_exceptions and not f.cancelled():
80 exception = f.exception()
81 if exception is not None:
83 'Future 0x%x raised an unhandled exception and exited.',
86 logger.exception(exception)
88 yield smart_future_by_real_future[f]
90 if callback is not None:
92 if callback is not None:
97 futures: List[SmartFuture],
99 log_exceptions: bool = True,
101 """Wait for all of the SmartFutures in the collection to finish before
105 futures: A collection of futures that we're waiting for
106 log_exceptions: Should we log (warning + exception) any
107 underlying exceptions raised during future processing or
108 silently ignore then?
111 Only when all futures in the input list are ready. Blocks
117 assert isinstance(x, SmartFuture)
118 real_futures.append(x.wrapped_future)
120 (done, not_done) = concurrent.futures.wait(
121 real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
124 for f in real_futures:
125 if not f.cancelled():
126 exception = f.exception()
127 if exception is not None:
129 'Future 0x%x raised an unhandled exception and exited.', id(f)
131 logger.exception(exception)
133 assert len(done) == len(real_futures)
134 assert len(not_done) == 0
137 class SmartFuture(DeferredOperand):
138 """This is a SmartFuture, a class that wraps a normal :class:`Future`
139 and can then be used, mostly, like a normal (non-Future)
140 identifier of the type of that SmartFuture's result.
142 Using a FutureWrapper in expressions will block and wait until
143 the result of the deferred operation is known.
146 def __init__(self, wrapped_future: fut.Future) -> None:
149 wrapped_future: a normal Python :class:`concurrent.Future`
150 object that we are wrapping.
152 super().__init__(set(['id', 'wrapped_future', 'get_id', 'is_ready']))
153 assert isinstance(wrapped_future, fut.Future)
154 self.wrapped_future = wrapped_future
155 self.id = id_generator.get("smart_future_id")
157 # Note: if you are adding any settable properties to this
158 # class, go add them to the set in DeferredOperand.__setattr__()
160 def get_id(self) -> int:
163 A unique identifier for this instance.
167 def is_ready(self) -> bool:
170 True if the wrapped future is ready without blocking, False
173 return self.wrapped_future.done()
175 # You shouldn't have to call this; instead, have a look at defining a
176 # method on DeferredOperand base class.
178 def _resolve(self, timeout=None) -> T:
179 return self.wrapped_future.result(timeout)