X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=executors.py;h=fe8d9d0d8e749b0aa85609d04c3444e35b6e89d3;hb=bef486c8c06e8d743a98b89910658a615acc8bbc;hp=b16ad92d80a624c466b6d54c5830d5a2f00c8789;hpb=2c54bfde335f3631f045a871c540c9d63c5bb081;p=python_utils.git diff --git a/executors.py b/executors.py index b16ad92..fe8d9d0 100644 --- a/executors.py +++ b/executors.py @@ -197,6 +197,11 @@ class ProcessExecutor(BaseExecutor): return state +class RemoteExecutorException(Exception): + """Thrown when a bundle cannot be executed despite several retries.""" + pass + + @dataclass class RemoteWorkerRecord: username: str @@ -508,7 +513,7 @@ class RemoteExecutor(BaseExecutor): if self.worker_count <= 0: msg = f"We need somewhere to schedule work; count was {self.worker_count}" logger.critical(msg) - raise Exception(msg) + raise RemoteExecutorException(msg) self.policy.register_worker_pool(self.workers) self.cv = threading.Condition() logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.') @@ -518,9 +523,6 @@ class RemoteExecutor(BaseExecutor): ) self.status = RemoteExecutorStatus(self.worker_count) self.total_bundles_submitted = 0 - logger.debug( - f'Creating remote processpool with {self.worker_count} remote worker threads.' - ) def is_worker_available(self) -> bool: return self.policy.is_worker_available() @@ -556,7 +558,7 @@ class RemoteExecutor(BaseExecutor): # Regular progress report self.status.periodic_dump(self.total_bundles_submitted) - # Look for bundles to reschedule + # Look for bundles to reschedule. num_done = len(self.status.finished_bundle_timings) if num_done > 7 or (num_done > 5 and self.is_worker_available()): for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items(): @@ -663,7 +665,7 @@ class RemoteExecutor(BaseExecutor): logger.info(f"{uuid}/{fname}: Copying work to {worker} via {cmd}.") run_silently(cmd) xfer_latency = time.time() - start_ts - logger.info(f"{uuid}/{fname}: Copying done in {xfer_latency}s.") + logger.info(f"{uuid}/{fname}: Copying done in {xfer_latency:.1f}s.") except Exception as e: logger.exception(e) logger.error( @@ -969,8 +971,8 @@ class RemoteExecutor(BaseExecutor): f'{uuid}: Tried this bundle too many times already ({retry_limit}x); giving up.' ) if is_original: - logger.critical( - f'{uuid}: This is the original of the bundle; results will be incomplete.' + raise RemoteExecutorException( + f'{uuid}: This bundle can\'t be completed despite several backups and retries' ) else: logger.error(f'{uuid}: At least it\'s only a backup; better luck with the others.')