Make remote training work better.
[python_utils.git] / executors.py
index b16ad92d80a624c466b6d54c5830d5a2f00c8789..fe8d9d0d8e749b0aa85609d04c3444e35b6e89d3 100644 (file)
@@ -197,6 +197,11 @@ class ProcessExecutor(BaseExecutor):
         return state
 
 
+class RemoteExecutorException(Exception):
+    """Thrown when a bundle cannot be executed despite several retries."""
+    pass
+
+
 @dataclass
 class RemoteWorkerRecord:
     username: str
@@ -508,7 +513,7 @@ class RemoteExecutor(BaseExecutor):
         if self.worker_count <= 0:
             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
             logger.critical(msg)
-            raise Exception(msg)
+            raise RemoteExecutorException(msg)
         self.policy.register_worker_pool(self.workers)
         self.cv = threading.Condition()
         logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
@@ -518,9 +523,6 @@ class RemoteExecutor(BaseExecutor):
         )
         self.status = RemoteExecutorStatus(self.worker_count)
         self.total_bundles_submitted = 0
-        logger.debug(
-            f'Creating remote processpool with {self.worker_count} remote worker threads.'
-        )
 
     def is_worker_available(self) -> bool:
         return self.policy.is_worker_available()
@@ -556,7 +558,7 @@ class RemoteExecutor(BaseExecutor):
             # Regular progress report
             self.status.periodic_dump(self.total_bundles_submitted)
 
-            # Look for bundles to reschedule
+            # Look for bundles to reschedule.
             num_done = len(self.status.finished_bundle_timings)
             if num_done > 7 or (num_done > 5 and self.is_worker_available()):
                 for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items():
@@ -663,7 +665,7 @@ class RemoteExecutor(BaseExecutor):
                 logger.info(f"{uuid}/{fname}: Copying work to {worker} via {cmd}.")
                 run_silently(cmd)
                 xfer_latency = time.time() - start_ts
-                logger.info(f"{uuid}/{fname}: Copying done in {xfer_latency}s.")
+                logger.info(f"{uuid}/{fname}: Copying done in {xfer_latency:.1f}s.")
             except Exception as e:
                 logger.exception(e)
                 logger.error(
@@ -969,8 +971,8 @@ class RemoteExecutor(BaseExecutor):
                 f'{uuid}: Tried this bundle too many times already ({retry_limit}x); giving up.'
             )
             if is_original:
-                logger.critical(
-                    f'{uuid}: This is the original of the bundle; results will be incomplete.'
+                raise RemoteExecutorException(
+                    f'{uuid}: This bundle can\'t be completed despite several backups and retries'
                 )
             else:
                 logger.error(f'{uuid}: At least it\'s only a backup; better luck with the others.')