Cleanup geocode.
[python_utils.git] / smart_future.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """A future that can be treated like the result that it contains and
6 will not block until it is used.  At that point, if the underlying
7 value is not yet available, it will block until it becomes
8 available.
9
10 """
11
12 from __future__ import annotations
13 import concurrent
14 import concurrent.futures as fut
15 import logging
16 from typing import Callable, List, Set, TypeVar
17
18 from overrides import overrides
19
20 import id_generator
21
22 # This module is commonly used by others in here and should avoid
23 # taking any unnecessary dependencies back on them.
24 from deferred_operand import DeferredOperand
25
26 logger = logging.getLogger(__name__)
27
28 T = TypeVar('T')
29
30
31 def wait_any(
32     futures: List[SmartFuture],
33     *,
34     callback: Callable = None,
35     log_exceptions: bool = True,
36 ):
37     real_futures = []
38     smart_future_by_real_future = {}
39     completed_futures: Set[fut.Future] = set()
40     for x in futures:
41         assert isinstance(x, SmartFuture)
42         real_futures.append(x.wrapped_future)
43         smart_future_by_real_future[x.wrapped_future] = x
44
45     while len(completed_futures) != len(real_futures):
46         newly_completed_futures = concurrent.futures.as_completed(real_futures)
47         for f in newly_completed_futures:
48             if callback is not None:
49                 callback()
50             completed_futures.add(f)
51             if log_exceptions and not f.cancelled():
52                 exception = f.exception()
53                 if exception is not None:
54                     logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
55                     logger.exception(exception)
56                     raise exception
57             yield smart_future_by_real_future[f]
58     if callback is not None:
59         callback()
60
61
62 def wait_all(
63     futures: List[SmartFuture],
64     *,
65     log_exceptions: bool = True,
66 ) -> None:
67     real_futures = []
68     for x in futures:
69         assert isinstance(x, SmartFuture)
70         real_futures.append(x.wrapped_future)
71
72     (done, not_done) = concurrent.futures.wait(
73         real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
74     )
75     if log_exceptions:
76         for f in real_futures:
77             if not f.cancelled():
78                 exception = f.exception()
79                 if exception is not None:
80                     logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
81                     logger.exception(exception)
82                     raise exception
83     assert len(done) == len(real_futures)
84     assert len(not_done) == 0
85
86
87 class SmartFuture(DeferredOperand):
88     """This is a SmartFuture, a class that wraps a normal Future and can
89     then be used, mostly, like a normal (non-Future) identifier.
90
91     Using a FutureWrapper in expressions will block and wait until
92     the result of the deferred operation is known.
93     """
94
95     def __init__(self, wrapped_future: fut.Future) -> None:
96         assert isinstance(wrapped_future, fut.Future)
97         self.wrapped_future = wrapped_future
98         self.id = id_generator.get("smart_future_id")
99
100     def get_id(self) -> int:
101         return self.id
102
103     def is_ready(self) -> bool:
104         return self.wrapped_future.done()
105
106     # You shouldn't have to call this; instead, have a look at defining a
107     # method on DeferredOperand base class.
108     @overrides
109     def _resolve(self, timeout=None) -> T:
110         return self.wrapped_future.result(timeout)