Reduce the doctest lease duration...
[python_utils.git] / smart_future.py
index dbce4321842995a1855788089f8e598c8ad8bd11..625c3ed66b63b3ec17e5dc94897a3ca1a88a5451 100644 (file)
@@ -2,7 +2,7 @@
 
 # © Copyright 2021-2022, Scott Gasch
 
-"""A :class:Future that can be treated as a substutute for the result
+"""A :class:`Future` that can be treated as a substutute for the result
 that it contains and will not block until it is used.  At that point,
 if the underlying value is not yet available yet, it will block until
 the internal result actually becomes available.
@@ -32,6 +32,7 @@ def wait_any(
     *,
     callback: Callable = None,
     log_exceptions: bool = True,
+    timeout: float = None,
 ):
     """Await the completion of any of a collection of SmartFutures and
     invoke callback each time one completes, repeatedly, until they are
@@ -44,6 +45,8 @@ def wait_any(
         log_exceptions: Should we log (warning + exception) any
             underlying exceptions raised during future processing or
             silently ignore then?
+        timeout: invoke callback with a periodicity of timeout while
+            awaiting futures
     """
 
     real_futures = []
@@ -55,18 +58,24 @@ def wait_any(
         smart_future_by_real_future[x.wrapped_future] = x
 
     while len(completed_futures) != len(real_futures):
-        newly_completed_futures = concurrent.futures.as_completed(real_futures)
-        for f in newly_completed_futures:
+        try:
+            newly_completed_futures = concurrent.futures.as_completed(real_futures, timeout=timeout)
+            for f in newly_completed_futures:
+                if callback is not None:
+                    callback()
+                completed_futures.add(f)
+                if log_exceptions and not f.cancelled():
+                    exception = f.exception()
+                    if exception is not None:
+                        logger.warning(
+                            'Future 0x%x raised an unhandled exception and exited.', id(f)
+                        )
+                        logger.exception(exception)
+                        raise exception
+                yield smart_future_by_real_future[f]
+        except TimeoutError:
             if callback is not None:
                 callback()
-            completed_futures.add(f)
-            if log_exceptions and not f.cancelled():
-                exception = f.exception()
-                if exception is not None:
-                    logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
-                    logger.exception(exception)
-                    raise exception
-            yield smart_future_by_real_future[f]
     if callback is not None:
         callback()