X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=smart_future.py;h=dbce4321842995a1855788089f8e598c8ad8bd11;hb=02302bbd9363facb59c4df2c1f4013087702cfa6;hp=604c149520464bcd9d8c5a55cf8905acd5ec34d4;hpb=f2b4fe83f6fc853a68653bd5e3d9fe0648c3d105;p=python_utils.git diff --git a/smart_future.py b/smart_future.py index 604c149..dbce432 100644 --- a/smart_future.py +++ b/smart_future.py @@ -1,18 +1,26 @@ #!/usr/bin/env python3 +# © Copyright 2021-2022, Scott Gasch + +"""A :class:Future that can be treated as a substutute for the result +that it contains and will not block until it is used. At that point, +if the underlying value is not yet available yet, it will block until +the internal result actually becomes available. +""" + from __future__ import annotations import concurrent import concurrent.futures as fut import logging -import traceback -from typing import Callable, List, TypeVar +from typing import Callable, List, Set, TypeVar from overrides import overrides +import id_generator + # This module is commonly used by others in here and should avoid # taking any unnecessary dependencies back on them. from deferred_operand import DeferredOperand -import id_generator logger = logging.getLogger(__name__) @@ -25,13 +33,26 @@ def wait_any( callback: Callable = None, log_exceptions: bool = True, ): + """Await the completion of any of a collection of SmartFutures and + invoke callback each time one completes, repeatedly, until they are + all finished. + + Args: + futures: A collection of SmartFutures to wait on + callback: An optional callback to invoke whenever one of the + futures completes + log_exceptions: Should we log (warning + exception) any + underlying exceptions raised during future processing or + silently ignore then? + """ + real_futures = [] smart_future_by_real_future = {} - completed_futures = set() - for f in futures: - assert type(f) == SmartFuture - real_futures.append(f.wrapped_future) - smart_future_by_real_future[f.wrapped_future] = f + completed_futures: Set[fut.Future] = set() + for x in futures: + assert isinstance(x, SmartFuture) + real_futures.append(x.wrapped_future) + smart_future_by_real_future[x.wrapped_future] = x while len(completed_futures) != len(real_futures): newly_completed_futures = concurrent.futures.as_completed(real_futures) @@ -42,15 +63,12 @@ def wait_any( if log_exceptions and not f.cancelled(): exception = f.exception() if exception is not None: - logger.warning( - f'Future {id(f)} raised an unhandled exception and exited.' - ) + logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f)) logger.exception(exception) raise exception yield smart_future_by_real_future[f] if callback is not None: callback() - return def wait_all( @@ -58,10 +76,20 @@ def wait_all( *, log_exceptions: bool = True, ) -> None: + """Wait for all of the SmartFutures in the collection to finish before + returning. + + Args: + futures: A collection of futures that we're waiting for + log_exceptions: Should we log (warning + exception) any + underlying exceptions raised during future processing or + silently ignore then? + """ + real_futures = [] - for f in futures: - assert type(f) == SmartFuture - real_futures.append(f.wrapped_future) + for x in futures: + assert isinstance(x, SmartFuture) + real_futures.append(x.wrapped_future) (done, not_done) = concurrent.futures.wait( real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED @@ -71,9 +99,7 @@ def wait_all( if not f.cancelled(): exception = f.exception() if exception is not None: - logger.warning( - f'Future {id(f)} raised an unhandled exception and exited.' - ) + logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f)) logger.exception(exception) raise exception assert len(done) == len(real_futures) @@ -81,15 +107,16 @@ def wait_all( class SmartFuture(DeferredOperand): - """This is a SmartFuture, a class that wraps a normal Future and can - then be used, mostly, like a normal (non-Future) identifier. + """This is a SmartFuture, a class that wraps a normal :class:`Future` + and can then be used, mostly, like a normal (non-Future) + identifier of the type of that SmartFuture's result. Using a FutureWrapper in expressions will block and wait until the result of the deferred operation is known. """ def __init__(self, wrapped_future: fut.Future) -> None: - assert type(wrapped_future) == fut.Future + assert isinstance(wrapped_future, fut.Future) self.wrapped_future = wrapped_future self.id = id_generator.get("smart_future_id") @@ -102,5 +129,5 @@ class SmartFuture(DeferredOperand): # You shouldn't have to call this; instead, have a look at defining a # method on DeferredOperand base class. @overrides - def _resolve(self, *, timeout=None) -> T: + def _resolve(self, timeout=None) -> T: return self.wrapped_future.result(timeout)