X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=executors.py;h=2829c6957268dd9d607f0db4e4102510b972e38a;hb=927b6920b62d581556c12ade7e9b09dc8978296b;hp=cdbb811a6acd2cb9af12fefe49c61b65adba5ad0;hpb=5f75cf834725ac26b289cc5f157af0cb71cd5f0e;p=python_utils.git diff --git a/executors.py b/executors.py index cdbb811..2829c69 100644 --- a/executors.py +++ b/executors.py @@ -15,6 +15,7 @@ import subprocess import threading import time from typing import Any, Callable, Dict, List, Optional, Set +import warnings import cloudpickle # type: ignore from overrides import overrides @@ -60,9 +61,8 @@ parser.add_argument( help='Maximum number of failures before giving up on a bundle', ) -RSYNC = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z' -SSH = 'ssh -oForwardX11=no' - +SSH = '/usr/bin/ssh -oForwardX11=no' +SCP = '/usr/bin/scp' def make_cloud_pickle(fun, *args, **kwargs): logger.debug(f"Making cloudpickled bundle at {fun.__name__}") @@ -490,7 +490,8 @@ class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy): worker.count -= 1 logger.debug(f'Selected worker {worker}') return worker - logger.warning("Couldn't find a worker; go fish.") + msg = 'Unexpectedly could not find a worker, retrying...' + logger.warning(msg) return None @@ -525,7 +526,8 @@ class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy): if x >= len(self.workers): x = 0 if x == self.index: - logger.warning("Couldn't find a worker; go fish.") + msg = 'Unexpectedly could not find a worker, retrying...' + logger.warning(msg) return None @@ -738,7 +740,7 @@ class RemoteExecutor(BaseExecutor): # Send input code / data to worker machine if it's not local. if hostname not in machine: try: - cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}' + cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}' start_ts = time.time() logger.info(f"{bundle}: Copying work to {worker} via {cmd}.") run_silently(cmd) @@ -768,11 +770,10 @@ class RemoteExecutor(BaseExecutor): # There's a race condition where someone else # already finished the work and removed the source # code file before we could copy it. No biggie. - logger.warning( - f'{bundle}: Failed to send instructions to the worker machine... ' + - 'We\'re a backup and this may be caused by the original (or some ' + - 'other backup) already finishing this work. Ignoring this.' - ) + msg = f'{bundle}: Failed to send instructions to the worker machine... ' + msg += 'We\'re a backup and this may be caused by the original (or some ' + msg += 'other backup) already finishing this work. Ignoring this.' + logger.warning(msg) return None # Kick off the work. Note that if this fails we let @@ -844,9 +845,8 @@ class RemoteExecutor(BaseExecutor): logger.exception(e) logger.error(f'{bundle}: Something unexpected just happened...') if p is not None: - logger.warning( - f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh." - ) + msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh." + logger.warning(msg) return self.wait_for_process(p, bundle, depth + 1) else: self.status.record_release_worker( @@ -874,7 +874,7 @@ class RemoteExecutor(BaseExecutor): if not was_cancelled: assert bundle.machine is not None if bundle.hostname not in bundle.machine: - cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null' + cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null' logger.info( f"{bundle}: Fetching results from {username}@{machine} via {cmd}" ) @@ -957,7 +957,7 @@ class RemoteExecutor(BaseExecutor): def create_original_bundle(self, pickle, fname: str): from string_utils import generate_uuid - uuid = generate_uuid(as_hex=True) + uuid = generate_uuid(omit_dashes=True) code_file = f'/tmp/{uuid}.code.bin' result_file = f'/tmp/{uuid}.result.bin' @@ -1059,9 +1059,9 @@ class RemoteExecutor(BaseExecutor): logger.error(f'{bundle}: At least it\'s only a backup; better luck with the others.') return None else: - logger.warning( - f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<' - ) + msg = f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<' + logger.warning(msg) + warnings.warn(msg) return self.launch(bundle, avoid_last_machine) @overrides