class BaseExecutor(ABC):
def __init__(self, *, title=''):
self.title = title
- self.task_count = 0
self.histogram = hist.SimpleHistogram(
hist.SimpleHistogram.n_evenly_spaced_buckets(int(0), int(500), 50)
)
+ self.task_count = 0
@abstractmethod
def submit(self, function: Callable, *args, **kwargs) -> fut.Future:
pass
def adjust_task_count(self, delta: int) -> None:
+ """Change the task count. Note: do not call this method from a
+ worker, it should only be called by the launcher process /
+ thread / machine.
+
+ """
self.task_count += delta
+ print(f'(adjusted task count by {delta} to {self.task_count})')
logger.debug(f'Executor current task count is {self.task_count}')
+ def get_task_count(self) -> int:
+ """Change the task count. Note: do not call this method from a
+ worker, it should only be called by the launcher process /
+ thread / machine.
+
+ """
+ return self.task_count
+
class ThreadExecutor(BaseExecutor):
def __init__(self, max_workers: Optional[int] = None):
max_workers=workers, thread_name_prefix="thread_executor_helper"
)
+ # This is run on a different thread; do not adjust task count here.
def run_local_bundle(self, fun, *args, **kwargs):
logger.debug(f"Running local bundle at {fun.__name__}")
- start = time.time()
result = fun(*args, **kwargs)
- end = time.time()
- self.adjust_task_count(-1)
- duration = end - start
- logger.debug(f"{fun.__name__} finished; used {duration:.1f}s")
- self.histogram.add_item(duration)
return result
@overrides
newargs.append(function)
for arg in args:
newargs.append(arg)
- return self._thread_pool_executor.submit(
+ start = time.time()
+ result = self._thread_pool_executor.submit(
self.run_local_bundle, *newargs, **kwargs
)
+ result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
+ result.add_done_callback(lambda _: self.adjust_task_count(-1))
@overrides
def shutdown(self, wait=True) -> None:
max_workers=workers,
)
+ # This is run in another process; do not adjust task count here.
def run_cloud_pickle(self, pickle):
fun, args, kwargs = cloudpickle.loads(pickle)
logger.debug(f"Running pickled bundle at {fun.__name__}")
result = fun(*args, **kwargs)
- self.adjust_task_count(-1)
return result
@overrides
pickle = make_cloud_pickle(function, *args, **kwargs)
result = self._process_executor.submit(self.run_cloud_pickle, pickle)
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
+ result.add_done_callback(lambda _: self.adjust_task_count(-1))
return result
@overrides