X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=smart_future.py;h=625c3ed66b63b3ec17e5dc94897a3ca1a88a5451;hb=a9bdfd8fc9f84b7b2c09a57cd12ba32259e84d1c;hp=7aac8ebb68eb3bb3af6da50b127cfb344ff925e3;hpb=532df2c5b57c7517dfb3dddd8c1358fbadf8baf3;p=python_utils.git diff --git a/smart_future.py b/smart_future.py index 7aac8eb..625c3ed 100644 --- a/smart_future.py +++ b/smart_future.py @@ -2,11 +2,10 @@ # © Copyright 2021-2022, Scott Gasch -"""A future that can be treated like the result that it contains and -will not block until it is used. At that point, if the underlying -value is not yet available, it will block until it becomes -available. - +"""A :class:`Future` that can be treated as a substutute for the result +that it contains and will not block until it is used. At that point, +if the underlying value is not yet available yet, it will block until +the internal result actually becomes available. """ from __future__ import annotations @@ -33,7 +32,23 @@ def wait_any( *, callback: Callable = None, log_exceptions: bool = True, + timeout: float = None, ): + """Await the completion of any of a collection of SmartFutures and + invoke callback each time one completes, repeatedly, until they are + all finished. + + Args: + futures: A collection of SmartFutures to wait on + callback: An optional callback to invoke whenever one of the + futures completes + log_exceptions: Should we log (warning + exception) any + underlying exceptions raised during future processing or + silently ignore then? + timeout: invoke callback with a periodicity of timeout while + awaiting futures + """ + real_futures = [] smart_future_by_real_future = {} completed_futures: Set[fut.Future] = set() @@ -43,18 +58,24 @@ def wait_any( smart_future_by_real_future[x.wrapped_future] = x while len(completed_futures) != len(real_futures): - newly_completed_futures = concurrent.futures.as_completed(real_futures) - for f in newly_completed_futures: + try: + newly_completed_futures = concurrent.futures.as_completed(real_futures, timeout=timeout) + for f in newly_completed_futures: + if callback is not None: + callback() + completed_futures.add(f) + if log_exceptions and not f.cancelled(): + exception = f.exception() + if exception is not None: + logger.warning( + 'Future 0x%x raised an unhandled exception and exited.', id(f) + ) + logger.exception(exception) + raise exception + yield smart_future_by_real_future[f] + except TimeoutError: if callback is not None: callback() - completed_futures.add(f) - if log_exceptions and not f.cancelled(): - exception = f.exception() - if exception is not None: - logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f)) - logger.exception(exception) - raise exception - yield smart_future_by_real_future[f] if callback is not None: callback() @@ -64,6 +85,16 @@ def wait_all( *, log_exceptions: bool = True, ) -> None: + """Wait for all of the SmartFutures in the collection to finish before + returning. + + Args: + futures: A collection of futures that we're waiting for + log_exceptions: Should we log (warning + exception) any + underlying exceptions raised during future processing or + silently ignore then? + """ + real_futures = [] for x in futures: assert isinstance(x, SmartFuture) @@ -85,8 +116,9 @@ def wait_all( class SmartFuture(DeferredOperand): - """This is a SmartFuture, a class that wraps a normal Future and can - then be used, mostly, like a normal (non-Future) identifier. + """This is a SmartFuture, a class that wraps a normal :class:`Future` + and can then be used, mostly, like a normal (non-Future) + identifier of the type of that SmartFuture's result. Using a FutureWrapper in expressions will block and wait until the result of the deferred operation is known.