X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=executors.py;h=20aa9d2e325e5b5e54a77836f6117b91d99c36be;hb=7c09ecdda0e93da9ffab0d477aa561eb89cd6953;hp=399e32d14d33c3b3097a6457f762c76439785f5e;hpb=98d3d3251029d5dff6439311b040fca7f311983b;p=python_utils.git diff --git a/executors.py b/executors.py index 399e32d..20aa9d2 100644 --- a/executors.py +++ b/executors.py @@ -428,21 +428,29 @@ class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy): def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]: grabbag = [] for worker in self.workers: - for x in range(0, worker.count): - for y in range(0, worker.weight): - grabbag.append(worker) - - for _ in range(0, 5): - random.shuffle(grabbag) - worker = grabbag[0] - if worker.machine != machine_to_avoid or _ > 2: + if worker.machine != machine_to_avoid: if worker.count > 0: - worker.count -= 1 - logger.debug(f'Selected worker {worker}') - return worker - msg = 'Unexpectedly could not find a worker, retrying...' - logger.warning(msg) - return None + for _ in range(worker.count * worker.weight): + grabbag.append(worker) + + if len(grabbag) == 0: + logger.debug( + f'There are no available workers that avoid {machine_to_avoid}...' + ) + for worker in self.workers: + if worker.count > 0: + for _ in range(worker.count * worker.weight): + grabbag.append(worker) + + if len(grabbag) == 0: + logger.warning('There are no available workers?!') + return None + + worker = random.sample(grabbag, 1)[0] + assert worker.count > 0 + worker.count -= 1 + logger.debug(f'Chose worker {worker}') + return worker class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy): @@ -482,7 +490,9 @@ class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy): class RemoteExecutor(BaseExecutor): def __init__( - self, workers: List[RemoteWorkerRecord], policy: RemoteWorkerSelectionPolicy + self, + workers: List[RemoteWorkerRecord], + policy: RemoteWorkerSelectionPolicy, ) -> None: super().__init__() self.workers = workers @@ -513,7 +523,7 @@ class RemoteExecutor(BaseExecutor): ) = self.run_periodic_heartbeat() @background_thread - def run_periodic_heartbeat(self, stop_event) -> None: + def run_periodic_heartbeat(self, stop_event: threading.Event) -> None: while not stop_event.is_set(): time.sleep(5.0) logger.debug('Running periodic heartbeat code...') @@ -521,8 +531,10 @@ class RemoteExecutor(BaseExecutor): logger.debug('Periodic heartbeat thread shutting down.') def heartbeat(self) -> None: + # Note: this is invoked on a background thread, not an + # executor thread. Be careful what you do with it b/c it + # needs to get back and dump status again periodically. with self.status.lock: - # Dump regular progress report self.status.periodic_dump(self.total_bundles_submitted) # Look for bundles to reschedule via executor.submit @@ -537,7 +549,7 @@ class RemoteExecutor(BaseExecutor): if ( num_done > 2 and num_idle_workers > 1 - and (self.last_backup is None or (now - self.last_backup > 6.0)) + and (self.last_backup is None or (now - self.last_backup > 9.0)) and self.backup_lock.acquire(blocking=False) ): try: @@ -1102,7 +1114,7 @@ class DefaultExecutors(object): RemoteWorkerRecord( username='scott', machine='cheetah.house', - weight=25, + weight=30, count=6, ), ) @@ -1122,7 +1134,7 @@ class DefaultExecutors(object): RemoteWorkerRecord( username='scott', machine='wannabe.house', - weight=30, + weight=25, count=10, ), ) @@ -1132,7 +1144,7 @@ class DefaultExecutors(object): RemoteWorkerRecord( username='scott', machine='puma.cabin', - weight=25, + weight=30, count=6, ), ) @@ -1142,7 +1154,7 @@ class DefaultExecutors(object): RemoteWorkerRecord( username='scott', machine='backup.house', - weight=7, + weight=8, count=2, ), )