)
SSH = '/usr/bin/ssh -oForwardX11=no'
-SCP = '/usr/bin/scp'
+SCP = '/usr/bin/scp -C'
def make_cloud_pickle(fun, *args, **kwargs):
class RemoteExecutorStatus:
def __init__(self, total_worker_count: int) -> None:
- self.worker_count = total_worker_count
+ self.worker_count: int = total_worker_count
self.known_workers: Set[RemoteWorkerRecord] = set()
+ self.start_time: float = time.time()
self.start_per_bundle: Dict[str, float] = defaultdict(float)
self.end_per_bundle: Dict[str, float] = defaultdict(float)
self.finished_bundle_timings_per_worker: Dict[
self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
self.finished_bundle_timings: List[float] = []
self.last_periodic_dump: Optional[float] = None
- self.total_bundles_submitted = 0
+ self.total_bundles_submitted: int = 0
# Protects reads and modification using self. Also used
# as a memory fence for modifications to bundle.
- self.lock = threading.Lock()
+ self.lock: threading.Lock = threading.Lock()
def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
with self.lock:
if len(self.finished_bundle_timings) > 1:
qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
ret += (
- f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, '
+ f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
f'✅={total_finished}/{self.total_bundles_submitted}, '
f'💻n={total_in_flight}/{self.worker_count}\n'
)
else:
ret += (
- f' ✅={total_finished}/{self.total_bundles_submitted}, '
+ f'⏱={ts-self.start_time:.1f}s, '
+ f'✅={total_finished}/{self.total_bundles_submitted}, '
f'💻n={total_in_flight}/{self.worker_count}\n'
)
def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
grabbag = []
for worker in self.workers:
- for x in range(0, worker.count):
- for y in range(0, worker.weight):
- grabbag.append(worker)
-
- for _ in range(0, 5):
- random.shuffle(grabbag)
- worker = grabbag[0]
- if worker.machine != machine_to_avoid or _ > 2:
+ if worker.machine != machine_to_avoid:
+ if worker.count > 0:
+ for _ in range(worker.count * worker.weight):
+ grabbag.append(worker)
+
+ if len(grabbag) == 0:
+ logger.debug(
+ f'There are no available workers that avoid {machine_to_avoid}...'
+ )
+ for worker in self.workers:
if worker.count > 0:
- worker.count -= 1
- logger.debug(f'Selected worker {worker}')
- return worker
- msg = 'Unexpectedly could not find a worker, retrying...'
- logger.warning(msg)
- return None
+ for _ in range(worker.count * worker.weight):
+ grabbag.append(worker)
+
+ if len(grabbag) == 0:
+ logger.warning('There are no available workers?!')
+ return None
+
+ worker = random.sample(grabbag, 1)[0]
+ assert worker.count > 0
+ worker.count -= 1
+ logger.debug(f'Chose worker {worker}')
+ return worker
class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
class RemoteExecutor(BaseExecutor):
def __init__(
- self, workers: List[RemoteWorkerRecord], policy: RemoteWorkerSelectionPolicy
+ self,
+ workers: List[RemoteWorkerRecord],
+ policy: RemoteWorkerSelectionPolicy,
) -> None:
super().__init__()
self.workers = workers
) = self.run_periodic_heartbeat()
@background_thread
- def run_periodic_heartbeat(self, stop_event) -> None:
+ def run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
while not stop_event.is_set():
time.sleep(5.0)
logger.debug('Running periodic heartbeat code...')
logger.debug('Periodic heartbeat thread shutting down.')
def heartbeat(self) -> None:
+ # Note: this is invoked on a background thread, not an
+ # executor thread. Be careful what you do with it b/c it
+ # needs to get back and dump status again periodically.
with self.status.lock:
- # Dump regular progress report
self.status.periodic_dump(self.total_bundles_submitted)
# Look for bundles to reschedule via executor.submit
if (
num_done > 2
and num_idle_workers > 1
- and (self.last_backup is None or (now - self.last_backup > 6.0))
+ and (self.last_backup is None or (now - self.last_backup > 9.0))
and self.backup_lock.acquire(blocking=False)
):
try:
RemoteWorkerRecord(
username='scott',
machine='cheetah.house',
- weight=25,
+ weight=30,
count=6,
),
)
RemoteWorkerRecord(
username='scott',
machine='wannabe.house',
- weight=30,
+ weight=25,
count=10,
),
)
RemoteWorkerRecord(
username='scott',
machine='puma.cabin',
- weight=25,
+ weight=30,
count=6,
),
)
RemoteWorkerRecord(
username='scott',
machine='backup.house',
- weight=7,
+ weight=8,
count=2,
),
)