from typing import Any, Callable, Dict, List, Optional, Set
import cloudpickle # type: ignore
+from overrides import overrides
from ansi import bg, fg, underline, reset
import argparse_utils
self.histogram.add_item(duration)
return result
+ @overrides
def submit(self,
function: Callable,
*args,
*newargs,
**kwargs)
+ @overrides
def shutdown(self,
wait = True) -> None:
logger.debug(f'Shutting down threadpool executor {self.title}')
self.adjust_task_count(-1)
return result
+ @overrides
def submit(self,
function: Callable,
*args,
)
return result
+ @overrides
def shutdown(self, wait=True) -> None:
logger.debug(f'Shutting down processpool executor {self.title}')
self._process_executor.shutdown(wait)
policy: RemoteWorkerSelectionPolicy) -> None:
super().__init__()
self.workers = workers
+ self.policy = policy
self.worker_count = 0
for worker in self.workers:
self.worker_count += worker.count
msg = f"We need somewhere to schedule work; count was {self.worker_count}"
logger.critical(msg)
raise Exception(msg)
- self.policy = policy
self.policy.register_worker_pool(self.workers)
self.cv = threading.Condition()
self._helper_executor = fut.ThreadPoolExecutor(
self.status = RemoteExecutorStatus(self.worker_count)
self.total_bundles_submitted = 0
logger.debug(
- f'Creating remote processpool with {self.worker_count} remote endpoints.'
+ f'Creating remote processpool with {self.worker_count} remote worker threads.'
)
def is_worker_available(self) -> bool:
# they will move the result_file to this machine and let
# the original pick them up and unpickle them.
+ @overrides
def submit(self,
function: Callable,
*args,
self.total_bundles_submitted += 1
return self._helper_executor.submit(self.launch, bundle)
+ @overrides
def shutdown(self, wait=True) -> None:
self._helper_executor.shutdown(wait)
logging.debug(f'Shutting down RemoteExecutor {self.title}')