#!/usr/bin/env python3
+# © Copyright 2021-2022, Scott Gasch
+
+"""A :class:`Future` that can be treated as a substutute for the result
+that it contains and will not block until it is used. At that point,
+if the underlying value is not yet available yet, it will block until
+the internal result actually becomes available.
+"""
+
from __future__ import annotations
-from collections.abc import Mapping
+import concurrent
import concurrent.futures as fut
-import time
-from typing import Callable, List, TypeVar
+import logging
+from typing import Callable, List, Set, TypeVar
+
+from overrides import overrides
+
+import id_generator
# This module is commonly used by others in here and should avoid
# taking any unnecessary dependencies back on them.
from deferred_operand import DeferredOperand
-import id_generator
+
+logger = logging.getLogger(__name__)
T = TypeVar('T')
-def wait_any(futures: List[SmartFuture], *, callback: Callable = None):
- finished: Mapping[int, bool] = {}
- x = 0
- while True:
- future = futures[x]
- if not finished.get(future.get_id(), False):
- if future.is_ready():
- finished[future.get_id()] = True
- yield future
- else:
+def wait_any(
+ futures: List[SmartFuture],
+ *,
+ callback: Callable = None,
+ log_exceptions: bool = True,
+ timeout: float = None,
+):
+ """Await the completion of any of a collection of SmartFutures and
+ invoke callback each time one completes, repeatedly, until they are
+ all finished.
+
+ Args:
+ futures: A collection of SmartFutures to wait on
+ callback: An optional callback to invoke whenever one of the
+ futures completes
+ log_exceptions: Should we log (warning + exception) any
+ underlying exceptions raised during future processing or
+ silently ignore then?
+ timeout: invoke callback with a periodicity of timeout while
+ awaiting futures
+ """
+
+ real_futures = []
+ smart_future_by_real_future = {}
+ completed_futures: Set[fut.Future] = set()
+ for x in futures:
+ assert isinstance(x, SmartFuture)
+ real_futures.append(x.wrapped_future)
+ smart_future_by_real_future[x.wrapped_future] = x
+
+ while len(completed_futures) != len(real_futures):
+ try:
+ newly_completed_futures = concurrent.futures.as_completed(real_futures, timeout=timeout)
+ for f in newly_completed_futures:
if callback is not None:
callback()
- time.sleep(0.1)
- x += 1
- if x >= len(futures):
- x = 0
- if len(finished) == len(futures):
+ completed_futures.add(f)
+ if log_exceptions and not f.cancelled():
+ exception = f.exception()
+ if exception is not None:
+ logger.warning(
+ 'Future 0x%x raised an unhandled exception and exited.', id(f)
+ )
+ logger.exception(exception)
+ raise exception
+ yield smart_future_by_real_future[f]
+ except TimeoutError:
if callback is not None:
callback()
- return
+ if callback is not None:
+ callback()
+
+
+def wait_all(
+ futures: List[SmartFuture],
+ *,
+ log_exceptions: bool = True,
+) -> None:
+ """Wait for all of the SmartFutures in the collection to finish before
+ returning.
+
+ Args:
+ futures: A collection of futures that we're waiting for
+ log_exceptions: Should we log (warning + exception) any
+ underlying exceptions raised during future processing or
+ silently ignore then?
+ """
+
+ real_futures = []
+ for x in futures:
+ assert isinstance(x, SmartFuture)
+ real_futures.append(x.wrapped_future)
+
+ (done, not_done) = concurrent.futures.wait(
+ real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
+ )
+ if log_exceptions:
+ for f in real_futures:
+ if not f.cancelled():
+ exception = f.exception()
+ if exception is not None:
+ logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
+ logger.exception(exception)
+ raise exception
+ assert len(done) == len(real_futures)
+ assert len(not_done) == 0
class SmartFuture(DeferredOperand):
- """This is a SmartFuture, a class that wraps a normal Future and can
- then be used, mostly, like a normal (non-Future) identifier.
+ """This is a SmartFuture, a class that wraps a normal :class:`Future`
+ and can then be used, mostly, like a normal (non-Future)
+ identifier of the type of that SmartFuture's result.
Using a FutureWrapper in expressions will block and wait until
the result of the deferred operation is known.
"""
def __init__(self, wrapped_future: fut.Future) -> None:
+ assert isinstance(wrapped_future, fut.Future)
self.wrapped_future = wrapped_future
self.id = id_generator.get("smart_future_id")
# You shouldn't have to call this; instead, have a look at defining a
# method on DeferredOperand base class.
- def _resolve(self, *, timeout=None) -> T:
+ @overrides
+ def _resolve(self, timeout=None) -> T:
return self.wrapped_future.result(timeout)