Easier and more self documenting patterns for loading/saving Persistent
[python_utils.git] / smart_future.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """A :class:`Future` that can be treated as a substutute for the result
6 that it contains and will not block until it is used.  At that point,
7 if the underlying value is not yet available yet, it will block until
8 the internal result actually becomes available.
9 """
10
11 from __future__ import annotations
12 import concurrent
13 import concurrent.futures as fut
14 import logging
15 from typing import Callable, List, Set, TypeVar
16
17 from overrides import overrides
18
19 import id_generator
20
21 # This module is commonly used by others in here and should avoid
22 # taking any unnecessary dependencies back on them.
23 from deferred_operand import DeferredOperand
24
25 logger = logging.getLogger(__name__)
26
27 T = TypeVar('T')
28
29
30 def wait_any(
31     futures: List[SmartFuture],
32     *,
33     callback: Callable = None,
34     log_exceptions: bool = True,
35     timeout: float = None,
36 ):
37     """Await the completion of any of a collection of SmartFutures and
38     invoke callback each time one completes, repeatedly, until they are
39     all finished.
40
41     Args:
42         futures: A collection of SmartFutures to wait on
43         callback: An optional callback to invoke whenever one of the
44             futures completes
45         log_exceptions: Should we log (warning + exception) any
46             underlying exceptions raised during future processing or
47             silently ignore then?
48         timeout: invoke callback with a periodicity of timeout while
49             awaiting futures
50     """
51
52     real_futures = []
53     smart_future_by_real_future = {}
54     completed_futures: Set[fut.Future] = set()
55     for x in futures:
56         assert isinstance(x, SmartFuture)
57         real_futures.append(x.wrapped_future)
58         smart_future_by_real_future[x.wrapped_future] = x
59
60     while len(completed_futures) != len(real_futures):
61         try:
62             newly_completed_futures = concurrent.futures.as_completed(real_futures, timeout=timeout)
63             for f in newly_completed_futures:
64                 if callback is not None:
65                     callback()
66                 completed_futures.add(f)
67                 if log_exceptions and not f.cancelled():
68                     exception = f.exception()
69                     if exception is not None:
70                         logger.warning(
71                             'Future 0x%x raised an unhandled exception and exited.', id(f)
72                         )
73                         logger.exception(exception)
74                         raise exception
75                 yield smart_future_by_real_future[f]
76         except TimeoutError:
77             if callback is not None:
78                 callback()
79     if callback is not None:
80         callback()
81
82
83 def wait_all(
84     futures: List[SmartFuture],
85     *,
86     log_exceptions: bool = True,
87 ) -> None:
88     """Wait for all of the SmartFutures in the collection to finish before
89     returning.
90
91     Args:
92         futures: A collection of futures that we're waiting for
93         log_exceptions: Should we log (warning + exception) any
94             underlying exceptions raised during future processing or
95             silently ignore then?
96     """
97
98     real_futures = []
99     for x in futures:
100         assert isinstance(x, SmartFuture)
101         real_futures.append(x.wrapped_future)
102
103     (done, not_done) = concurrent.futures.wait(
104         real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
105     )
106     if log_exceptions:
107         for f in real_futures:
108             if not f.cancelled():
109                 exception = f.exception()
110                 if exception is not None:
111                     logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
112                     logger.exception(exception)
113                     raise exception
114     assert len(done) == len(real_futures)
115     assert len(not_done) == 0
116
117
118 class SmartFuture(DeferredOperand):
119     """This is a SmartFuture, a class that wraps a normal :class:`Future`
120     and can then be used, mostly, like a normal (non-Future)
121     identifier of the type of that SmartFuture's result.
122
123     Using a FutureWrapper in expressions will block and wait until
124     the result of the deferred operation is known.
125     """
126
127     def __init__(self, wrapped_future: fut.Future) -> None:
128         assert isinstance(wrapped_future, fut.Future)
129         self.wrapped_future = wrapped_future
130         self.id = id_generator.get("smart_future_id")
131
132     def get_id(self) -> int:
133         return self.id
134
135     def is_ready(self) -> bool:
136         return self.wrapped_future.done()
137
138     # You shouldn't have to call this; instead, have a look at defining a
139     # method on DeferredOperand base class.
140     @overrides
141     def _resolve(self, timeout=None) -> T:
142         return self.wrapped_future.result(timeout)