return state
+class RemoteExecutorException(Exception):
+    """Raised when a bundle cannot be executed despite several retries."""
+    pass
+
+
@dataclass
class RemoteWorkerRecord:
username: str
if self.worker_count <= 0:
msg = f"We need somewhere to schedule work; count was {self.worker_count}"
logger.critical(msg)
- raise Exception(msg)
+ raise RemoteExecutorException(msg)
self.policy.register_worker_pool(self.workers)
self.cv = threading.Condition()
logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
)
self.status = RemoteExecutorStatus(self.worker_count)
self.total_bundles_submitted = 0
- logger.debug(
- f'Creating remote processpool with {self.worker_count} remote worker threads.'
- )
def is_worker_available(self) -> bool:
return self.policy.is_worker_available()
# Regular progress report
self.status.periodic_dump(self.total_bundles_submitted)
- # Look for bundles to reschedule
+ # Look for bundles to reschedule.
num_done = len(self.status.finished_bundle_timings)
if num_done > 7 or (num_done > 5 and self.is_worker_available()):
for worker, bundle_uuids in self.status.in_flight_bundles_by_worker.items():
logger.info(f"{uuid}/{fname}: Copying work to {worker} via {cmd}.")
run_silently(cmd)
xfer_latency = time.time() - start_ts
- logger.info(f"{uuid}/{fname}: Copying done in {xfer_latency}s.")
+ logger.info(f"{uuid}/{fname}: Copying done in {xfer_latency:.1f}s.")
except Exception as e:
logger.exception(e)
logger.error(
f'{uuid}: Tried this bundle too many times already ({retry_limit}x); giving up.'
)
if is_original:
- logger.critical(
- f'{uuid}: This is the original of the bundle; results will be incomplete.'
+ raise RemoteExecutorException(
+ f'{uuid}: This bundle can\'t be completed despite several backups and retries'
)
else:
logger.error(f'{uuid}: At least it\'s only a backup; better luck with the others.')