Final touches on the new test runner.
[python_utils.git] / smart_future.py
index dbce4321842995a1855788089f8e598c8ad8bd11..86f1b1c42ad32bb85eb206e51c33543a8c57f9cd 100644 (file)
@@ -2,7 +2,7 @@
 
 # © Copyright 2021-2022, Scott Gasch
 
-"""A :class:Future that can be treated as a substutute for the result
+"""A :class:`Future` that can be treated as a substutute for the result
 that it contains and will not block until it is used.  At that point,
 if the underlying value is not yet available yet, it will block until
 the internal result actually becomes available.
@@ -32,6 +32,7 @@ def wait_any(
     *,
     callback: Callable = None,
     log_exceptions: bool = True,
+    timeout: float = None,
 ):
     """Await the completion of any of a collection of SmartFutures and
     invoke callback each time one completes, repeatedly, until they are
@@ -44,6 +45,8 @@ def wait_any(
         log_exceptions: Should we log (warning + exception) any
             underlying exceptions raised during future processing or
             silently ignore then?
+        timeout: invoke callback with a periodicity of timeout while
+            awaiting futures
     """
 
     real_futures = []
@@ -55,18 +58,26 @@ def wait_any(
         smart_future_by_real_future[x.wrapped_future] = x
 
     while len(completed_futures) != len(real_futures):
-        newly_completed_futures = concurrent.futures.as_completed(real_futures)
-        for f in newly_completed_futures:
+        print("TOP...")
+        try:
+            newly_completed_futures = concurrent.futures.as_completed(real_futures, timeout=timeout)
+            for f in newly_completed_futures:
+                if callback is not None:
+                    callback()
+                completed_futures.add(f)
+                if log_exceptions and not f.cancelled():
+                    exception = f.exception()
+                    if exception is not None:
+                        logger.warning(
+                            'Future 0x%x raised an unhandled exception and exited.', id(f)
+                        )
+                        logger.exception(exception)
+                        raise exception
+                yield smart_future_by_real_future[f]
+        except TimeoutError:
+            print(f"HERE!!! {len(completed_futures)} / {len(real_futures)}.")
             if callback is not None:
                 callback()
-            completed_futures.add(f)
-            if log_exceptions and not f.cancelled():
-                exception = f.exception()
-                if exception is not None:
-                    logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
-                    logger.exception(exception)
-                    raise exception
-            yield smart_future_by_real_future[f]
     if callback is not None:
         callback()