X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=executors.py;h=dabddf31b65e6687087820f7f2115a1197e93281;hb=b454ad295eb3024a238d32bf2aef1ebc3c496b44;hp=cdbb811a6acd2cb9af12fefe49c61b65adba5ad0;hpb=5f75cf834725ac26b289cc5f157af0cb71cd5f0e;p=python_utils.git diff --git a/executors.py b/executors.py index cdbb811..dabddf3 100644 --- a/executors.py +++ b/executors.py @@ -15,6 +15,7 @@ import subprocess import threading import time from typing import Any, Callable, Dict, List, Optional, Set +import warnings import cloudpickle # type: ignore from overrides import overrides @@ -490,7 +491,9 @@ class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy): worker.count -= 1 logger.debug(f'Selected worker {worker}') return worker - logger.warning("Couldn't find a worker; go fish.") + msg = 'Unexpectedly could not find a worker, retrying...' + logger.warning(msg) + warnings.warn(msg) return None @@ -525,7 +528,9 @@ class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy): if x >= len(self.workers): x = 0 if x == self.index: - logger.warning("Couldn't find a worker; go fish.") + msg = 'Unexpectedly could not find a worker, retrying...' + logger.warning(msg) + warnings.warn(msg) return None @@ -768,11 +773,11 @@ class RemoteExecutor(BaseExecutor): # There's a race condition where someone else # already finished the work and removed the source # code file before we could copy it. No biggie. - logger.warning( - f'{bundle}: Failed to send instructions to the worker machine... ' + + msg = f'{bundle}: Failed to send instructions to the worker machine... ' + 'We\'re a backup and this may be caused by the original (or some ' + 'other backup) already finishing this work. Ignoring this.' - ) + logger.warning(msg) + warnings.warn(msg) return None # Kick off the work. Note that if this fails we let @@ -844,9 +849,9 @@ class RemoteExecutor(BaseExecutor): logger.exception(e) logger.error(f'{bundle}: Something unexpected just happened...') if p is not None: - logger.warning( - f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh." - ) + msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh." + logger.warning(msg) + warnings.warn(msg) return self.wait_for_process(p, bundle, depth + 1) else: self.status.record_release_worker( @@ -1059,9 +1064,9 @@ class RemoteExecutor(BaseExecutor): logger.error(f'{bundle}: At least it\'s only a backup; better luck with the others.') return None else: - logger.warning( - f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<' - ) + msg = f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<' + logger.warning(msg) + warnings.warn(msg) return self.launch(bundle, avoid_last_machine) @overrides