I guess it's 2023 now...
[pyutils.git] / src / pyutils / parallelize / smart_future.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2023, Scott Gasch
4
5 """
6 A :class:`Future` that can be treated as a substutute for the result
7 that it contains and will not block until it is used.  At that point,
8 if the underlying value is not yet available yet, it will block until
9 the internal result actually becomes available.
10
11 Results from :class:`parallelize.parallelize` are returned wrapped
12 in :class:`SmartFuture` instances.
13
14 Also contains some utilility code for waiting for one/many futures.
15 """
16
17 from __future__ import annotations
18
19 import concurrent
20 import concurrent.futures as fut
21 import logging
22 from typing import Callable, List, Set, TypeVar
23
24 from overrides import overrides
25
26 # This module is commonly used by others in here and should avoid
27 # taking any unnecessary dependencies back on them.
28 from pyutils import id_generator
29 from pyutils.parallelize.deferred_operand import DeferredOperand
30
31 logger = logging.getLogger(__name__)
32
33 T = TypeVar("T")
34
35
36 def wait_any(
37     futures: List[SmartFuture],
38     *,
39     callback: Callable = None,
40     log_exceptions: bool = True,
41     timeout: float = None,
42 ):
43     """Await the completion of any of a collection of SmartFutures and
44     invoke callback each time one completes, repeatedly, until they are
45     all finished.
46
47     Args:
48         futures: A collection of SmartFutures to wait on
49         callback: An optional callback to invoke whenever one of the
50             futures completes
51         log_exceptions: Should we log (warning + exception) any
52             underlying exceptions raised during future processing or
53             silently ignore them?
54         timeout: invoke callback with a periodicity of timeout while
55             awaiting futures
56
57     Returns:
58         A :class:`SmartFuture` from the futures list with a result
59         available without blocking.
60     """
61
62     real_futures = []
63     smart_future_by_real_future = {}
64     completed_futures: Set[fut.Future] = set()
65     for x in futures:
66         assert isinstance(x, SmartFuture)
67         real_futures.append(x.wrapped_future)
68         smart_future_by_real_future[x.wrapped_future] = x
69
70     while len(completed_futures) != len(real_futures):
71         try:
72             newly_completed_futures = concurrent.futures.as_completed(
73                 real_futures, timeout=timeout
74             )
75             for f in newly_completed_futures:
76                 if callback is not None:
77                     callback()
78                 completed_futures.add(f)
79                 if log_exceptions and not f.cancelled():
80                     exception = f.exception()
81                     if exception is not None:
82                         logger.exception(
83                             "Future 0x%x raised an unhandled exception and exited.",
84                             id(f),
85                             exc_info=exception,
86                         )
87                         raise exception
88                 yield smart_future_by_real_future[f]
89         except concurrent.futures.TimeoutError:
90             if callback is not None:
91                 callback()
92     if callback is not None:
93         callback()
94
95
96 def wait_all(
97     futures: List[SmartFuture],
98     *,
99     log_exceptions: bool = True,
100 ) -> None:
101     """Wait for all of the SmartFutures in the collection to finish before
102     returning.
103
104     Args:
105         futures: A collection of futures that we're waiting for
106         log_exceptions: Should we log (warning + exception) any
107             underlying exceptions raised during future processing or
108             silently ignore them?
109
110     Returns:
111         Only when all futures in the input list are ready.  Blocks
112         until such time.
113     """
114
115     real_futures = []
116     for x in futures:
117         assert isinstance(x, SmartFuture)
118         real_futures.append(x.wrapped_future)
119
120     (done, not_done) = concurrent.futures.wait(
121         real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
122     )
123     if log_exceptions:
124         for f in real_futures:
125             if not f.cancelled():
126                 exception = f.exception()
127                 if exception is not None:
128                     logger.exception(
129                         "Future 0x%x raised an unhandled exception and exited.",
130                         id(f),
131                         exc_info=exception,
132                     )
133                     raise exception
134     assert len(done) == len(real_futures)
135     assert len(not_done) == 0
136
137
138 class SmartFuture(DeferredOperand):
139     """This is a SmartFuture, a class that wraps a normal :class:`Future`
140     and can then be used, mostly, like a normal (non-Future)
141     identifier of the type of that SmartFuture's result.
142
143     Using a FutureWrapper in expressions will block and wait until
144     the result of the deferred operation is known.
145     """
146
147     def __init__(self, wrapped_future: fut.Future) -> None:
148         """
149         Args:
150             wrapped_future: a normal Python :class:`concurrent.Future`
151                 object that we are wrapping.
152         """
153         super().__init__(set(["id", "wrapped_future", "get_id", "is_ready"]))
154         assert isinstance(wrapped_future, fut.Future)
155         self.wrapped_future = wrapped_future
156         self.id = id_generator.get("smart_future_id")
157
158         # Note: if you are adding any settable properties to this
159         # class, go add them to the set in DeferredOperand.__setattr__()
160
161     def get_id(self) -> int:
162         """
163         Returns:
164             A unique identifier for this instance.
165         """
166         return self.id
167
168     def is_ready(self) -> bool:
169         """
170         Returns:
171             True if the wrapped future is ready without blocking, False
172             otherwise.
173         """
174         return self.wrapped_future.done()
175
176     # You shouldn't have to call this; instead, have a look at defining a
177     # method on DeferredOperand base class.
178     @overrides
179     def _resolve(self, timeout=None) -> T:
180         return self.wrapped_future.result(timeout)