Add --run_profiler option.
[python_utils.git] / executors.py
index e092e1058bac0d7ba754aaefbfefcfe51f3ffcec..20aa9d2e325e5b5e54a77836f6117b91d99c36be 100644 (file)
@@ -63,7 +63,7 @@ parser.add_argument(
 )
 
 SSH = '/usr/bin/ssh -oForwardX11=no'
-SCP = '/usr/bin/scp'
+SCP = '/usr/bin/scp -C'
 
 
 def make_cloud_pickle(fun, *args, **kwargs):
@@ -248,8 +248,9 @@ class BundleDetails:
 
 class RemoteExecutorStatus:
     def __init__(self, total_worker_count: int) -> None:
-        self.worker_count = total_worker_count
+        self.worker_count: int = total_worker_count
         self.known_workers: Set[RemoteWorkerRecord] = set()
+        self.start_time: float = time.time()
         self.start_per_bundle: Dict[str, float] = defaultdict(float)
         self.end_per_bundle: Dict[str, float] = defaultdict(float)
         self.finished_bundle_timings_per_worker: Dict[
@@ -259,11 +260,11 @@ class RemoteExecutorStatus:
         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
         self.finished_bundle_timings: List[float] = []
         self.last_periodic_dump: Optional[float] = None
-        self.total_bundles_submitted = 0
+        self.total_bundles_submitted: int = 0
 
         # Protects reads and modification using self.  Also used
         # as a memory fence for modifications to bundle.
-        self.lock = threading.Lock()
+        self.lock: threading.Lock = threading.Lock()
 
     def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
         with self.lock:
@@ -338,13 +339,14 @@ class RemoteExecutorStatus:
         if len(self.finished_bundle_timings) > 1:
             qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
             ret += (
-                f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, '
+                f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
                 f'✅={total_finished}/{self.total_bundles_submitted}, '
                 f'💻n={total_in_flight}/{self.worker_count}\n'
             )
         else:
             ret += (
-                f' ✅={total_finished}/{self.total_bundles_submitted}, '
+                f'⏱={ts-self.start_time:.1f}s, '
+                f'✅={total_finished}/{self.total_bundles_submitted}, '
                 f'💻n={total_in_flight}/{self.worker_count}\n'
             )
 
@@ -426,21 +428,29 @@ class WeightedRandomRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
     def acquire_worker(self, machine_to_avoid=None) -> Optional[RemoteWorkerRecord]:
         grabbag = []
         for worker in self.workers:
-            for x in range(0, worker.count):
-                for y in range(0, worker.weight):
-                    grabbag.append(worker)
-
-        for _ in range(0, 5):
-            random.shuffle(grabbag)
-            worker = grabbag[0]
-            if worker.machine != machine_to_avoid or _ > 2:
+            if worker.machine != machine_to_avoid:
+                if worker.count > 0:
+                    for _ in range(worker.count * worker.weight):
+                        grabbag.append(worker)
+
+        if len(grabbag) == 0:
+            logger.debug(
+                f'There are no available workers that avoid {machine_to_avoid}...'
+            )
+            for worker in self.workers:
                 if worker.count > 0:
-                    worker.count -= 1
-                    logger.debug(f'Selected worker {worker}')
-                    return worker
-        msg = 'Unexpectedly could not find a worker, retrying...'
-        logger.warning(msg)
-        return None
+                    for _ in range(worker.count * worker.weight):
+                        grabbag.append(worker)
+
+        if len(grabbag) == 0:
+            logger.warning('There are no available workers?!')
+            return None
+
+        worker = random.sample(grabbag, 1)[0]
+        assert worker.count > 0
+        worker.count -= 1
+        logger.debug(f'Chose worker {worker}')
+        return worker
 
 
 class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
@@ -480,7 +490,9 @@ class RoundRobinRemoteWorkerSelectionPolicy(RemoteWorkerSelectionPolicy):
 
 class RemoteExecutor(BaseExecutor):
     def __init__(
-        self, workers: List[RemoteWorkerRecord], policy: RemoteWorkerSelectionPolicy
+        self,
+        workers: List[RemoteWorkerRecord],
+        policy: RemoteWorkerSelectionPolicy,
     ) -> None:
         super().__init__()
         self.workers = workers
@@ -511,7 +523,7 @@ class RemoteExecutor(BaseExecutor):
         ) = self.run_periodic_heartbeat()
 
     @background_thread
-    def run_periodic_heartbeat(self, stop_event) -> None:
+    def run_periodic_heartbeat(self, stop_event: threading.Event) -> None:
         while not stop_event.is_set():
             time.sleep(5.0)
             logger.debug('Running periodic heartbeat code...')
@@ -519,8 +531,10 @@ class RemoteExecutor(BaseExecutor):
         logger.debug('Periodic heartbeat thread shutting down.')
 
     def heartbeat(self) -> None:
+        # Note: this is invoked on a background thread, not an
+        # executor thread.  Be careful what you do with it b/c it
+        # needs to get back and dump status again periodically.
         with self.status.lock:
-            # Dump regular progress report
             self.status.periodic_dump(self.total_bundles_submitted)
 
             # Look for bundles to reschedule via executor.submit
@@ -535,7 +549,7 @@ class RemoteExecutor(BaseExecutor):
         if (
             num_done > 2
             and num_idle_workers > 1
-            and (self.last_backup is None or (now - self.last_backup > 6.0))
+            and (self.last_backup is None or (now - self.last_backup > 9.0))
             and self.backup_lock.acquire(blocking=False)
         ):
             try:
@@ -1100,7 +1114,7 @@ class DefaultExecutors(object):
                     RemoteWorkerRecord(
                         username='scott',
                         machine='cheetah.house',
-                        weight=25,
+                        weight=30,
                         count=6,
                     ),
                 )
@@ -1120,7 +1134,7 @@ class DefaultExecutors(object):
                     RemoteWorkerRecord(
                         username='scott',
                         machine='wannabe.house',
-                        weight=30,
+                        weight=25,
                         count=10,
                     ),
                 )
@@ -1130,7 +1144,7 @@ class DefaultExecutors(object):
                     RemoteWorkerRecord(
                         username='scott',
                         machine='puma.cabin',
-                        weight=25,
+                        weight=30,
                         count=6,
                     ),
                 )
@@ -1140,7 +1154,7 @@ class DefaultExecutors(object):
                     RemoteWorkerRecord(
                         username='scott',
                         machine='backup.house',
-                        weight=7,
+                        weight=8,
                         count=2,
                     ),
                 )