Make smart futures avoid polling.
[python_utils.git] / smart_future.py
1 #!/usr/bin/env python3
2
3 from __future__ import annotations
4 import concurrent
5 import concurrent.futures as fut
6 from typing import Callable, List, TypeVar
7
8 from overrides import overrides
9
10 # This module is commonly used by others in here and should avoid
11 # taking any unnecessary dependencies back on them.
12 from deferred_operand import DeferredOperand
13 import id_generator
14
15 T = TypeVar('T')
16
17
18 def wait_any(futures: List[SmartFuture], *, callback: Callable = None):
19     real_futures = []
20     smart_future_by_real_future = {}
21     completed_futures = set()
22     for _ in futures:
23         real_futures.append(_.wrapped_future)
24         smart_future_by_real_future[_.wrapped_future] = _
25     while len(completed_futures) != len(real_futures):
26         newly_completed_futures = concurrent.futures.as_completed(real_futures)
27         for f in newly_completed_futures:
28             if callback is not None:
29                 callback()
30             completed_futures.add(f)
31             yield smart_future_by_real_future[f]
32     if callback is not None:
33         callback()
34     return
35
36
37 def wait_all(futures: List[SmartFuture]) -> None:
38     real_futures = [x.wrapped_future for x in futures]
39     (done, not_done) = concurrent.futures.wait(
40         real_futures,
41         timeout=None,
42         return_when=concurrent.futures.ALL_COMPLETED
43     )
44     assert len(done) == len(real_futures)
45     assert len(not_done) == 0
46
47
48 class SmartFuture(DeferredOperand):
49     """This is a SmartFuture, a class that wraps a normal Future and can
50     then be used, mostly, like a normal (non-Future) identifier.
51
52     Using a FutureWrapper in expressions will block and wait until
53     the result of the deferred operation is known.
54     """
55
56     def __init__(self, wrapped_future: fut.Future) -> None:
57         self.wrapped_future = wrapped_future
58         self.id = id_generator.get("smart_future_id")
59
60     def get_id(self) -> int:
61         return self.id
62
63     def is_ready(self) -> bool:
64         return self.wrapped_future.done()
65
66     # You shouldn't have to call this; instead, have a look at defining a
67     # method on DeferredOperand base class.
68     @overrides
69     def _resolve(self, *, timeout=None) -> T:
70         return self.wrapped_future.result(timeout)