Fix a recent bug in executors. Thread executor needs to return
authorScott <[email protected]>
Mon, 31 Jan 2022 06:09:43 +0000 (22:09 -0800)
committerScott <[email protected]>
Mon, 31 Jan 2022 06:09:43 +0000 (22:09 -0800)
its future.

executors.py
smart_future.py
tests/parallelize_itest.py

index 5b77a42dc3d29ca6f42673a369e23f0962343c62..47b4a89a88d693d535ed2e036c6288829505a005 100644 (file)
@@ -152,6 +152,7 @@ class ThreadExecutor(BaseExecutor):
         )
         result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
         result.add_done_callback(lambda _: self.adjust_task_count(-1))
+        return result
 
     @overrides
     def shutdown(self, wait=True) -> None:
index 2f3cbd9a9949f681e8a7cc70bd35ca43626e2861..604c149520464bcd9d8c5a55cf8905acd5ec34d4 100644 (file)
@@ -28,9 +28,11 @@ def wait_any(
     real_futures = []
     smart_future_by_real_future = {}
     completed_futures = set()
-    for _ in futures:
-        real_futures.append(_.wrapped_future)
-        smart_future_by_real_future[_.wrapped_future] = _
+    for f in futures:
+        assert type(f) == SmartFuture
+        real_futures.append(f.wrapped_future)
+        smart_future_by_real_future[f.wrapped_future] = f
+
     while len(completed_futures) != len(real_futures):
         newly_completed_futures = concurrent.futures.as_completed(real_futures)
         for f in newly_completed_futures:
@@ -56,7 +58,11 @@ def wait_all(
     *,
     log_exceptions: bool = True,
 ) -> None:
-    real_futures = [x.wrapped_future for x in futures]
+    real_futures = []
+    for f in futures:
+        assert type(f) == SmartFuture
+        real_futures.append(f.wrapped_future)
+
     (done, not_done) = concurrent.futures.wait(
         real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
     )
@@ -83,6 +89,7 @@ class SmartFuture(DeferredOperand):
     """
 
     def __init__(self, wrapped_future: fut.Future) -> None:
+        assert type(wrapped_future) == fut.Future
         self.wrapped_future = wrapped_future
         self.id = id_generator.get("smart_future_id")
 
index 11c5676173ce584ae1f6a13fb3c5c1e3d8549b5b..6ac95380c5a9a132fb7c43172d1a30d0ce59e2ee 100755 (executable)
@@ -37,7 +37,8 @@ def compute_factorial_remote(n):
 def test_thread_parallelization() -> None:
     results = []
     for _ in range(50):
-        results.append(compute_factorial_thread(_))
+        f = compute_factorial_thread(_)
+        results.append(f)
     smart_future.wait_all(results)
     for future in results:
         print(f'Thread: {future}')