X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=executors.py;h=2829c6957268dd9d607f0db4e4102510b972e38a;hb=fd4daa9557429e519fadd6e7e355fb625bad26cc;hp=dabddf31b65e6687087820f7f2115a1197e93281;hpb=b454ad295eb3024a238d32bf2aef1ebc3c496b44;p=python_utils.git diff --git a/executors.py b/executors.py index dabddf3..2829c69 100644 --- a/executors.py +++ b/executors.py @@ -61,9 +61,8 @@ parser.add_argument( help='Maximum number of failures before giving up on a bundle', ) -RSYNC = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z' -SSH = 'ssh -oForwardX11=no' - +SSH = '/usr/bin/ssh -oForwardX11=no' +SCP = '/usr/bin/scp' def make_cloud_pickle(fun, *args, **kwargs): logger.debug(f"Making cloudpickled bundle at {fun.__name__}") @@ -493,7 +492,6 @@ class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy): return worker msg = 'Unexpectedly could not find a worker, retrying...' logger.warning(msg) - warnings.warn(msg) return None @@ -530,7 +528,6 @@ class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy): if x == self.index: msg = 'Unexpectedly could not find a worker, retrying...' logger.warning(msg) - warnings.warn(msg) return None @@ -743,7 +740,7 @@ class RemoteExecutor(BaseExecutor): # Send input code / data to worker machine if it's not local. if hostname not in machine: try: - cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}' + cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}' start_ts = time.time() logger.info(f"{bundle}: Copying work to {worker} via {cmd}.") run_silently(cmd) @@ -773,11 +770,10 @@ class RemoteExecutor(BaseExecutor): # There's a race condition where someone else # already finished the work and removed the source # code file before we could copy it. No biggie. - msg = f'{bundle}: Failed to send instructions to the worker machine... ' + - 'We\'re a backup and this may be caused by the original (or some ' + - 'other backup) already finishing this work. Ignoring this.' + msg = f'{bundle}: Failed to send instructions to the worker machine... ' + msg += 'We\'re a backup and this may be caused by the original (or some ' + msg += 'other backup) already finishing this work. Ignoring this.' logger.warning(msg) - warnings.warn(msg) return None # Kick off the work. Note that if this fails we let @@ -851,7 +847,6 @@ class RemoteExecutor(BaseExecutor): if p is not None: msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh." logger.warning(msg) - warnings.warn(msg) return self.wait_for_process(p, bundle, depth + 1) else: self.status.record_release_worker( @@ -879,7 +874,7 @@ class RemoteExecutor(BaseExecutor): if not was_cancelled: assert bundle.machine is not None if bundle.hostname not in bundle.machine: - cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null' + cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null' logger.info( f"{bundle}: Fetching results from {username}@{machine} via {cmd}" ) @@ -962,7 +957,7 @@ class RemoteExecutor(BaseExecutor): def create_original_bundle(self, pickle, fname: str): from string_utils import generate_uuid - uuid = generate_uuid(as_hex=True) + uuid = generate_uuid(omit_dashes=True) code_file = f'/tmp/{uuid}.code.bin' result_file = f'/tmp/{uuid}.result.bin'