X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=executors.py;h=6ccd7b675c760315d05158d68dad0768cc0f0871;hb=21562149bda41588b65e4553d216fd0103761811;hp=cdbb811a6acd2cb9af12fefe49c61b65adba5ad0;hpb=5f75cf834725ac26b289cc5f157af0cb71cd5f0e;p=python_utils.git diff --git a/executors.py b/executors.py index cdbb811..6ccd7b6 100644 --- a/executors.py +++ b/executors.py @@ -15,6 +15,7 @@ import subprocess import threading import time from typing import Any, Callable, Dict, List, Optional, Set +import warnings import cloudpickle # type: ignore from overrides import overrides @@ -490,7 +491,8 @@ class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy): worker.count -= 1 logger.debug(f'Selected worker {worker}') return worker - logger.warning("Couldn't find a worker; go fish.") + msg = 'Unexpectedly could not find a worker, retrying...' + logger.warning(msg) return None @@ -525,7 +527,8 @@ class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy): if x >= len(self.workers): x = 0 if x == self.index: - logger.warning("Couldn't find a worker; go fish.") + msg = 'Unexpectedly could not find a worker, retrying...' + logger.warning(msg) return None @@ -768,11 +771,10 @@ class RemoteExecutor(BaseExecutor): # There's a race condition where someone else # already finished the work and removed the source # code file before we could copy it. No biggie. - logger.warning( - f'{bundle}: Failed to send instructions to the worker machine... ' + - 'We\'re a backup and this may be caused by the original (or some ' + - 'other backup) already finishing this work. Ignoring this.' - ) + msg = f'{bundle}: Failed to send instructions to the worker machine... ' + msg += 'We\'re a backup and this may be caused by the original (or some ' + msg += 'other backup) already finishing this work. Ignoring this.' + logger.warning(msg) return None # Kick off the work. Note that if this fails we let @@ -844,9 +846,8 @@ class RemoteExecutor(BaseExecutor): logger.exception(e) logger.error(f'{bundle}: Something unexpected just happened...') if p is not None: - logger.warning( - f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh." - ) + msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh." + logger.warning(msg) return self.wait_for_process(p, bundle, depth + 1) else: self.status.record_release_worker( @@ -957,7 +958,7 @@ class RemoteExecutor(BaseExecutor): def create_original_bundle(self, pickle, fname: str): from string_utils import generate_uuid - uuid = generate_uuid(as_hex=True) + uuid = generate_uuid(omit_dashes=True) code_file = f'/tmp/{uuid}.code.bin' result_file = f'/tmp/{uuid}.result.bin' @@ -1059,9 +1060,9 @@ class RemoteExecutor(BaseExecutor): logger.error(f'{bundle}: At least it\'s only a backup; better luck with the others.') return None else: - logger.warning( - f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<' - ) + msg = f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<' + logger.warning(msg) + warnings.warn(msg) return self.launch(bundle, avoid_last_machine) @overrides