Stop using rsync in executors; this was a hack to work around some
[python_utils.git] / executors.py
index cdbb811a6acd2cb9af12fefe49c61b65adba5ad0..2829c6957268dd9d607f0db4e4102510b972e38a 100644 (file)
@@ -15,6 +15,7 @@ import subprocess
 import threading
 import time
 from typing import Any, Callable, Dict, List, Optional, Set
+import warnings
 
 import cloudpickle  # type: ignore
 from overrides import overrides
@@ -60,9 +61,8 @@ parser.add_argument(
     help='Maximum number of failures before giving up on a bundle',
 )
 
-RSYNC = 'rsync -q --no-motd -W --ignore-existing --timeout=60 --size-only -z'
-SSH = 'ssh -oForwardX11=no'
-
+SSH = '/usr/bin/ssh -oForwardX11=no'
+SCP = '/usr/bin/scp'
 
 def make_cloud_pickle(fun, *args, **kwargs):
     logger.debug(f"Making cloudpickled bundle at {fun.__name__}")
@@ -490,7 +490,8 @@ class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
                     worker.count -= 1
                     logger.debug(f'Selected worker {worker}')
                     return worker
-        logger.warning("Couldn't find a worker; go fish.")
+        msg = 'Unexpectedly could not find a worker, retrying...'
+        logger.warning(msg)
         return None
 
 
@@ -525,7 +526,8 @@ class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
             if x >= len(self.workers):
                 x = 0
             if x == self.index:
-                logger.warning("Couldn't find a worker; go fish.")
+                msg = 'Unexpectedly could not find a worker, retrying...'
+                logger.warning(msg)
                 return None
 
 
@@ -738,7 +740,7 @@ class RemoteExecutor(BaseExecutor):
         # Send input code / data to worker machine if it's not local.
         if hostname not in machine:
             try:
-                cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
+                cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
                 start_ts = time.time()
                 logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
                 run_silently(cmd)
@@ -768,11 +770,10 @@ class RemoteExecutor(BaseExecutor):
                     # There's a race condition where someone else
                     # already finished the work and removed the source
                     # code file before we could copy it.  No biggie.
-                    logger.warning(
-                        f'{bundle}: Failed to send instructions to the worker machine... ' +
-                        'We\'re a backup and this may be caused by the original (or some ' +
-                        'other backup) already finishing this work.  Ignoring this.'
-                    )
+                    msg = f'{bundle}: Failed to send instructions to the worker machine... '
+                    msg += 'We\'re a backup and this may be caused by the original (or some '
+                    msg += 'other backup) already finishing this work.  Ignoring this.'
+                    logger.warning(msg)
                     return None
 
         # Kick off the work.  Note that if this fails we let
@@ -844,9 +845,8 @@ class RemoteExecutor(BaseExecutor):
             logger.exception(e)
             logger.error(f'{bundle}: Something unexpected just happened...')
             if p is not None:
-                logger.warning(
-                    f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
-                )
+                msg = f"{bundle}: Failed to wrap up \"done\" bundle, re-waiting on active ssh."
+                logger.warning(msg)
                 return self.wait_for_process(p, bundle, depth + 1)
             else:
                 self.status.record_release_worker(
@@ -874,7 +874,7 @@ class RemoteExecutor(BaseExecutor):
             if not was_cancelled:
                 assert bundle.machine is not None
                 if bundle.hostname not in bundle.machine:
-                    cmd = f'{RSYNC} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
+                    cmd = f'{SCP} {username}@{machine}:{result_file} {result_file} 2>/dev/null'
                     logger.info(
                         f"{bundle}: Fetching results from {username}@{machine} via {cmd}"
                     )
@@ -957,7 +957,7 @@ class RemoteExecutor(BaseExecutor):
 
     def create_original_bundle(self, pickle, fname: str):
         from string_utils import generate_uuid
-        uuid = generate_uuid(as_hex=True)
+        uuid = generate_uuid(omit_dashes=True)
         code_file = f'/tmp/{uuid}.code.bin'
         result_file = f'/tmp/{uuid}.result.bin'
 
@@ -1059,9 +1059,9 @@ class RemoteExecutor(BaseExecutor):
                 logger.error(f'{bundle}: At least it\'s only a backup; better luck with the others.')
             return None
         else:
-            logger.warning(
-                f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
-            )
+            msg = f'>>> Emergency rescheduling {bundle} because of unexected errors (wtf?!) <<<'
+            logger.warning(msg)
+            warnings.warn(msg)
             return self.launch(bundle, avoid_last_machine)
 
     @overrides