Add --run_profiler option.
[python_utils.git] / executors.py
index dcc21806efa33e10b9cf507c373205e7c2f4cb62..20aa9d2e325e5b5e54a77836f6117b91d99c36be 100644 (file)
@@ -63,7 +63,7 @@ parser.add_argument(
 )
 
 SSH = '/usr/bin/ssh -oForwardX11=no'
-SCP = '/usr/bin/scp'
+SCP = '/usr/bin/scp -C'
 
 
 def make_cloud_pickle(fun, *args, **kwargs):
@@ -428,21 +428,29 @@ class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
         grabbag = []
         for worker in self.workers:
-            for x in range(0, worker.count):
-                for y in range(0, worker.weight):
-                    grabbag.append(worker)
-
-        for _ in range(0, 5):
-            random.shuffle(grabbag)
-            worker = grabbag[0]
-            if worker.machine != machine_to_avoid or _ > 2:
+            if worker.machine != machine_to_avoid:
                 if worker.count > 0:
-                    worker.count -= 1
-                    logger.debug(f'Selected worker {worker}')
-                    return worker
-        msg = 'Unexpectedly could not find a worker, retrying...'
-        logger.warning(msg)
-        return None
+                    for _ in range(worker.count * worker.weight):
+                        grabbag.append(worker)
+
+        if len(grabbag) == 0:
+            logger.debug(
+                f'There are no available workers that avoid {machine_to_avoid}...'
+            )
+            for worker in self.workers:
+                if worker.count > 0:
+                    for _ in range(worker.count * worker.weight):
+                        grabbag.append(worker)
+
+        if len(grabbag) == 0:
+            logger.warning('There are no available workers?!')
+            return None
+
+        worker = random.sample(grabbag, 1)[0]
+        assert worker.count > 0
+        worker.count -= 1
+        logger.debug(f'Chose worker {worker}')
+        return worker
 
 
 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
@@ -482,7 +490,9 @@ class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
 
 class RemoteExecutor(BaseExecutor):
     def __init__(
-        self, workers: List[RemoteWorkerRecord], policy: RemoteWorkerSelectionPolicy
+        self,
+        workers: List[RemoteWorkerRecord],
+        policy: RemoteWorkerSelectionPolicy,
     ) -> None:
         super().__init__()
         self.workers = workers
@@ -513,7 +523,7 @@ class RemoteExecutor(BaseExecutor):
         ) = self.run_periodic_heartbeat()
 
     @background_thread
-    def run_periodic_heartbeat(self, stop_event) -> None:
+    def run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
         while not stop_event.is_set():
             time.sleep(5.0)
             logger.debug('Running periodic heartbeat code...')
@@ -521,8 +531,10 @@ class RemoteExecutor(BaseExecutor):
         logger.debug('Periodic heartbeat thread shutting down.')
 
     def heartbeat(self) -> None:
+        # Note: this is invoked on a background thread, not an
+        # executor thread.  Be careful what you do with it b/c it
+        # needs to get back and dump status again periodically.
         with self.status.lock:
-            # Dump regular progress report
             self.status.periodic_dump(self.total_bundles_submitted)
 
             # Look for bundles to reschedule via executor.submit
@@ -537,7 +549,7 @@ class RemoteExecutor(BaseExecutor):
         if (
             num_done > 2
             and num_idle_workers > 1
-            and (self.last_backup is None or (now - self.last_backup > 6.0))
+            and (self.last_backup is None or (now - self.last_backup > 9.0))
             and self.backup_lock.acquire(blocking=False)
         ):
             try:
@@ -1102,7 +1114,7 @@ class DefaultExecutors(object):
                     RemoteWorkerRecord(
                         username='scott',
                         machine='cheetah.house',
-                        weight=25,
+                        weight=30,
                         count=6,
                     ),
                 )
@@ -1122,7 +1134,7 @@ class DefaultExecutors(object):
                     RemoteWorkerRecord(
                         username='scott',
                         machine='wannabe.house',
-                        weight=30,
+                        weight=25,
                         count=10,
                     ),
                 )
@@ -1132,7 +1144,7 @@ class DefaultExecutors(object):
                     RemoteWorkerRecord(
                         username='scott',
                         machine='puma.cabin',
-                        weight=25,
+                        weight=30,
                         count=6,
                     ),
                 )
@@ -1142,7 +1154,7 @@ class DefaultExecutors(object):
                     RemoteWorkerRecord(
                         username='scott',
                         machine='backup.house',
-                        weight=7,
+                        weight=8,
                         count=2,
                     ),
                 )