#!/usr/bin/env python3 # © Copyright 2021-2022, Scott Gasch """A future that can be treated like the result that it contains and will not block until it is used. At that point, if the underlying value is not yet available, it will block until it becomes available. """ from __future__ import annotations import concurrent import concurrent.futures as fut import logging from typing import Callable, List, Set, TypeVar from overrides import overrides import id_generator # This module is commonly used by others in here and should avoid # taking any unnecessary dependencies back on them. from deferred_operand import DeferredOperand logger = logging.getLogger(__name__) T = TypeVar('T') def wait_any( futures: List[SmartFuture], *, callback: Callable = None, log_exceptions: bool = True, ): real_futures = [] smart_future_by_real_future = {} completed_futures: Set[fut.Future] = set() for x in futures: assert isinstance(x, SmartFuture) real_futures.append(x.wrapped_future) smart_future_by_real_future[x.wrapped_future] = x while len(completed_futures) != len(real_futures): newly_completed_futures = concurrent.futures.as_completed(real_futures) for f in newly_completed_futures: if callback is not None: callback() completed_futures.add(f) if log_exceptions and not f.cancelled(): exception = f.exception() if exception is not None: logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f)) logger.exception(exception) raise exception yield smart_future_by_real_future[f] if callback is not None: callback() def wait_all( futures: List[SmartFuture], *, log_exceptions: bool = True, ) -> None: real_futures = [] for x in futures: assert isinstance(x, SmartFuture) real_futures.append(x.wrapped_future) (done, not_done) = concurrent.futures.wait( real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED ) if log_exceptions: for f in real_futures: if not f.cancelled(): exception = f.exception() if exception is not None: logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f)) logger.exception(exception) raise exception assert len(done) == len(real_futures) assert len(not_done) == 0 class SmartFuture(DeferredOperand): """This is a SmartFuture, a class that wraps a normal Future and can then be used, mostly, like a normal (non-Future) identifier. Using a FutureWrapper in expressions will block and wait until the result of the deferred operation is known. """ def __init__(self, wrapped_future: fut.Future) -> None: assert isinstance(wrapped_future, fut.Future) self.wrapped_future = wrapped_future self.id = id_generator.get("smart_future_id") def get_id(self) -> int: return self.id def is_ready(self) -> bool: return self.wrapped_future.done() # You shouldn't have to call this; instead, have a look at defining a # method on DeferredOperand base class. @overrides def _resolve(self, timeout=None) -> T: return self.wrapped_future.result(timeout)