Reduce the doctest lease duration...
[python_utils.git] / smart_future.py
index 1f6e6f0aedcf05966e536ec8f10f570c2175a3e4..625c3ed66b63b3ec17e5dc94897a3ca1a88a5451 100644 (file)
@@ -1,18 +1,26 @@
 #!/usr/bin/env python3
 
+# © Copyright 2021-2022, Scott Gasch
+
+"""A :class:`Future` that can be treated as a substutute for the result
+that it contains and will not block until it is used.  At that point,
+if the underlying value is not yet available yet, it will block until
+the internal result actually becomes available.
+"""
+
 from __future__ import annotations
 import concurrent
 import concurrent.futures as fut
 import logging
-import traceback
 from typing import Callable, List, Set, TypeVar
 
 from overrides import overrides
 
+import id_generator
+
 # This module is commonly used by others in here and should avoid
 # taking any unnecessary dependencies back on them.
 from deferred_operand import DeferredOperand
-import id_generator
 
 logger = logging.getLogger(__name__)
 
@@ -24,33 +32,52 @@ def wait_any(
     *,
     callback: Callable = None,
     log_exceptions: bool = True,
+    timeout: float = None,
 ):
+    """Await the completion of any of a collection of SmartFutures and
+    invoke callback each time one completes, repeatedly, until they are
+    all finished.
+
+    Args:
+        futures: A collection of SmartFutures to wait on
+        callback: An optional callback to invoke whenever one of the
+            futures completes
+        log_exceptions: Should we log (warning + exception) any
+            underlying exceptions raised during future processing or
+            silently ignore then?
+        timeout: invoke callback with a periodicity of timeout while
+            awaiting futures
+    """
+
     real_futures = []
     smart_future_by_real_future = {}
     completed_futures: Set[fut.Future] = set()
     for x in futures:
-        assert type(x) == SmartFuture
+        assert isinstance(x, SmartFuture)
         real_futures.append(x.wrapped_future)
         smart_future_by_real_future[x.wrapped_future] = x
 
     while len(completed_futures) != len(real_futures):
-        newly_completed_futures = concurrent.futures.as_completed(real_futures)
-        for f in newly_completed_futures:
+        try:
+            newly_completed_futures = concurrent.futures.as_completed(real_futures, timeout=timeout)
+            for f in newly_completed_futures:
+                if callback is not None:
+                    callback()
+                completed_futures.add(f)
+                if log_exceptions and not f.cancelled():
+                    exception = f.exception()
+                    if exception is not None:
+                        logger.warning(
+                            'Future 0x%x raised an unhandled exception and exited.', id(f)
+                        )
+                        logger.exception(exception)
+                        raise exception
+                yield smart_future_by_real_future[f]
+        except TimeoutError:
             if callback is not None:
                 callback()
-            completed_futures.add(f)
-            if log_exceptions and not f.cancelled():
-                exception = f.exception()
-                if exception is not None:
-                    logger.warning(
-                        f'Future {id(f)} raised an unhandled exception and exited.'
-                    )
-                    logger.exception(exception)
-                    raise exception
-            yield smart_future_by_real_future[f]
     if callback is not None:
         callback()
-    return
 
 
 def wait_all(
@@ -58,9 +85,19 @@ def wait_all(
     *,
     log_exceptions: bool = True,
 ) -> None:
+    """Wait for all of the SmartFutures in the collection to finish before
+    returning.
+
+    Args:
+        futures: A collection of futures that we're waiting for
+        log_exceptions: Should we log (warning + exception) any
+            underlying exceptions raised during future processing or
+            silently ignore then?
+    """
+
     real_futures = []
     for x in futures:
-        assert type(x) == SmartFuture
+        assert isinstance(x, SmartFuture)
         real_futures.append(x.wrapped_future)
 
     (done, not_done) = concurrent.futures.wait(
@@ -71,9 +108,7 @@ def wait_all(
             if not f.cancelled():
                 exception = f.exception()
                 if exception is not None:
-                    logger.warning(
-                        f'Future {id(f)} raised an unhandled exception and exited.'
-                    )
+                    logger.warning('Future 0x%x raised an unhandled exception and exited.', id(f))
                     logger.exception(exception)
                     raise exception
     assert len(done) == len(real_futures)
@@ -81,15 +116,16 @@ def wait_all(
 
 
 class SmartFuture(DeferredOperand):
-    """This is a SmartFuture, a class that wraps a normal Future and can
-    then be used, mostly, like a normal (non-Future) identifier.
+    """This is a SmartFuture, a class that wraps a normal :class:`Future`
+    and can then be used, mostly, like a normal (non-Future)
+    identifier of the type of that SmartFuture's result.
 
     Using a FutureWrapper in expressions will block and wait until
     the result of the deferred operation is known.
     """
 
     def __init__(self, wrapped_future: fut.Future) -> None:
-        assert type(wrapped_future) == fut.Future
+        assert isinstance(wrapped_future, fut.Future)
         self.wrapped_future = wrapped_future
         self.id = id_generator.get("smart_future_id")
 
@@ -102,5 +138,5 @@ class SmartFuture(DeferredOperand):
     # You shouldn't have to call this; instead, have a look at defining a
     # method on DeferredOperand base class.
     @overrides
-    def _resolve(self, *, timeout=None) -> T:
+    def _resolve(self, timeout=None) -> T:
         return self.wrapped_future.result(timeout)