X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=smart_future.py;h=625c3ed66b63b3ec17e5dc94897a3ca1a88a5451;hb=e46158e49121b8a955bb07b73f5bcf9928b79c90;hp=dbce4321842995a1855788089f8e598c8ad8bd11;hpb=02302bbd9363facb59c4df2c1f4013087702cfa6;p=python_utils.git diff --git a/smart_future.py b/smart_future.py index dbce432..625c3ed 100644 --- a/smart_future.py +++ b/smart_future.py @@ -2,7 +2,7 @@ # © Copyright 2021-2022, Scott Gasch -"""A :class:Future that can be treated as a substutute for the result +"""A :class:`Future` that can be treated as a substutute for the result that it contains and will not block until it is used. At that point, if the underlying value is not yet available yet, it will block until the internal result actually becomes available. @@ -32,6 +32,7 @@ def wait_any( *, callback: Callable = None, log_exceptions: bool = True, + timeout: float = None, ): """Await the completion of any of a collection of SmartFutures and invoke callback each time one completes, repeatedly, until they are @@ -44,6 +45,8 @@ def wait_any( log_exceptions: Should we log (warning + exception) any underlying exceptions raised during future processing or silently ignore then? + timeout: invoke callback with a periodicity of timeout while + awaiting futures """ real_futures = [] @@ -55,18 +58,24 @@ def wait_any( smart_future_by_real_future[x.wrapped_future] = x while len(completed_futures) != len(real_futures): - newly_completed_futures = concurrent.futures.as_completed(real_futures) - for f in newly_completed_futures: + try: + newly_completed_futures = concurrent.futures.as_completed(real_futures, timeout=timeout) + for f in newly_completed_futures: + if callback is not None: + callback() + completed_futures.add(f) + if log_exceptions and not f.cancelled(): + exception = f.exception() + if exception is not None: + logger.warning( + 'Future 0x%x raised an unhandled exception and exited.', id(f) + ) + logger.exception(exception) + raise exception + yield smart_future_by_real_future[f] + except TimeoutError: if callback is not None: callback() - completed_futures.add(f) - if log_exceptions and not f.cancelled(): - exception = f.exception() - if exception is not None: - logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f)) - logger.exception(exception) - raise exception - yield smart_future_by_real_future[f] if callback is not None: callback()