Migration from old pyutilz package name (which, in turn, came from
[pyutils.git] / src / pyutils / parallelize / smart_future.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """A :class:`Future` that can be treated as a substutute for the result
6 that it contains and will not block until it is used.  At that point,
7 if the underlying value is not yet available yet, it will block until
8 the internal result actually becomes available.
9 """
10
11 from __future__ import annotations
12
13 import concurrent
14 import concurrent.futures as fut
15 import logging
16 from typing import Callable, List, Set, TypeVar
17
18 from overrides import overrides
19
20 # This module is commonly used by others in here and should avoid
21 # taking any unnecessary dependencies back on them.
22 from pyutils import id_generator
23 from pyutils.parallelize.deferred_operand import DeferredOperand
24
25 logger = logging.getLogger(__name__)
26
27 T = TypeVar('T')
28
29
30 def wait_any(
31     futures: List[SmartFuture],
32     *,
33     callback: Callable = None,
34     log_exceptions: bool = True,
35     timeout: float = None,
36 ):
37     """Await the completion of any of a collection of SmartFutures and
38     invoke callback each time one completes, repeatedly, until they are
39     all finished.
40
41     Args:
42         futures: A collection of SmartFutures to wait on
43         callback: An optional callback to invoke whenever one of the
44             futures completes
45         log_exceptions: Should we log (warning + exception) any
46             underlying exceptions raised during future processing or
47             silently ignore then?
48         timeout: invoke callback with a periodicity of timeout while
49             awaiting futures
50     """
51
52     real_futures = []
53     smart_future_by_real_future = {}
54     completed_futures: Set[fut.Future] = set()
55     for x in futures:
56         assert isinstance(x, SmartFuture)
57         real_futures.append(x.wrapped_future)
58         smart_future_by_real_future[x.wrapped_future] = x
59
60     while len(completed_futures) != len(real_futures):
61         try:
62             newly_completed_futures = concurrent.futures.as_completed(
63                 real_futures, timeout=timeout
64             )
65             for f in newly_completed_futures:
66                 if callback is not None:
67                     callback()
68                 completed_futures.add(f)
69                 if log_exceptions and not f.cancelled():
70                     exception = f.exception()
71                     if exception is not None:
72                         logger.warning(
73                             'Future 0x%x raised an unhandled exception and exited.',
74                             id(f),
75                         )
76                         logger.exception(exception)
77                         raise exception
78                 yield smart_future_by_real_future[f]
79         except TimeoutError:
80             if callback is not None:
81                 callback()
82     if callback is not None:
83         callback()
84
85
86 def wait_all(
87     futures: List[SmartFuture],
88     *,
89     log_exceptions: bool = True,
90 ) -> None:
91     """Wait for all of the SmartFutures in the collection to finish before
92     returning.
93
94     Args:
95         futures: A collection of futures that we're waiting for
96         log_exceptions: Should we log (warning + exception) any
97             underlying exceptions raised during future processing or
98             silently ignore then?
99     """
100
101     real_futures = []
102     for x in futures:
103         assert isinstance(x, SmartFuture)
104         real_futures.append(x.wrapped_future)
105
106     (done, not_done) = concurrent.futures.wait(
107         real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
108     )
109     if log_exceptions:
110         for f in real_futures:
111             if not f.cancelled():
112                 exception = f.exception()
113                 if exception is not None:
114                     logger.warning(
115                         'Future 0x%x raised an unhandled exception and exited.', id(f)
116                     )
117                     logger.exception(exception)
118                     raise exception
119     assert len(done) == len(real_futures)
120     assert len(not_done) == 0
121
122
123 class SmartFuture(DeferredOperand):
124     """This is a SmartFuture, a class that wraps a normal :class:`Future`
125     and can then be used, mostly, like a normal (non-Future)
126     identifier of the type of that SmartFuture's result.
127
128     Using a FutureWrapper in expressions will block and wait until
129     the result of the deferred operation is known.
130     """
131
132     def __init__(self, wrapped_future: fut.Future) -> None:
133         assert isinstance(wrapped_future, fut.Future)
134         self.wrapped_future = wrapped_future
135         self.id = id_generator.get("smart_future_id")
136
137     def get_id(self) -> int:
138         return self.id
139
140     def is_ready(self) -> bool:
141         return self.wrapped_future.done()
142
143     # You shouldn't have to call this; instead, have a look at defining a
144     # method on DeferredOperand base class.
145     @overrides
146     def _resolve(self, timeout=None) -> T:
147         return self.wrapped_future.result(timeout)