Reduce the doctest lease duration...
[python_utils.git] / smart_future.py
index 7768599b419e014375a3750ee592c168e21c123e..625c3ed66b63b3ec17e5dc94897a3ca1a88a5451 100644 (file)
@@ -2,12 +2,10 @@
 
 # © Copyright 2021-2022, Scott Gasch
 
-"""
-A future that can be treated as a substutute for the result that it
-contains and will not block until it is used.  At that point, if the
-underlying value is not yet available yet, it will block until the
-internal result actually becomes available.
-
+"""A :class:`Future` that can be treated as a substutute for the result
+that it contains and will not block until it is used.  At that point,
+if the underlying value is not yet available yet, it will block until
+the internal result actually becomes available.
 """
 
 from __future__ import annotations
@@ -34,6 +32,7 @@ def wait_any(
     *,
     callback: Callable = None,
     log_exceptions: bool = True,
+    timeout: float = None,
 ):
     """Await the completion of any of a collection of SmartFutures and
     invoke callback each time one completes, repeatedly, until they are
@@ -46,6 +45,8 @@ def wait_any(
         log_exceptions: Should we log (warning + exception) any
             underlying exceptions raised during future processing or
             silently ignore then?
+        timeout: invoke callback with a periodicity of timeout while
+            awaiting futures
     """
 
     real_futures = []
@@ -57,18 +58,24 @@ def wait_any(
         smart_future_by_real_future[x.wrapped_future] = x
 
     while len(completed_futures) != len(real_futures):
-        newly_completed_futures = concurrent.futures.as_completed(real_futures)
-        for f in newly_completed_futures:
+        try:
+            newly_completed_futures = concurrent.futures.as_completed(real_futures, timeout=timeout)
+            for f in newly_completed_futures:
+                if callback is not None:
+                    callback()
+                completed_futures.add(f)
+                if log_exceptions and not f.cancelled():
+                    exception = f.exception()
+                    if exception is not None:
+                        logger.warning(
+                            'Future 0x%x raised an unhandled exception and exited.', id(f)
+                        )
+                        logger.exception(exception)
+                        raise exception
+                yield smart_future_by_real_future[f]
+        except TimeoutError:
             if callback is not None:
                 callback()
-            completed_futures.add(f)
-            if log_exceptions and not f.cancelled():
-                exception = f.exception()
-                if exception is not None:
-                    logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
-                    logger.exception(exception)
-                    raise exception
-            yield smart_future_by_real_future[f]
     if callback is not None:
         callback()