- result = self._thread_pool_executor.submit(
- self.run_local_bundle, *newargs, **kwargs
- )
+ result = self._thread_pool_executor.submit(self.run_local_bundle, *newargs, **kwargs)
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
return result
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
return result
self.start_time: float = time.time()
self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
self.end_per_bundle: Dict[str, float] = defaultdict(float)
self.start_time: float = time.time()
self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
self.end_per_bundle: Dict[str, float] = defaultdict(float)
self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
self.finished_bundle_timings: List[float] = []
self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
self.finished_bundle_timings: List[float] = []
- def record_acquire_worker_already_locked(
- self, worker: RemoteWorkerRecord, uuid: str
- ) -> None:
+ def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
assert self.lock.locked()
self.known_workers.add(worker)
self.start_per_bundle[uuid] = None
assert self.lock.locked()
self.known_workers.add(worker)
self.start_per_bundle[uuid] = None
raise RemoteExecutorException(msg)
self.policy.register_worker_pool(self.workers)
self.cv = threading.Condition()
raise RemoteExecutorException(msg)
self.policy.register_worker_pool(self.workers)
self.cv = threading.Condition()
self._helper_executor = fut.ThreadPoolExecutor(
thread_name_prefix="remote_executor_helper",
max_workers=self.worker_count,
self._helper_executor = fut.ThreadPoolExecutor(
thread_name_prefix="remote_executor_helper",
max_workers=self.worker_count,
bundle_to_backup = bundle
assert bundle is not None
assert bundle.backup_bundles is not None
bundle_to_backup = bundle
assert bundle is not None
assert bundle.backup_bundles is not None
- def find_available_worker_or_block(
- self, machine_to_avoid: str = None
- ) -> RemoteWorkerRecord:
+ def find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
p = cmd_in_background(cmd, silent=True)
bundle.pid = p.pid
logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
p = cmd_in_background(cmd, silent=True)
bundle.pid = p.pid
- f'{SSH} {username}@{machine}'
- f' "/bin/rm -f {code_file} {result_file}"'
- )
- logger.debug(
- f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
+ f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
backup.is_cancelled.set()
# This is a backup job and, by now, we have already fetched
backup.is_cancelled.set()
# This is a backup job and, by now, we have already fetched
orig_bundle.is_cancelled.set()
self.release_worker(bundle, was_cancelled=was_cancelled)
return result
orig_bundle.is_cancelled.set()
self.release_worker(bundle, was_cancelled=was_cancelled)
return result
# they will move the result_file to this machine and let
# the original pick them up and unpickle them.
# they will move the result_file to this machine and let
# the original pick them up and unpickle them.
- def emergency_retry_nasty_bundle(
- self, bundle: BundleDetails
- ) -> Optional[fut.Future]:
+ def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
def ping(self, host) -> bool:
logger.debug(f'RUN> ping -c 1 {host}')
try:
def ping(self, host) -> bool:
logger.debug(f'RUN> ping -c 1 {host}')
try: