+ (self.heartbeat_thread, self.heartbeat_stop_event) = self.run_periodic_heartbeat()
+
@background_thread
def run_periodic_heartbeat(self, stop_event) -> None:
    """Call self.heartbeat() about every five seconds until asked to stop.

    Args:
        stop_event: event object polled to decide when to exit; presumably
            the threading.Event handed in by the @background_thread
            decorator (verify against the decorator's contract).
    """
    # Wait on the event itself rather than time.sleep(): wait() returns
    # True as soon as the event is set, so shutdown is not delayed by up
    # to a full interval and no extra heartbeat runs after a stop request.
    # When the timeout simply elapses, wait() returns False and we beat.
    while not stop_event.wait(5.0):
        logger.debug('Running periodic heartbeat code...')
        self.heartbeat()
    logger.debug('Periodic heartbeat thread shutting down.')
+
def heartbeat(self) -> None:
    """Run one heartbeat: dump progress and maybe schedule a backup bundle.

    All work happens while holding self.status.lock, which
    maybe_schedule_backup_bundles() asserts is held on entry.
    """
    status = self.status
    with status.lock:
        # Periodic progress report.
        status.periodic_dump(self.total_bundles_submitted)

        # Backup scheduling is opt-in via configuration.
        backups_enabled = config.config['executors_schedule_remote_backups']
        if backups_enabled:
            self.maybe_schedule_backup_bundles()
+
def maybe_schedule_backup_bundles(self):
    """Pick at most one in-flight bundle and schedule a backup run of it.

    Must be called with self.status.lock held (asserted).  Nothing is
    scheduled unless all of the following hold:

    * more than two bundles have finished,
    * worker_count - task_count is greater than one,
    * no backup was scheduled in the last six seconds, and
    * self.backup_lock can be taken without blocking.

    Every eligible in-flight bundle (known to the status tracker, with
    src_bundle unset and a backup_bundles list -- presumably originals
    rather than backups themselves) gets a score:

    * a base of int(200 / weight) taken from the self.workers record
      whose machine matches the worker running the bundle (0 if none),
    * plus its runtime so far, plus runtime/2 if slower_than_local_p95
      and runtime/4 if slower_than_global_p95,
    * then x2 with no existing backups, /2 with one, /8 with two, and
      zeroed with three or more.

    The highest non-zero score wins (first seen wins ties) and is handed
    to self.schedule_backup_for_bundle().
    """
    assert self.status.lock.locked()
    finished_count = len(self.status.finished_bundle_timings)
    idle_workers = self.worker_count - self.task_count
    now = time.time()

    # Guard clauses; the lock acquisition comes last so that it is only
    # attempted once every cheaper precondition has passed.
    if not (finished_count > 2 and idle_workers > 1):
        return
    cooled_down = self.last_backup is None or (now - self.last_backup > 6.0)
    if not cooled_down:
        return
    if not self.backup_lock.acquire(blocking=False):
        return

    try:
        assert self.backup_lock.locked()

        winner = None
        winning_score = None
        in_flight = self.status.in_flight_bundles_by_worker
        for worker, bundle_uuids in in_flight.items():

            # Bias toward bundles running on slower machines: the
            # lower a machine's weight, the larger the base score.
            matching_record = next(
                (rec for rec in self.workers if worker.machine == rec.machine),
                None,
            )
            if matching_record is None:
                machine_bias = 0
            else:
                machine_bias = int(1.0 / float(matching_record.weight) * 200.0)

            for bundle_uuid in bundle_uuids:
                candidate = self.status.bundle_details_by_uuid.get(bundle_uuid, None)
                if (
                    candidate is None
                    or candidate.src_bundle is not None
                    or candidate.backup_bundles is None
                ):
                    continue

                score = machine_bias

                # Favor long-running bundles, especially ones that
                # are slower than expected.
                started_at = self.status.start_per_bundle[bundle_uuid]
                if started_at is not None:
                    elapsed = now - started_at
                    score += elapsed
                    logger.debug(f'score[{candidate}] => {score} # latency boost')

                    if candidate.slower_than_local_p95:
                        score += elapsed / 2
                        logger.debug(f'score[{candidate}] => {score} # >worker p95')

                    if candidate.slower_than_global_p95:
                        score += elapsed / 4
                        logger.debug(f'score[{candidate}] => {score} # >global p95')

                # Favor bundles that have few (ideally no) backups
                # already; three or more disqualifies the bundle.
                existing_backups = len(candidate.backup_bundles)
                if existing_backups == 0:
                    score *= 2
                elif existing_backups == 1:
                    score /= 2
                elif existing_backups == 2:
                    score /= 8
                else:
                    score = 0
                logger.debug(
                    f'score[{candidate}] => {score} # {existing_backups} dup backup factor'
                )

                if score != 0 and (winning_score is None or score > winning_score):
                    winner = candidate
                    assert candidate is not None
                    assert candidate.backup_bundles is not None
                    assert candidate.src_bundle is None
                    winning_score = score

        # This still runs on the heartbeat thread.  Per the original
        # author's note that is fine: schedule_backup_for_bundle
        # resubmits the bundle through the executor so a worker thread
        # picks it up, letting this thread return for later heartbeats.
        if winner is not None:
            self.last_backup = now
            logger.info(
                f'=====> SCHEDULING BACKUP {winner} (score={winning_score:.1f}) <====='
            )
            self.schedule_backup_for_bundle(winner)
    finally:
        self.backup_lock.release()