More work to improve documentation generated by sphinx. Also fixes
[pyutils.git] / src / pyutils / parallelize / smart_future.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """
6 A :class:`Future` that can be treated as a substutute for the result
7 that it contains and will not block until it is used.  At that point,
8 if the underlying value is not yet available yet, it will block until
9 the internal result actually becomes available.
10
11 Results from :class:`parallelize.parallelize` are returned wrapped
12 in :class:`SmartFuture` instances.
13 """
14
15 from __future__ import annotations
16
17 import concurrent
18 import concurrent.futures as fut
19 import logging
20 from typing import Callable, List, Set, TypeVar
21
22 from overrides import overrides
23
24 # This module is commonly used by others in here and should avoid
25 # taking any unnecessary dependencies back on them.
26 from pyutils import id_generator
27 from pyutils.parallelize.deferred_operand import DeferredOperand
28
29 logger = logging.getLogger(__name__)
30
31 T = TypeVar('T')
32
33
34 def wait_any(
35     futures: List[SmartFuture],
36     *,
37     callback: Callable = None,
38     log_exceptions: bool = True,
39     timeout: float = None,
40 ):
41     """Await the completion of any of a collection of SmartFutures and
42     invoke callback each time one completes, repeatedly, until they are
43     all finished.
44
45     Args:
46         futures: A collection of SmartFutures to wait on
47         callback: An optional callback to invoke whenever one of the
48             futures completes
49         log_exceptions: Should we log (warning + exception) any
50             underlying exceptions raised during future processing or
51             silently ignore then?
52         timeout: invoke callback with a periodicity of timeout while
53             awaiting futures
54
55     Returns:
56         A :class:`SmartFuture` from the futures list with a result
57         available without blocking.
58     """
59
60     real_futures = []
61     smart_future_by_real_future = {}
62     completed_futures: Set[fut.Future] = set()
63     for x in futures:
64         assert isinstance(x, SmartFuture)
65         real_futures.append(x.wrapped_future)
66         smart_future_by_real_future[x.wrapped_future] = x
67
68     while len(completed_futures) != len(real_futures):
69         try:
70             newly_completed_futures = concurrent.futures.as_completed(
71                 real_futures, timeout=timeout
72             )
73             for f in newly_completed_futures:
74                 if callback is not None:
75                     callback()
76                 completed_futures.add(f)
77                 if log_exceptions and not f.cancelled():
78                     exception = f.exception()
79                     if exception is not None:
80                         logger.warning(
81                             'Future 0x%x raised an unhandled exception and exited.',
82                             id(f),
83                         )
84                         logger.exception(exception)
85                         raise exception
86                 yield smart_future_by_real_future[f]
87         except TimeoutError:
88             if callback is not None:
89                 callback()
90     if callback is not None:
91         callback()
92
93
94 def wait_all(
95     futures: List[SmartFuture],
96     *,
97     log_exceptions: bool = True,
98 ) -> None:
99     """Wait for all of the SmartFutures in the collection to finish before
100     returning.
101
102     Args:
103         futures: A collection of futures that we're waiting for
104         log_exceptions: Should we log (warning + exception) any
105             underlying exceptions raised during future processing or
106             silently ignore then?
107
108     Returns:
109         Only when all futures in the input list are ready.  Blocks
110         until such time.
111     """
112
113     real_futures = []
114     for x in futures:
115         assert isinstance(x, SmartFuture)
116         real_futures.append(x.wrapped_future)
117
118     (done, not_done) = concurrent.futures.wait(
119         real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
120     )
121     if log_exceptions:
122         for f in real_futures:
123             if not f.cancelled():
124                 exception = f.exception()
125                 if exception is not None:
126                     logger.warning(
127                         'Future 0x%x raised an unhandled exception and exited.', id(f)
128                     )
129                     logger.exception(exception)
130                     raise exception
131     assert len(done) == len(real_futures)
132     assert len(not_done) == 0
133
134
135 class SmartFuture(DeferredOperand):
136     """This is a SmartFuture, a class that wraps a normal :class:`Future`
137     and can then be used, mostly, like a normal (non-Future)
138     identifier of the type of that SmartFuture's result.
139
140     Using a FutureWrapper in expressions will block and wait until
141     the result of the deferred operation is known.
142     """
143
144     def __init__(self, wrapped_future: fut.Future) -> None:
145         """
146         Args:
147             wrapped_future: a normal Python :class:`concurrent.Future`
148                 object that we are wrapping.
149         """
150         assert isinstance(wrapped_future, fut.Future)
151         self.wrapped_future = wrapped_future
152         self.id = id_generator.get("smart_future_id")
153
154     def get_id(self) -> int:
155         """
156         Returns:
157             A unique identifier for this instance.
158         """
159         return self.id
160
161     def is_ready(self) -> bool:
162         """
163         Returns:
164             True if the wrapped future is ready without blocking, False
165             otherwise.
166         """
167         return self.wrapped_future.done()
168
169     # You shouldn't have to call this; instead, have a look at defining a
170     # method on DeferredOperand base class.
171     @overrides
172     def _resolve(self, timeout=None) -> T:
173         return self.wrapped_future.result(timeout)