Initial stab at a smarter doc/unit/integration/coverage test runner.
[python_utils.git] / smart_future.py
index c0fce3aff32b9c103b2f3cbbdb3684cf14e03199..86f1b1c42ad32bb85eb206e51c33543a8c57f9cd 100644 (file)
@@ -32,6 +32,7 @@ def wait_any(
     *,
     callback: Callable = None,
     log_exceptions: bool = True,
+    timeout: float = None,
 ):
     """Await the completion of any of a collection of SmartFutures and
     invoke callback each time one completes, repeatedly, until they are
@@ -44,6 +45,8 @@ def wait_any(
         log_exceptions: Should we log (warning + exception) any
             underlying exceptions raised during future processing or
             silently ignore then?
+        timeout: invoke callback with a periodicity of timeout while
+            awaiting futures
     """
 
     real_futures = []
@@ -55,18 +58,26 @@ def wait_any(
         smart_future_by_real_future[x.wrapped_future] = x
 
     while len(completed_futures) != len(real_futures):
-        newly_completed_futures = concurrent.futures.as_completed(real_futures)
-        for f in newly_completed_futures:
+        print("TOP...")
+        try:
+            newly_completed_futures = concurrent.futures.as_completed(real_futures, timeout=timeout)
+            for f in newly_completed_futures:
+                if callback is not None:
+                    callback()
+                completed_futures.add(f)
+                if log_exceptions and not f.cancelled():
+                    exception = f.exception()
+                    if exception is not None:
+                        logger.warning(
+                            'Future 0x%x raised an unhandled exception and exited.', id(f)
+                        )
+                        logger.exception(exception)
+                        raise exception
+                yield smart_future_by_real_future[f]
+        except TimeoutError:
+            print(f"HERE!!! {len(completed_futures)} / {len(real_futures)}.")
             if callback is not None:
                 callback()
-            completed_futures.add(f)
-            if log_exceptions and not f.cancelled():
-                exception = f.exception()
-                if exception is not None:
-                    logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
-                    logger.exception(exception)
-                    raise exception
-            yield smart_future_by_real_future[f]
     if callback is not None:
         callback()