+ # Look for bundles to reschedule via executor.submit
+ if config.config['executors_schedule_remote_backups']:
+ self._maybe_schedule_backup_bundles()
+
+ def _maybe_schedule_backup_bundles(self):
+ """Maybe schedule backup bundles if we see a very slow bundle."""
+
+ assert self.status.lock.locked()
+ num_done = len(self.status.finished_bundle_timings)
+ num_idle_workers = self.worker_count - self.task_count
+ now = time.time()
+ if (
+ num_done >= 2
+ and num_idle_workers > 0
+ and (self.last_backup is None or (now - self.last_backup > 9.0))
+ and self.backup_lock.acquire(blocking=False)
+ ):
+ try:
+ assert self.backup_lock.locked()
+
+ bundle_to_backup = None
+ best_score = None
+ for (
+ worker,
+ bundle_uuids,
+ ) in self.status.in_flight_bundles_by_worker.items():
+
+ # Prefer to schedule backups of bundles running on
+ # slower machines.
+ base_score = 0
+ for record in self.workers:
+ if worker.machine == record.machine:
+ base_score = float(record.weight)
+ base_score = 1.0 / base_score
+ base_score *= 200.0
+ base_score = int(base_score)
+ break
+
+ for uuid in bundle_uuids:
+ bundle = self.status.bundle_details_by_uuid.get(uuid, None)
+ if (
+ bundle is not None
+ and bundle.src_bundle is None
+ and bundle.backup_bundles is not None
+ ):
+ score = base_score
+
+ # Schedule backups of bundles running
+ # longer; especially those that are
+ # unexpectedly slow.
+ start_ts = self.status.start_per_bundle[uuid]
+ if start_ts is not None:
+ runtime = now - start_ts
+ score += runtime
+ logger.debug('score[%s] => %.1f # latency boost', bundle, score)
+
+ if bundle.slower_than_local_p95:
+ score += runtime / 2
+ logger.debug('score[%s] => %.1f # >worker p95', bundle, score)
+
+ if bundle.slower_than_global_p95:
+ score += runtime / 4
+ logger.debug('score[%s] => %.1f # >global p95', bundle, score)
+
+ # Prefer backups of bundles that don't
+ # have backups already.
+ backup_count = len(bundle.backup_bundles)
+ if backup_count == 0:
+ score *= 2
+ elif backup_count == 1:
+ score /= 2
+ elif backup_count == 2:
+ score /= 8
+ else:
+ score = 0
+ logger.debug(
+ 'score[%s] => %.1f # {backup_count} dup backup factor',
+ bundle,
+ score,
+ )
+
+ if score != 0 and (best_score is None or score > best_score):
+ bundle_to_backup = bundle
+ assert bundle is not None
+ assert bundle.backup_bundles is not None
+ assert bundle.src_bundle is None
+ best_score = score
+
+ # Note: this is all still happening on the heartbeat
+ # runner thread. That's ok because
+ # _schedule_backup_for_bundle uses the executor to
+ # submit the bundle again which will cause it to be
+ # picked up by a worker thread and allow this thread
+ # to return to run future heartbeats.
+ if bundle_to_backup is not None:
+ self.last_backup = now
+ logger.info(
+ '=====> SCHEDULING BACKUP %s (score=%.1f) <=====',
+ bundle_to_backup,
+ best_score,
+ )
+ self._schedule_backup_for_bundle(bundle_to_backup)
+ finally:
+ self.backup_lock.release()
+
+ def _is_worker_available(self) -> bool:
+ """Is there a worker available currently?"""