#!/usr/bin/env python3 from __future__ import annotations import concurrent import concurrent.futures as fut import logging import traceback from typing import Callable, List, TypeVar from overrides import overrides # This module is commonly used by others in here and should avoid # taking any unnecessary dependencies back on them. from deferred_operand import DeferredOperand import id_generator logger = logging.getLogger(__name__) T = TypeVar('T') def wait_any( futures: List[SmartFuture], *, callback: Callable = None, log_exceptions: bool = True, ): real_futures = [] smart_future_by_real_future = {} completed_futures = set() for _ in futures: real_futures.append(_.wrapped_future) smart_future_by_real_future[_.wrapped_future] = _ while len(completed_futures) != len(real_futures): newly_completed_futures = concurrent.futures.as_completed(real_futures) for f in newly_completed_futures: if callback is not None: callback() completed_futures.add(f) if log_exceptions and not f.cancelled(): exception = f.exception() if exception is not None: logger.exception(exception) traceback.print_tb(exception.__traceback__) yield smart_future_by_real_future[f] if callback is not None: callback() return def wait_all( futures: List[SmartFuture], *, log_exceptions: bool = True, ) -> None: real_futures = [x.wrapped_future for x in futures] (done, not_done) = concurrent.futures.wait( real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED ) if log_exceptions: for f in real_futures: if not f.cancelled(): exception = f.exception() if exception is not None: logger.exception(exception) traceback.print_tb(exception.__traceback__) assert len(done) == len(real_futures) assert len(not_done) == 0 class SmartFuture(DeferredOperand): """This is a SmartFuture, a class that wraps a normal Future and can then be used, mostly, like a normal (non-Future) identifier. Using a FutureWrapper in expressions will block and wait until the result of the deferred operation is known. """ def __init__(self, wrapped_future: fut.Future) -> None: self.wrapped_future = wrapped_future self.id = id_generator.get("smart_future_id") def get_id(self) -> int: return self.id def is_ready(self) -> bool: return self.wrapped_future.done() # You shouldn't have to call this; instead, have a look at defining a # method on DeferredOperand base class. @overrides def _resolve(self, *, timeout=None) -> T: return self.wrapped_future.result(timeout)