#!/usr/bin/env python3
+ # -*- coding: utf-8 -*-
from __future__ import annotations
for arg in args:
newargs.append(arg)
start = time.time()
- result = self._thread_pool_executor.submit(
- self.run_local_bundle, *newargs, **kwargs
- )
+ result = self._thread_pool_executor.submit(self.run_local_bundle, *newargs, **kwargs)
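+ # On completion: record the bundle's wall-clock latency in the histogram and release one task slot.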
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
return result
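+ # Timing/state bookkeeping: per-bundle start/end stamps and per-worker latency lists feed the histogram and, presumably, the backup-scheduling heuristics below.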
self.start_time: float = time.time()
self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
self.end_per_bundle: Dict[str, float] = defaultdict(float)
- self.finished_bundle_timings_per_worker: Dict[
- RemoteWorkerRecord, List[float]
- ] = {}
+ self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
self.finished_bundle_timings: List[float] = []
with self.lock:
self.record_acquire_worker_already_locked(worker, uuid)
- def record_acquire_worker_already_locked(
- self, worker: RemoteWorkerRecord, uuid: str
- ) -> None:
+ def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
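+ # The *_already_locked variant requires the caller to hold self.lock; the assert below enforces it.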
assert self.lock.locked()
self.known_workers.add(worker)
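+ # None marks "acquired but not yet started"; presumably the real start timestamp is filled in once the bundle begins running.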
self.start_per_bundle[uuid] = None
self.in_flight_bundles_by_worker[worker].remove(uuid)
if not was_cancelled:
start = self.start_per_bundle[uuid]
- assert start
+ assert start is not None
bundle_latency = ts - start
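+ # Append this bundle's latency to the worker's timing history, creating the list on first use.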
x = self.finished_bundle_timings_per_worker.get(worker, list())
x.append(bundle_latency)
grabbag.append(worker)
if len(grabbag) == 0:
- logger.debug(
- f'There are no available workers that avoid {machine_to_avoid}...'
- )
+ logger.debug(f'There are no available workers that avoid {machine_to_avoid}...')
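+ # Fall back to rebuilding the pool from all workers; each appears count * weight times, so a random draw (presumably) favors heavier workers with free capacity.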
for worker in self.workers:
if worker.count > 0:
for _ in range(worker.count * worker.weight):
return False
@overrides
- def acquire_worker(
- self, machine_to_avoid: str = None
- ) -> Optional[RemoteWorkerRecord]:
+ def acquire_worker(self, machine_to_avoid: Optional[str] = None) -> Optional[RemoteWorkerRecord]:
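+ # Scan workers round-robin from the saved index, wrapping until one with free capacity turns up.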
x = self.index
while True:
worker = self.workers[x]
raise RemoteExecutorException(msg)
self.policy.register_worker_pool(self.workers)
self.cv = threading.Condition()
- logger.debug(
- f'Creating {self.worker_count} local threads, one per remote worker.'
- )
+ logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
self._helper_executor = fut.ThreadPoolExecutor(
thread_name_prefix="remote_executor_helper",
max_workers=self.worker_count,
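+ # Backup-candidate scoring (inferred from the code below): long-running bundles, and bundles slower than the local or global p95 latency, make better backup candidates.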
if start_ts is not None:
runtime = now - start_ts
score += runtime
- logger.debug(
- f'score[{bundle}] => {score} # latency boost'
- )
+ logger.debug(f'score[{bundle}] => {score} # latency boost')
if bundle.slower_than_local_p95:
score += runtime / 2
- logger.debug(
- f'score[{bundle}] => {score} # >worker p95'
- )
+ logger.debug(f'score[{bundle}] => {score} # >worker p95')
if bundle.slower_than_global_p95:
score += runtime / 4
- logger.debug(
- f'score[{bundle}] => {score} # >global p95'
- )
+ logger.debug(f'score[{bundle}] => {score} # >global p95')
# Prefer backups of bundles that don't
# have backups already.
logger.debug(
f'score[{bundle}] => {score} # {backup_count} dup backup factor'
)
- if score != 0 and (
- best_score is None or score > best_score
- ):
+ if score != 0 and (best_score is None or score > best_score):
bundle_to_backup = bundle
assert bundle is not None
assert bundle.backup_bundles is not None
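+ # Thin delegations to the installed worker-selection policy.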
def is_worker_available(self) -> bool:
return self.policy.is_worker_available()
- def acquire_worker(
- self, machine_to_avoid: str = None
- ) -> Optional[RemoteWorkerRecord]:
+ def acquire_worker(self, machine_to_avoid: Optional[str] = None) -> Optional[RemoteWorkerRecord]:
return self.policy.acquire_worker(machine_to_avoid)
- def find_available_worker_or_block(
- self, machine_to_avoid: str = None
- ) -> RemoteWorkerRecord:
+ def find_available_worker_or_block(self, machine_to_avoid: Optional[str] = None) -> RemoteWorkerRecord:
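+ # Sleep on the condition variable until the policy reports a free worker (woken, presumably, when a worker is released).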
with self.cv:
while not self.is_worker_available():
self.cv.wait()
worker = None
while worker is None:
worker = self.find_available_worker_or_block(avoid_machine)
- assert worker
+ assert worker is not None
# Ok, found a worker.
bundle.worker = worker
try:
return self.process_work_result(bundle)
except Exception as e:
- logger.warning(
- f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
- )
+ logger.warning(f'{bundle}: bundle says it\'s cancelled upfront but no results?!')
self.release_worker(bundle)
if is_original:
# Weird. We are the original owner of this
# Send input code / data to worker machine if it's not local.
if hostname not in machine:
try:
- cmd = (
- f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
- )
+ cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
start_ts = time.time()
logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
run_silently(cmd)
logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
p = cmd_in_background(cmd, silent=True)
bundle.pid = p.pid
- logger.debug(
- f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
- )
+ logger.debug(f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.')
return self.wait_for_process(p, bundle, 0)
def wait_for_process(
self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
) -> Any:
machine = bundle.machine
- assert p
+ assert p is not None
pid = p.pid
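+ # depth counts retries of this wait; bail out after a few to avoid looping forever on a sick bundle.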
if depth > 3:
logger.error(
p.wait(timeout=0.25)
except subprocess.TimeoutExpired:
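+ # Still running after the short wait; check whether a backup finished this bundle elsewhere first.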
if self.check_if_cancelled(bundle):
- logger.info(
- f'{bundle}: looks like another worker finished bundle...'
- )
+ logger.info(f'{bundle}: looks like another worker finished bundle...')
break
else:
logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
break
run_silently(
- f'{SSH} {username}@{machine}'
- f' "/bin/rm -f {code_file} {result_file}"'
+ f'{SSH} {username}@{machine} "/bin/rm -f {code_file} {result_file}"'
)
- logger.debug(
- f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
- )
+ logger.debug(f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.')
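+ # Record the bundle's end-to-end duration in the runtime histogram.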
dur = bundle.end_ts - bundle.start_ts
self.histogram.add_item(dur)
# backup.
if bundle.backup_bundles is not None:
for backup in bundle.backup_bundles:
- logger.debug(
- f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
- )
+ logger.debug(f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled')
backup.is_cancelled.set()
# This is a backup job and, by now, we have already fetched
# Tell the original to stop if we finished first.
if not was_cancelled:
orig_bundle = bundle.src_bundle
- assert orig_bundle
- logger.debug(
- f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.'
- )
+ assert orig_bundle is not None
+ logger.debug(f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.')
orig_bundle.is_cancelled.set()
self.release_worker(bundle, was_cancelled=was_cancelled)
return result
# they will move the result_file to this machine and let
# the original pick them up and unpickle them.
- def emergency_retry_nasty_bundle(
- self, bundle: BundleDetails
- ) -> Optional[fut.Future]:
+ def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
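+ # Forget the previous worker and remember its machine so the retry can avoid it.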
is_original = bundle.src_bundle is None
bundle.worker = None
avoid_last_machine = bundle.machine
def ping(self, host) -> bool:
logger.debug(f'RUN> ping -c 1 {host}')
try:
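+ # Send a single ICMP echo with a 1-second budget; exit status 0 means the host answered.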
- x = cmd_with_timeout(
- f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
- )
+ x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
return x == 0
except Exception:
return False