Improve docstrings for sphinx.
[python_utils.git] / smart_future.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """A :class:Future that can be treated as a substutute for the result
6 that it contains and will not block until it is used.  At that point,
7 if the underlying value is not yet available yet, it will block until
8 the internal result actually becomes available.
9 """
10
11 from __future__ import annotations
12 import concurrent
13 import concurrent.futures as fut
14 import logging
15 from typing import Callable, List, Set, TypeVar
16
17 from overrides import overrides
18
19 import id_generator
20
21 # This module is commonly used by others in here and should avoid
22 # taking any unnecessary dependencies back on them.
23 from deferred_operand import DeferredOperand
24
25 logger = logging.getLogger(__name__)
26
27 T = TypeVar('T')
28
29
30 def wait_any(
31     futures: List[SmartFuture],
32     *,
33     callback: Callable = None,
34     log_exceptions: bool = True,
35 ):
36     """Await the completion of any of a collection of SmartFutures and
37     invoke callback each time one completes, repeatedly, until they are
38     all finished.
39
40     Args:
41         futures: A collection of SmartFutures to wait on
42         callback: An optional callback to invoke whenever one of the
43             futures completes
44         log_exceptions: Should we log (warning + exception) any
45             underlying exceptions raised during future processing or
46             silently ignore then?
47     """
48
49     real_futures = []
50     smart_future_by_real_future = {}
51     completed_futures: Set[fut.Future] = set()
52     for x in futures:
53         assert isinstance(x, SmartFuture)
54         real_futures.append(x.wrapped_future)
55         smart_future_by_real_future[x.wrapped_future] = x
56
57     while len(completed_futures) != len(real_futures):
58         newly_completed_futures = concurrent.futures.as_completed(real_futures)
59         for f in newly_completed_futures:
60             if callback is not None:
61                 callback()
62             completed_futures.add(f)
63             if log_exceptions and not f.cancelled():
64                 exception = f.exception()
65                 if exception is not None:
66                     logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
67                     logger.exception(exception)
68                     raise exception
69             yield smart_future_by_real_future[f]
70     if callback is not None:
71         callback()
72
73
74 def wait_all(
75     futures: List[SmartFuture],
76     *,
77     log_exceptions: bool = True,
78 ) -> None:
79     """Wait for all of the SmartFutures in the collection to finish before
80     returning.
81
82     Args:
83         futures: A collection of futures that we're waiting for
84         log_exceptions: Should we log (warning + exception) any
85             underlying exceptions raised during future processing or
86             silently ignore then?
87     """
88
89     real_futures = []
90     for x in futures:
91         assert isinstance(x, SmartFuture)
92         real_futures.append(x.wrapped_future)
93
94     (done, not_done) = concurrent.futures.wait(
95         real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
96     )
97     if log_exceptions:
98         for f in real_futures:
99             if not f.cancelled():
100                 exception = f.exception()
101                 if exception is not None:
102                     logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
103                     logger.exception(exception)
104                     raise exception
105     assert len(done) == len(real_futures)
106     assert len(not_done) == 0
107
108
109 class SmartFuture(DeferredOperand):
110     """This is a SmartFuture, a class that wraps a normal :class:`Future`
111     and can then be used, mostly, like a normal (non-Future)
112     identifier of the type of that SmartFuture's result.
113
114     Using a FutureWrapper in expressions will block and wait until
115     the result of the deferred operation is known.
116     """
117
118     def __init__(self, wrapped_future: fut.Future) -> None:
119         assert isinstance(wrapped_future, fut.Future)
120         self.wrapped_future = wrapped_future
121         self.id = id_generator.get("smart_future_id")
122
123     def get_id(self) -> int:
124         return self.id
125
126     def is_ready(self) -> bool:
127         return self.wrapped_future.done()
128
129     # You shouldn't have to call this; instead, have a look at defining a
130     # method on DeferredOperand base class.
131     @overrides
132     def _resolve(self, timeout=None) -> T:
133         return self.wrapped_future.result(timeout)