Final touches on the new test runner.
[python_utils.git] / smart_future.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """A :class:`Future` that can be treated as a substutute for the result
6 that it contains and will not block until it is used.  At that point,
7 if the underlying value is not yet available yet, it will block until
8 the internal result actually becomes available.
9 """
10
11 from __future__ import annotations
12 import concurrent
13 import concurrent.futures as fut
14 import logging
15 from typing import Callable, List, Set, TypeVar
16
17 from overrides import overrides
18
19 import id_generator
20
21 # This module is commonly used by others in here and should avoid
22 # taking any unnecessary dependencies back on them.
23 from deferred_operand import DeferredOperand
24
25 logger = logging.getLogger(__name__)
26
27 T = TypeVar('T')
28
29
30 def wait_any(
31     futures: List[SmartFuture],
32     *,
33     callback: Callable = None,
34     log_exceptions: bool = True,
35     timeout: float = None,
36 ):
37     """Await the completion of any of a collection of SmartFutures and
38     invoke callback each time one completes, repeatedly, until they are
39     all finished.
40
41     Args:
42         futures: A collection of SmartFutures to wait on
43         callback: An optional callback to invoke whenever one of the
44             futures completes
45         log_exceptions: Should we log (warning + exception) any
46             underlying exceptions raised during future processing or
47             silently ignore then?
48         timeout: invoke callback with a periodicity of timeout while
49             awaiting futures
50     """
51
52     real_futures = []
53     smart_future_by_real_future = {}
54     completed_futures: Set[fut.Future] = set()
55     for x in futures:
56         assert isinstance(x, SmartFuture)
57         real_futures.append(x.wrapped_future)
58         smart_future_by_real_future[x.wrapped_future] = x
59
60     while len(completed_futures) != len(real_futures):
61         print("TOP...")
62         try:
63             newly_completed_futures = concurrent.futures.as_completed(real_futures, timeout=timeout)
64             for f in newly_completed_futures:
65                 if callback is not None:
66                     callback()
67                 completed_futures.add(f)
68                 if log_exceptions and not f.cancelled():
69                     exception = f.exception()
70                     if exception is not None:
71                         logger.warning(
72                             'Future 0x%x raised an unhandled exception and exited.', id(f)
73                         )
74                         logger.exception(exception)
75                         raise exception
76                 yield smart_future_by_real_future[f]
77         except TimeoutError:
78             print(f"HERE!!! {len(completed_futures)} / {len(real_futures)}.")
79             if callback is not None:
80                 callback()
81     if callback is not None:
82         callback()
83
84
85 def wait_all(
86     futures: List[SmartFuture],
87     *,
88     log_exceptions: bool = True,
89 ) -> None:
90     """Wait for all of the SmartFutures in the collection to finish before
91     returning.
92
93     Args:
94         futures: A collection of futures that we're waiting for
95         log_exceptions: Should we log (warning + exception) any
96             underlying exceptions raised during future processing or
97             silently ignore then?
98     """
99
100     real_futures = []
101     for x in futures:
102         assert isinstance(x, SmartFuture)
103         real_futures.append(x.wrapped_future)
104
105     (done, not_done) = concurrent.futures.wait(
106         real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
107     )
108     if log_exceptions:
109         for f in real_futures:
110             if not f.cancelled():
111                 exception = f.exception()
112                 if exception is not None:
113                     logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
114                     logger.exception(exception)
115                     raise exception
116     assert len(done) == len(real_futures)
117     assert len(not_done) == 0
118
119
120 class SmartFuture(DeferredOperand):
121     """This is a SmartFuture, a class that wraps a normal :class:`Future`
122     and can then be used, mostly, like a normal (non-Future)
123     identifier of the type of that SmartFuture's result.
124
125     Using a FutureWrapper in expressions will block and wait until
126     the result of the deferred operation is known.
127     """
128
129     def __init__(self, wrapped_future: fut.Future) -> None:
130         assert isinstance(wrapped_future, fut.Future)
131         self.wrapped_future = wrapped_future
132         self.id = id_generator.get("smart_future_id")
133
134     def get_id(self) -> int:
135         return self.id
136
137     def is_ready(self) -> bool:
138         return self.wrapped_future.done()
139
140     # You shouldn't have to call this; instead, have a look at defining a
141     # method on DeferredOperand base class.
142     @overrides
143     def _resolve(self, timeout=None) -> T:
144         return self.wrapped_future.result(timeout)