class RemoteExecutorStatus:
def __init__(self, total_worker_count: int) -> None:
- self.worker_count = total_worker_count
+ self.worker_count: int = total_worker_count
self.known_workers: Set[RemoteWorkerRecord] = set()
+ self.start_time: float = time.time()
self.start_per_bundle: Dict[str, float] = defaultdict(float)
self.end_per_bundle: Dict[str, float] = defaultdict(float)
self.finished_bundle_timings_per_worker: Dict[
self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
self.finished_bundle_timings: List[float] = []
self.last_periodic_dump: Optional[float] = None
- self.total_bundles_submitted = 0
+ self.total_bundles_submitted: int = 0
# Protects reads and modification using self. Also used
# as a memory fence for modifications to bundle.
- self.lock = threading.Lock()
+ self.lock: threading.Lock = threading.Lock()
def record_acquire_worker(self, worker: RemoteWorkerRecord, uuid: str) -> None:
with self.lock:
if len(self.finished_bundle_timings) > 1:
qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
ret += (
- f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, '
+ f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
f'✅={total_finished}/{self.total_bundles_submitted}, '
f'💻n={total_in_flight}/{self.worker_count}\n'
)
else:
ret += (
- f' ✅={total_finished}/{self.total_bundles_submitted}, '
+ f'⏱={ts-self.start_time:.1f}s, '
+ f'✅={total_finished}/{self.total_bundles_submitted}, '
f'💻n={total_in_flight}/{self.worker_count}\n'
)