7768599b419e014375a3750ee592c168e21c123e
[python_utils.git] / smart_future.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """
6 A future that can be treated as a substutute for the result that it
7 contains and will not block until it is used.  At that point, if the
8 underlying value is not yet available yet, it will block until the
9 internal result actually becomes available.
10
11 """
12
13 from __future__ import annotations
14 import concurrent
15 import concurrent.futures as fut
16 import logging
17 from typing import Callable, List, Set, TypeVar
18
19 from overrides import overrides
20
21 import id_generator
22
23 # This module is commonly used by others in here and should avoid
24 # taking any unnecessary dependencies back on them.
25 from deferred_operand import DeferredOperand
26
27 logger = logging.getLogger(__name__)
28
29 T = TypeVar('T')
30
31
32 def wait_any(
33     futures: List[SmartFuture],
34     *,
35     callback: Callable = None,
36     log_exceptions: bool = True,
37 ):
38     """Await the completion of any of a collection of SmartFutures and
39     invoke callback each time one completes, repeatedly, until they are
40     all finished.
41
42     Args:
43         futures: A collection of SmartFutures to wait on
44         callback: An optional callback to invoke whenever one of the
45             futures completes
46         log_exceptions: Should we log (warning + exception) any
47             underlying exceptions raised during future processing or
48             silently ignore then?
49     """
50
51     real_futures = []
52     smart_future_by_real_future = {}
53     completed_futures: Set[fut.Future] = set()
54     for x in futures:
55         assert isinstance(x, SmartFuture)
56         real_futures.append(x.wrapped_future)
57         smart_future_by_real_future[x.wrapped_future] = x
58
59     while len(completed_futures) != len(real_futures):
60         newly_completed_futures = concurrent.futures.as_completed(real_futures)
61         for f in newly_completed_futures:
62             if callback is not None:
63                 callback()
64             completed_futures.add(f)
65             if log_exceptions and not f.cancelled():
66                 exception = f.exception()
67                 if exception is not None:
68                     logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
69                     logger.exception(exception)
70                     raise exception
71             yield smart_future_by_real_future[f]
72     if callback is not None:
73         callback()
74
75
76 def wait_all(
77     futures: List[SmartFuture],
78     *,
79     log_exceptions: bool = True,
80 ) -> None:
81     """Wait for all of the SmartFutures in the collection to finish before
82     returning.
83
84     Args:
85         futures: A collection of futures that we're waiting for
86         log_exceptions: Should we log (warning + exception) any
87             underlying exceptions raised during future processing or
88             silently ignore then?
89     """
90
91     real_futures = []
92     for x in futures:
93         assert isinstance(x, SmartFuture)
94         real_futures.append(x.wrapped_future)
95
96     (done, not_done) = concurrent.futures.wait(
97         real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
98     )
99     if log_exceptions:
100         for f in real_futures:
101             if not f.cancelled():
102                 exception = f.exception()
103                 if exception is not None:
104                     logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
105                     logger.exception(exception)
106                     raise exception
107     assert len(done) == len(real_futures)
108     assert len(not_done) == 0
109
110
111 class SmartFuture(DeferredOperand):
112     """This is a SmartFuture, a class that wraps a normal :class:`Future`
113     and can then be used, mostly, like a normal (non-Future)
114     identifier of the type of that SmartFuture's result.
115
116     Using a FutureWrapper in expressions will block and wait until
117     the result of the deferred operation is known.
118     """
119
120     def __init__(self, wrapped_future: fut.Future) -> None:
121         assert isinstance(wrapped_future, fut.Future)
122         self.wrapped_future = wrapped_future
123         self.id = id_generator.get("smart_future_id")
124
125     def get_id(self) -> int:
126         return self.id
127
128     def is_ready(self) -> bool:
129         return self.wrapped_future.done()
130
131     # You shouldn't have to call this; instead, have a look at defining a
132     # method on DeferredOperand base class.
133     @overrides
134     def _resolve(self, timeout=None) -> T:
135         return self.wrapped_future.result(timeout)