X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=src%2Fpyutils%2Fparallelize%2Fexecutors.py;h=fd70e327b75b81a25e6e20a470de3c36a296d125;hb=a3257d6eba8699af801991cec28a0c49eb034d40;hp=7bd44ca1ddf5d90662f5c5dcfd55c65642423c40;hpb=2ca7b9cb3a9ad208b4fac1e179048b8e5305c832;p=pyutils.git diff --git a/src/pyutils/parallelize/executors.py b/src/pyutils/parallelize/executors.py index 7bd44ca..fd70e32 100644 --- a/src/pyutils/parallelize/executors.py +++ b/src/pyutils/parallelize/executors.py @@ -28,11 +28,10 @@ from dataclasses import dataclass, fields from typing import Any, Callable, Dict, List, Optional, Set import cloudpickle # type: ignore -import numpy from overrides import overrides import pyutils.typez.histogram as hist -from pyutils import argparse_utils, config, persistent, string_utils +from pyutils import argparse_utils, config, math_utils, persistent, string_utils from pyutils.ansi import bg, fg, reset, underline from pyutils.decorator_utils import singleton from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently @@ -408,11 +407,13 @@ class RemoteExecutorStatus: self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float) self.end_per_bundle: Dict[str, float] = defaultdict(float) self.finished_bundle_timings_per_worker: Dict[ - RemoteWorkerRecord, List[float] + RemoteWorkerRecord, math_utils.NumericPopulation ] = {} self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {} self.bundle_details_by_uuid: Dict[str, BundleDetails] = {} - self.finished_bundle_timings: List[float] = [] + self.finished_bundle_timings: math_utils.NumericPopulation = ( + math_utils.NumericPopulation() + ) self.last_periodic_dump: Optional[float] = None self.total_bundles_submitted: int = 0 @@ -477,10 +478,12 @@ class RemoteExecutorStatus: start = self.start_per_bundle[uuid] assert start is not None bundle_latency = ts - start - x = self.finished_bundle_timings_per_worker.get(worker, []) - x.append(bundle_latency) + x = self.finished_bundle_timings_per_worker.get( + worker, math_utils.NumericPopulation() + ) + x.add_number(bundle_latency) self.finished_bundle_timings_per_worker[worker] = x - self.finished_bundle_timings.append(bundle_latency) + self.finished_bundle_timings.add_number(bundle_latency) def record_processing_began(self, uuid: str): """Record when work on a bundle begins.""" @@ -508,9 +511,10 @@ class RemoteExecutorStatus: ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: ' qall = None if len(self.finished_bundle_timings) > 1: - qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95]) + qall_median = self.finished_bundle_timings.get_median() + qall_p95 = self.finished_bundle_timings.get_percentile(95) ret += ( - f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, ' + f'⏱=∀p50:{qall_median:.1f}s, ∀p95:{qall_p95:.1f}s, total={ts-self.start_time:.1f}s, ' f'✅={total_finished}/{self.total_bundles_submitted}, ' f'💻n={total_in_flight}/{self.worker_count}\n' ) @@ -523,12 +527,16 @@ class RemoteExecutorStatus: for worker in self.known_workers: ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: ' - timings = self.finished_bundle_timings_per_worker.get(worker, []) + timings = self.finished_bundle_timings_per_worker.get( + worker, math_utils.NumericPopulation() + ) count = len(timings) - qworker = None + qworker_median = None + qworker_p95 = None if count > 1: - qworker = numpy.quantile(timings, [0.5, 0.95]) - ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n' + qworker_median = timings.get_median() + qworker_p95 = timings.get_percentile(95) + ret += f' 💻p50: {qworker_median:.1f}s, 💻p95: {qworker_p95:.1f}s\n' else: ret += '\n' if count > 0: @@ -546,8 +554,8 @@ class RemoteExecutorStatus: ret += f' {details} setting up / copying data...' sec = 0.0 - if qworker is not None: - if sec > qworker[1]: + if qworker_p95 is not None: + if sec > qworker_p95: ret += f'{bg("red")}>💻p95{reset()} ' if details is not None: details.slower_than_local_p95 = True