From a3257d6eba8699af801991cec28a0c49eb034d40 Mon Sep 17 00:00:00 2001 From: Scott Gasch Date: Wed, 12 Oct 2022 12:31:51 -0700 Subject: [PATCH] Add __len__ to NumericPopulation and then use it instead of a dependency on numpy. --- pyproject.template | 1 - pyproject.toml | 1 - src/pyutils/math_utils.py | 11 ++++++++ src/pyutils/parallelize/executors.py | 38 +++++++++++++++++----------- 4 files changed, 34 insertions(+), 17 deletions(-) diff --git a/pyproject.template b/pyproject.template index b155476..262c864 100644 --- a/pyproject.template +++ b/pyproject.template @@ -18,7 +18,6 @@ dependencies = [ "bitstring", "cloudpickle", "holidays", - "numpy", "overrides", "python-dateutil", "pytz", diff --git a/pyproject.toml b/pyproject.toml index 1f771f0..a19d89d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,6 @@ dependencies = [ "bitstring", "cloudpickle", "holidays", - "numpy", "overrides", "python-dateutil", "pytz", diff --git a/src/pyutils/math_utils.py b/src/pyutils/math_utils.py index 97bb635..ce9940d 100644 --- a/src/pyutils/math_utils.py +++ b/src/pyutils/math_utils.py @@ -21,6 +21,8 @@ class NumericPopulation(object): >>> pop.add_number(1) >>> pop.add_number(10) >>> pop.add_number(3) + >>> len(pop) + 3 >>> pop.get_median() 3 >>> pop.add_number(7) @@ -59,6 +61,15 @@ class NumericPopulation(object): if not self.minimum or number < self.minimum: self.minimum = number + def __len__(self): + """Return the population size.""" + n = 0 + if self.highers: + n += len(self.highers) + if self.lowers: + n += len(self.lowers) + return n + def _rebalance(self): if len(self.lowers) - len(self.highers) > 1: heappush(self.highers, -heappop(self.lowers)) diff --git a/src/pyutils/parallelize/executors.py b/src/pyutils/parallelize/executors.py index 7bd44ca..fd70e32 100644 --- a/src/pyutils/parallelize/executors.py +++ b/src/pyutils/parallelize/executors.py @@ -28,11 +28,10 @@ from dataclasses import dataclass, fields from typing import Any, Callable, Dict, List, Optional, Set import cloudpickle # type: ignore -import numpy from overrides import overrides import pyutils.typez.histogram as hist -from pyutils import argparse_utils, config, persistent, string_utils +from pyutils import argparse_utils, config, math_utils, persistent, string_utils from pyutils.ansi import bg, fg, reset, underline from pyutils.decorator_utils import singleton from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently @@ -408,11 +407,13 @@ class RemoteExecutorStatus: self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float) self.end_per_bundle: Dict[str, float] = defaultdict(float) self.finished_bundle_timings_per_worker: Dict[ - RemoteWorkerRecord, List[float] + RemoteWorkerRecord, math_utils.NumericPopulation ] = {} self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {} self.bundle_details_by_uuid: Dict[str, BundleDetails] = {} - self.finished_bundle_timings: List[float] = [] + self.finished_bundle_timings: math_utils.NumericPopulation = ( + math_utils.NumericPopulation() + ) self.last_periodic_dump: Optional[float] = None self.total_bundles_submitted: int = 0 @@ -477,10 +478,12 @@ class RemoteExecutorStatus: start = self.start_per_bundle[uuid] assert start is not None bundle_latency = ts - start - x = self.finished_bundle_timings_per_worker.get(worker, []) - x.append(bundle_latency) + x = self.finished_bundle_timings_per_worker.get( + worker, math_utils.NumericPopulation() + ) + x.add_number(bundle_latency) self.finished_bundle_timings_per_worker[worker] = x - self.finished_bundle_timings.append(bundle_latency) + self.finished_bundle_timings.add_number(bundle_latency) def record_processing_began(self, uuid: str): """Record when work on a bundle begins.""" @@ -508,9 +511,10 @@ class RemoteExecutorStatus: ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: ' qall = None if len(self.finished_bundle_timings) > 1: - qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95]) + qall_median = self.finished_bundle_timings.get_median() + qall_p95 = self.finished_bundle_timings.get_percentile(95) ret += ( - f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, ' + f'⏱=∀p50:{qall_median:.1f}s, ∀p95:{qall_p95:.1f}s, total={ts-self.start_time:.1f}s, ' f'✅={total_finished}/{self.total_bundles_submitted}, ' f'💻n={total_in_flight}/{self.worker_count}\n' ) @@ -523,12 +527,16 @@ class RemoteExecutorStatus: for worker in self.known_workers: ret += f' {fg("lightning yellow")}{worker.machine}{reset()}: ' - timings = self.finished_bundle_timings_per_worker.get(worker, []) + timings = self.finished_bundle_timings_per_worker.get( + worker, math_utils.NumericPopulation() + ) count = len(timings) - qworker = None + qworker_median = None + qworker_p95 = None if count > 1: - qworker = numpy.quantile(timings, [0.5, 0.95]) - ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n' + qworker_median = timings.get_median() + qworker_p95 = timings.get_percentile(95) + ret += f' 💻p50: {qworker_median:.1f}s, 💻p95: {qworker_p95:.1f}s\n' else: ret += '\n' if count > 0: @@ -546,8 +554,8 @@ class RemoteExecutorStatus: ret += f' {details} setting up / copying data...' sec = 0.0 - if qworker is not None: - if sec > qworker[1]: + if qworker_p95 is not None: + if sec > qworker_p95: ret += f'{bg("red")}>💻p95{reset()} ' if details is not None: details.slower_than_local_p95 = True -- 2.45.2