#!/usr/bin/env python3

"""SmartFuture, a wrapper around concurrent.futures.Future, plus helpers
(wait_any / wait_all) for waiting on collections of SmartFutures."""

from __future__ import annotations
from collections.abc import Mapping  # noqa: F401  (pre-existing import, kept)
import concurrent.futures as fut
import time
from typing import Callable, Iterator, List, Optional, TypeVar

# This module is commonly used by others in here and should avoid
# taking any unnecessary dependencies back on them.
from deferred_operand import DeferredOperand
import id_generator

T = TypeVar('T')

# Seconds to sleep between polling passes in wait_any / wait_all.
_POLL_INTERVAL_SEC = 0.1


def wait_any(
    futures: List[SmartFuture],
    *,
    callback: Optional[Callable] = None,
) -> Iterator[SmartFuture]:
    """Yield each future from *futures* as it becomes ready.

    This is a generator that polls the futures.  Every polling pass
    checks all still-pending futures (in the order given) and yields
    the ones that are ready; if some remain pending afterwards it
    invokes *callback* (if any) and sleeps briefly before the next
    pass.

    Args:
        futures: the SmartFutures to wait on.  A future that appears
            more than once (same get_id()) is yielded only once.
        callback: optional zero-argument callable invoked once per
            polling pass while futures are still pending, and one
            final time after every future has been yielded.

    Yields:
        Each distinct future, once, after its is_ready() returns True.
    """
    # Key by id so duplicate entries cannot keep the loop from ever
    # terminating; dict insertion order keeps the caller's ordering.
    pending = {}
    for future in futures:
        pending.setdefault(future.get_id(), future)

    while pending:
        # Iterate over a snapshot because entries are removed as we go.
        for future_id, future in list(pending.items()):
            if future.is_ready():
                del pending[future_id]
                yield future

        # Sleep once per pass (not once per pending future) so a ready
        # future is never stuck behind a long line of unready ones.
        if pending:
            if callback is not None:
                callback()
            time.sleep(_POLL_INTERVAL_SEC)

    # Final notification once everything has been yielded.  An empty
    # input is treated as already complete.
    if callback is not None:
        callback()


def wait_all(futures: List[SmartFuture]) -> None:
    """Block until every future in *futures* is ready.

    Polls the futures, sleeping briefly between passes.  Returns
    immediately when *futures* is empty or everything is already
    ready.

    Args:
        futures: the SmartFutures to wait on; duplicates are fine.
    """
    pending = list(futures)
    while True:
        # Keep only the futures that still are not ready.  Tracking the
        # objects themselves (rather than counting ids) means duplicate
        # entries cannot prevent termination.
        pending = [future for future in pending if not future.is_ready()]
        if not pending:
            return
        time.sleep(_POLL_INTERVAL_SEC)


class SmartFuture(DeferredOperand):
    """This is a SmartFuture, a class that wraps a normal Future and can
    then be used, mostly, like a normal (non-Future) identifier.

    Using a SmartFuture in expressions will block and wait until
    the result of the deferred operation is known.
    """

    def __init__(self, wrapped_future: fut.Future) -> None:
        """Wrap *wrapped_future* and assign this instance an id.

        Args:
            wrapped_future: the underlying Future whose result this
                object stands in for.
        """
        self.wrapped_future = wrapped_future
        # Obtained from id_generator under the "smart_future_id" name;
        # presumably unique per instance -- wait_any relies on ids to
        # tell futures apart.
        self.id = id_generator.get("smart_future_id")

    def get_id(self) -> int:
        """Return the id assigned to this SmartFuture at construction."""
        return self.id

    def is_ready(self) -> bool:
        """Return whether the wrapped Future reports done()."""
        return self.wrapped_future.done()

    # You shouldn't have to call this; instead, have a look at defining a
    # method on DeferredOperand base class.
    def _resolve(self, *, timeout: Optional[float] = None) -> T:
        """Return the wrapped Future's result, blocking if necessary.

        Args:
            timeout: passed through to Future.result(); None means
                wait without a time limit.

        Returns:
            Whatever the wrapped Future's result() returns.

        Raises:
            Whatever the wrapped Future's result() raises (for a
            concurrent.futures.Future: a timeout error, a cancellation
            error, or the exception raised by the underlying call).
        """
        return self.wrapped_future.result(timeout)