from typing import Any, Callable, Dict, List, Optional, Set
import cloudpickle # type: ignore
-import numpy
from overrides import overrides
import pyutils.typez.histogram as hist
-from pyutils import argparse_utils, config, persistent, string_utils
+from pyutils import argparse_utils, config, math_utils, persistent, string_utils
from pyutils.ansi import bg, fg, reset, underline
from pyutils.decorator_utils import singleton
from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
self.end_per_bundle: Dict[str, float] = defaultdict(float)
self.finished_bundle_timings_per_worker: Dict[
- RemoteWorkerRecord, List[float]
+ RemoteWorkerRecord, math_utils.NumericPopulation
] = {}
self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
- self.finished_bundle_timings: List[float] = []
+ self.finished_bundle_timings: math_utils.NumericPopulation = (
+ math_utils.NumericPopulation()
+ )
self.last_periodic_dump: Optional[float] = None
self.total_bundles_submitted: int = 0
start = self.start_per_bundle[uuid]
assert start is not None
bundle_latency = ts - start
- x = self.finished_bundle_timings_per_worker.get(worker, [])
- x.append(bundle_latency)
+ x = self.finished_bundle_timings_per_worker.get(
+ worker, math_utils.NumericPopulation()
+ )
+ x.add_number(bundle_latency)
self.finished_bundle_timings_per_worker[worker] = x
- self.finished_bundle_timings.append(bundle_latency)
+ self.finished_bundle_timings.add_number(bundle_latency)
def record_processing_began(self, uuid: str):
"""Record when work on a bundle begins."""
ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
qall = None
if len(self.finished_bundle_timings) > 1:
- qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
+ qall_median = self.finished_bundle_timings.get_median()
+ qall_p95 = self.finished_bundle_timings.get_percentile(95)
ret += (
- f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
+ f'⏱=∀p50:{qall_median:.1f}s, ∀p95:{qall_p95:.1f}s, total={ts-self.start_time:.1f}s, '
f'✅={total_finished}/{self.total_bundles_submitted}, '
f'💻n={total_in_flight}/{self.worker_count}\n'
)
for worker in self.known_workers:
ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: '
- timings = self.finished_bundle_timings_per_worker.get(worker, [])
+ timings = self.finished_bundle_timings_per_worker.get(
+ worker, math_utils.NumericPopulation()
+ )
count = len(timings)
- qworker = None
+ qworker_median = None
+ qworker_p95 = None
if count > 1:
- qworker = numpy.quantile(timings, [0.5, 0.95])
- ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
+ qworker_median = timings.get_median()
+ qworker_p95 = timings.get_percentile(95)
+ ret += f' 💻p50: {qworker_median:.1f}s, 💻p95: {qworker_p95:.1f}s\n'
else:
ret += '\n'
if count > 0:
ret += f' {details} setting up / copying data...'
sec = 0.0
- if qworker is not None:
- if sec > qworker[1]:
+ if qworker_p95 is not None:
+ if sec > qworker_p95:
ret += f'{bg("red")}>💻p95{reset()} '
if details is not None:
details.slower_than_local_p95 = True