3 # © Copyright 2021-2022, Scott Gasch
6 A :class:`Future` that can be treated as a substutute for the result
7 that it contains and will not block until it is used. At that point,
8 if the underlying value is not yet available yet, it will block until
9 the internal result actually becomes available.
11 Results from :class:`parallelize.parallelize` are returned wrapped
12 in :class:`SmartFuture` instances.
15 from __future__ import annotations
18 import concurrent.futures as fut
20 from typing import Callable, List, Set, TypeVar
22 from overrides import overrides
24 # This module is commonly used by others in here and should avoid
25 # taking any unnecessary dependencies back on them.
26 from pyutils import id_generator
27 from pyutils.parallelize.deferred_operand import DeferredOperand
29 logger = logging.getLogger(__name__)
35 futures: List[SmartFuture],
37 callback: Callable = None,
38 log_exceptions: bool = True,
39 timeout: float = None,
41 """Await the completion of any of a collection of SmartFutures and
42 invoke callback each time one completes, repeatedly, until they are
46 futures: A collection of SmartFutures to wait on
47 callback: An optional callback to invoke whenever one of the
49 log_exceptions: Should we log (warning + exception) any
50 underlying exceptions raised during future processing or
52 timeout: invoke callback with a periodicity of timeout while
56 A :class:`SmartFuture` from the futures list with a result
57 available without blocking.
61 smart_future_by_real_future = {}
62 completed_futures: Set[fut.Future] = set()
64 assert isinstance(x, SmartFuture)
65 real_futures.append(x.wrapped_future)
66 smart_future_by_real_future[x.wrapped_future] = x
68 while len(completed_futures) != len(real_futures):
70 newly_completed_futures = concurrent.futures.as_completed(
71 real_futures, timeout=timeout
73 for f in newly_completed_futures:
74 if callback is not None:
76 completed_futures.add(f)
77 if log_exceptions and not f.cancelled():
78 exception = f.exception()
79 if exception is not None:
81 'Future 0x%x raised an unhandled exception and exited.',
84 logger.exception(exception)
86 yield smart_future_by_real_future[f]
88 if callback is not None:
90 if callback is not None:
95 futures: List[SmartFuture],
97 log_exceptions: bool = True,
99 """Wait for all of the SmartFutures in the collection to finish before
103 futures: A collection of futures that we're waiting for
104 log_exceptions: Should we log (warning + exception) any
105 underlying exceptions raised during future processing or
106 silently ignore then?
109 Only when all futures in the input list are ready. Blocks
115 assert isinstance(x, SmartFuture)
116 real_futures.append(x.wrapped_future)
118 (done, not_done) = concurrent.futures.wait(
119 real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
122 for f in real_futures:
123 if not f.cancelled():
124 exception = f.exception()
125 if exception is not None:
127 'Future 0x%x raised an unhandled exception and exited.', id(f)
129 logger.exception(exception)
131 assert len(done) == len(real_futures)
132 assert len(not_done) == 0
135 class SmartFuture(DeferredOperand):
136 """This is a SmartFuture, a class that wraps a normal :class:`Future`
137 and can then be used, mostly, like a normal (non-Future)
138 identifier of the type of that SmartFuture's result.
140 Using a FutureWrapper in expressions will block and wait until
141 the result of the deferred operation is known.
144 def __init__(self, wrapped_future: fut.Future) -> None:
147 wrapped_future: a normal Python :class:`concurrent.Future`
148 object that we are wrapping.
150 assert isinstance(wrapped_future, fut.Future)
151 self.wrapped_future = wrapped_future
152 self.id = id_generator.get("smart_future_id")
154 def get_id(self) -> int:
157 A unique identifier for this instance.
161 def is_ready(self) -> bool:
164 True if the wrapped future is ready without blocking, False
167 return self.wrapped_future.done()
169 # You shouldn't have to call this; instead, have a look at defining a
170 # method on DeferredOperand base class.
172 def _resolve(self, timeout=None) -> T:
173 return self.wrapped_future.result(timeout)