)
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
+ return result
@overrides
def shutdown(self, wait=True) -> None:
real_futures = []
smart_future_by_real_future = {}
completed_futures = set()
- for _ in futures:
- real_futures.append(_.wrapped_future)
- smart_future_by_real_future[_.wrapped_future] = _
+ for f in futures:
+ assert type(f) == SmartFuture
+ real_futures.append(f.wrapped_future)
+ smart_future_by_real_future[f.wrapped_future] = f
+
while len(completed_futures) != len(real_futures):
newly_completed_futures = concurrent.futures.as_completed(real_futures)
for f in newly_completed_futures:
*,
log_exceptions: bool = True,
) -> None:
- real_futures = [x.wrapped_future for x in futures]
+ real_futures = []
+ for f in futures:
+ assert type(f) == SmartFuture
+ real_futures.append(f.wrapped_future)
+
(done, not_done) = concurrent.futures.wait(
real_futures, timeout=None, return_when=concurrent.futures.ALL_COMPLETED
)
"""
def __init__(self, wrapped_future: fut.Future) -> None:
+ assert type(wrapped_future) == fut.Future
self.wrapped_future = wrapped_future
self.id = id_generator.get("smart_future_id")
def test_thread_parallelization() -> None:
results = []
for _ in range(50):
- results.append(compute_factorial_thread(_))
+ f = compute_factorial_thread(_)
+ results.append(f)
smart_future.wait_all(results)
for future in results:
print(f'Thread: {future}')