From 74af46212d1d800db6fb0a26fda941f4a6f5b0f4 Mon Sep 17 00:00:00 2001 From: Scott Date: Thu, 27 Jan 2022 09:29:07 -0800 Subject: [PATCH] Make the worker selection heuristics work harder to avoid backup bundles scheduled on the same machine as their primary. --- executors.py | 50 ++++++++++++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/executors.py b/executors.py index 453139a..20aa9d2 100644 --- a/executors.py +++ b/executors.py @@ -428,21 +428,29 @@ class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy): def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]: grabbag = [] for worker in self.workers: - for x in range(0, worker.count): - for y in range(0, worker.weight): - grabbag.append(worker) - - for _ in range(0, 5): - random.shuffle(grabbag) - worker = grabbag[0] - if worker.machine != machine_to_avoid or _ > 2: + if worker.machine != machine_to_avoid: if worker.count > 0: - worker.count -= 1 - logger.debug(f'Selected worker {worker}') - return worker - msg = 'Unexpectedly could not find a worker, retrying...' - logger.warning(msg) - return None + for _ in range(worker.count * worker.weight): + grabbag.append(worker) + + if len(grabbag) == 0: + logger.debug( + f'There are no available workers that avoid {machine_to_avoid}...' + ) + for worker in self.workers: + if worker.count > 0: + for _ in range(worker.count * worker.weight): + grabbag.append(worker) + + if len(grabbag) == 0: + logger.warning('There are no available workers?!') + return None + + worker = random.sample(grabbag, 1)[0] + assert worker.count > 0 + worker.count -= 1 + logger.debug(f'Chose worker {worker}') + return worker class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy): @@ -515,7 +523,7 @@ class RemoteExecutor(BaseExecutor): ) = self.run_periodic_heartbeat() @background_thread - def run_periodic_heartbeat(self, stop_event) -> None: + def run_periodic_heartbeat(self, stop_event: threading.Event) -> None: while not stop_event.is_set(): time.sleep(5.0) logger.debug('Running periodic heartbeat code...') @@ -523,8 +531,10 @@ class RemoteExecutor(BaseExecutor): logger.debug('Periodic heartbeat thread shutting down.') def heartbeat(self) -> None: + # Note: this is invoked on a background thread, not an + # executor thread. Be careful what you do with it b/c it + # needs to get back and dump status again periodically. with self.status.lock: - # Dump regular progress report self.status.periodic_dump(self.total_bundles_submitted) # Look for bundles to reschedule via executor.submit @@ -539,7 +549,7 @@ class RemoteExecutor(BaseExecutor): if ( num_done > 2 and num_idle_workers > 1 - and (self.last_backup is None or (now - self.last_backup > 6.0)) + and (self.last_backup is None or (now - self.last_backup > 9.0)) and self.backup_lock.acquire(blocking=False) ): try: @@ -1104,7 +1114,7 @@ class DefaultExecutors(object): RemoteWorkerRecord( username='scott', machine='cheetah.house', - weight=34, + weight=30, count=6, ), ) @@ -1134,7 +1144,7 @@ class DefaultExecutors(object): RemoteWorkerRecord( username='scott', machine='puma.cabin', - weight=25, + weight=30, count=6, ), ) @@ -1144,7 +1154,7 @@ class DefaultExecutors(object): RemoteWorkerRecord( username='scott', machine='backup.house', - weight=7, + weight=8, count=2, ), ) -- 2.45.2