3 from __future__ import annotations
5 import concurrent.futures as fut
6 from typing import Callable, List, TypeVar
8 from overrides import overrides
10 # This module is commonly used by others in here and should avoid
11 # taking any unnecessary dependencies back on them.
12 from deferred_operand import DeferredOperand
def wait_any(futures: List[SmartFuture], *, callback: Callable = None):
    """Yield each SmartFuture in *futures* as its wrapped future completes.

    This is a generator, so no waiting happens until it is iterated.

    Args:
        futures: Objects exposing a ``wrapped_future`` attribute that holds
            a ``concurrent.futures.Future``.
        callback: Optional zero-argument callable.  It is invoked right
            before every item is yielded and one final time once all of
            the futures have been yielded.

    Yields:
        The element of *futures* whose wrapped future has just finished,
        in completion order.  A wrapped future that appears more than once
        in *futures* is yielded a single time (as the last element that
        wraps it).
    """
    # Key by the real future so each completion can be mapped back to its
    # wrapper.  Using a dict also collapses duplicates; the previous
    # "loop until the completed set is as long as the input list" guard
    # could never be satisfied when the input contained duplicates.
    smart_future_by_real_future = {sf.wrapped_future: sf for sf in futures}

    # as_completed() blocks until every future it was given is done, so a
    # single pass over it is sufficient.
    for real_future in fut.as_completed(list(smart_future_by_real_future)):
        if callback is not None:
            callback()
        yield smart_future_by_real_future[real_future]

    if callback is not None:
        callback()
def wait_all(futures: List[SmartFuture]) -> None:
    """Block until the wrapped future of every element of *futures* is done.

    Args:
        futures: Objects exposing a ``wrapped_future`` attribute that holds
            a ``concurrent.futures.Future``.  An empty list returns
            immediately.
    """
    # De-duplicate up front: wait() reports its results as sets, so
    # comparing their sizes against a list that contains the same future
    # twice would trip the sanity check below for no reason.
    real_futures = {sf.wrapped_future for sf in futures}
    done, not_done = fut.wait(
        real_futures, timeout=None, return_when=fut.ALL_COMPLETED
    )
    # Internal invariants of an untimed ALL_COMPLETED wait, not input
    # validation.
    assert not not_done
    assert len(done) == len(real_futures)
class SmartFuture(DeferredOperand):
    """A wrapper around a normal Future that can then be used, mostly,
    like a normal (non-Future) identifier.

    Using a SmartFuture in expressions will block and wait until the
    result of the deferred operation is known.
    """

    def __init__(self, wrapped_future: fut.Future) -> None:
        """Wrap *wrapped_future* and give this instance an id.

        Args:
            wrapped_future: The ``concurrent.futures.Future`` whose result
                this object stands in for.
        """
        self.wrapped_future = wrapped_future
        # Id obtained from id_generator under the "smart_future_id" name.
        self.id = id_generator.get("smart_future_id")

    def get_id(self) -> int:
        """Return the id assigned to this instance at construction."""
        return self.id

    def is_ready(self) -> bool:
        """Return True once the wrapped future is done (no blocking)."""
        return self.wrapped_future.done()

    # You shouldn't have to call this directly; instead, have a look at
    # defining a method on the DeferredOperand base class.
    @overrides
    def _resolve(self, *, timeout=None) -> T:
        """Block for, and return, the wrapped future's result.

        Args:
            timeout: Seconds to wait, passed straight to
                ``Future.result``; None waits without limit.

        Raises:
            Whatever ``Future.result`` raises: a timeout error if the
            result is not available in time, a cancellation error if the
            future was cancelled, or the exception raised by the deferred
            call itself.
        """
        return self.wrapped_future.result(timeout)