X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=smart_future.py;h=625c3ed66b63b3ec17e5dc94897a3ca1a88a5451;hb=e46158e49121b8a955bb07b73f5bcf9928b79c90;hp=e4832d43d5b1674988628e5dae43a67cf8ed0565;hpb=3bc4daf1edc121cd633429187392227f2fa61885;p=python_utils.git diff --git a/smart_future.py b/smart_future.py index e4832d4..625c3ed 100644 --- a/smart_future.py +++ b/smart_future.py @@ -1,48 +1,131 @@ #!/usr/bin/env python3 +# © Copyright 2021-2022, Scott Gasch + +"""A :class:`Future` that can be treated as a substutute for the result +that it contains and will not block until it is used. At that point, +if the underlying value is not yet available yet, it will block until +the internal result actually becomes available. +""" + from __future__ import annotations -from collections.abc import Mapping +import concurrent import concurrent.futures as fut -import time -from typing import Callable, List, TypeVar +import logging +from typing import Callable, List, Set, TypeVar + +from overrides import overrides -from deferred_operand import DeferredOperand import id_generator +# This module is commonly used by others in here and should avoid +# taking any unnecessary dependencies back on them. +from deferred_operand import DeferredOperand + +logger = logging.getLogger(__name__) + T = TypeVar('T') -def wait_any(futures: List[SmartFuture], *, callback: Callable = None): - finished: Mapping[int, bool] = {} - x = 0 - while True: - future = futures[x] - if not finished.get(future.get_id(), False): - if future.is_ready(): - finished[future.get_id()] = True - yield future - else: +def wait_any( + futures: List[SmartFuture], + *, + callback: Callable = None, + log_exceptions: bool = True, + timeout: float = None, +): + """Await the completion of any of a collection of SmartFutures and + invoke callback each time one completes, repeatedly, until they are + all finished. + + Args: + futures: A collection of SmartFutures to wait on + callback: An optional callback to invoke whenever one of the + futures completes + log_exceptions: Should we log (warning + exception) any + underlying exceptions raised during future processing or + silently ignore then? + timeout: invoke callback with a periodicity of timeout while + awaiting futures + """ + + real_futures = [] + smart_future_by_real_future = {} + completed_futures: Set[fut.Future] = set() + for x in futures: + assert isinstance(x, SmartFuture) + real_futures.append(x.wrapped_future) + smart_future_by_real_future[x.wrapped_future] = x + + while len(completed_futures) != len(real_futures): + try: + newly_completed_futures = concurrent.futures.as_completed(real_futures, timeout=timeout) + for f in newly_completed_futures: if callback is not None: callback() - time.sleep(0.1) - x += 1 - if x >= len(futures): - x = 0 - if len(finished) == len(futures): + completed_futures.add(f) + if log_exceptions and not f.cancelled(): + exception = f.exception() + if exception is not None: + logger.warning( + 'Future 0x%x raised an unhandled exception and exited.', id(f) + ) + logger.exception(exception) + raise exception + yield smart_future_by_real_future[f] + except TimeoutError: if callback is not None: callback() - return + if callback is not None: + callback() + + +def wait_all( + futures: List[SmartFuture], + *, + log_exceptions: bool = True, +) -> None: + """Wait for all of the SmartFutures in the collection to finish before + returning. + + Args: + futures: A collection of futures that we're waiting for + log_exceptions: Should we log (warning + exception) any + underlying exceptions raised during future processing or + silently ignore then? + """ + + real_futures = [] + for x in futures: + assert isinstance(x, SmartFuture) + real_futures.append(x.wrapped_future) + + (done, not_done) = concurrent.futures.wait( + real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED + ) + if log_exceptions: + for f in real_futures: + if not f.cancelled(): + exception = f.exception() + if exception is not None: + logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f)) + logger.exception(exception) + raise exception + assert len(done) == len(real_futures) + assert len(not_done) == 0 class SmartFuture(DeferredOperand): - """This is a SmartFuture, a class that wraps a normal Future and can - then be used, mostly, like a normal (non-Future) identifier. + """This is a SmartFuture, a class that wraps a normal :class:`Future` + and can then be used, mostly, like a normal (non-Future) + identifier of the type of that SmartFuture's result. Using a FutureWrapper in expressions will block and wait until the result of the deferred operation is known. """ def __init__(self, wrapped_future: fut.Future) -> None: + assert isinstance(wrapped_future, fut.Future) self.wrapped_future = wrapped_future self.id = id_generator.get("smart_future_id") @@ -54,5 +137,6 @@ class SmartFuture(DeferredOperand): # You shouldn't have to call this; instead, have a look at defining a # method on DeferredOperand base class. - def _resolve(self, *, timeout=None) -> T: + @overrides + def _resolve(self, timeout=None) -> T: return self.wrapped_future.result(timeout)