X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;ds=sidebyside;f=smart_future.py;h=625c3ed66b63b3ec17e5dc94897a3ca1a88a5451;hb=e46158e49121b8a955bb07b73f5bcf9928b79c90;hp=c0fce3aff32b9c103b2f3cbbdb3684cf14e03199;hpb=ec7c26dad2c26e4e4c33a69252654e3b1155bc7b;p=python_utils.git diff --git a/smart_future.py b/smart_future.py index c0fce3a..625c3ed 100644 --- a/smart_future.py +++ b/smart_future.py @@ -32,6 +32,7 @@ def wait_any( *, callback: Callable = None, log_exceptions: bool = True, + timeout: float = None, ): """Await the completion of any of a collection of SmartFutures and invoke callback each time one completes, repeatedly, until they are @@ -44,6 +45,8 @@ def wait_any( log_exceptions: Should we log (warning + exception) any underlying exceptions raised during future processing or silently ignore then? + timeout: invoke callback with a periodicity of timeout while + awaiting futures """ real_futures = [] @@ -55,18 +58,24 @@ def wait_any( smart_future_by_real_future[x.wrapped_future] = x while len(completed_futures) != len(real_futures): - newly_completed_futures = concurrent.futures.as_completed(real_futures) - for f in newly_completed_futures: + try: + newly_completed_futures = concurrent.futures.as_completed(real_futures, timeout=timeout) + for f in newly_completed_futures: + if callback is not None: + callback() + completed_futures.add(f) + if log_exceptions and not f.cancelled(): + exception = f.exception() + if exception is not None: + logger.warning( + 'Future 0x%x raised an unhandled exception and exited.', id(f) + ) + logger.exception(exception) + raise exception + yield smart_future_by_real_future[f] + except TimeoutError: if callback is not None: callback() - completed_futures.add(f) - if log_exceptions and not f.cancelled(): - exception = f.exception() - if exception is not None: - logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f)) - logger.exception(exception) - raise exception - yield smart_future_by_real_future[f] if callback is not None: callback()