Add __len__ to NumericPopulation and then use it instead of a dependency on
[pyutils.git] / src / pyutils / parallelize / executors.py
index 7bd44ca1ddf5d90662f5c5dcfd55c65642423c40..fd70e327b75b81a25e6e20a470de3c36a296d125 100644 (file)
@@ -28,11 +28,10 @@ from dataclasses import dataclass, fields
 from typing import Any, Callable, Dict, List, Optional, Set
 
 import cloudpickle  # type: ignore
-import numpy
 from overrides import overrides
 
 import pyutils.typez.histogram as hist
-from pyutils import argparse_utils, config, persistent, string_utils
+from pyutils import argparse_utils, config, math_utils, persistent, string_utils
 from pyutils.ansi import bg, fg, reset, underline
 from pyutils.decorator_utils import singleton
 from pyutils.exec_utils import cmd_exitcode, cmd_in_background, run_silently
@@ -408,11 +407,13 @@ class RemoteExecutorStatus:
         self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
         self.end_per_bundle: Dict[str, float] = defaultdict(float)
         self.finished_bundle_timings_per_worker: Dict[
-            RemoteWorkerRecord, List[float]
+            RemoteWorkerRecord, math_utils.NumericPopulation
         ] = {}
         self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
         self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
-        self.finished_bundle_timings: List[float] = []
+        self.finished_bundle_timings: math_utils.NumericPopulation = (
+            math_utils.NumericPopulation()
+        )
         self.last_periodic_dump: Optional[float] = None
         self.total_bundles_submitted: int = 0
 
@@ -477,10 +478,12 @@ class RemoteExecutorStatus:
             start = self.start_per_bundle[uuid]
             assert start is not None
             bundle_latency = ts - start
-            x = self.finished_bundle_timings_per_worker.get(worker, [])
-            x.append(bundle_latency)
+            x = self.finished_bundle_timings_per_worker.get(
+                worker, math_utils.NumericPopulation()
+            )
+            x.add_number(bundle_latency)
             self.finished_bundle_timings_per_worker[worker] = x
-            self.finished_bundle_timings.append(bundle_latency)
+            self.finished_bundle_timings.add_number(bundle_latency)
 
     def record_processing_began(self, uuid: str):
         """Record when work on a bundle begins."""
@@ -508,9 +511,10 @@ class RemoteExecutorStatus:
         ret = f'\n\n{underline()}Remote Executor Pool Status{reset()}: '
         qall = None
         if len(self.finished_bundle_timings) > 1:
-            qall = numpy.quantile(self.finished_bundle_timings, [0.5, 0.95])
+            qall_median = self.finished_bundle_timings.get_median()
+            qall_p95 = self.finished_bundle_timings.get_percentile(95)
             ret += (
-                f'⏱=∀p50:{qall[0]:.1f}s, ∀p95:{qall[1]:.1f}s, total={ts-self.start_time:.1f}s, '
+                f'⏱=∀p50:{qall_median:.1f}s, ∀p95:{qall_p95:.1f}s, total={ts-self.start_time:.1f}s, '
                 f'✅={total_finished}/{self.total_bundles_submitted}, '
                 f'💻n={total_in_flight}/{self.worker_count}\n'
             )
@@ -523,12 +527,16 @@ class RemoteExecutorStatus:
 
         for worker in self.known_workers:
             ret += f'  {fg("lightning yellow")}{worker.machine}{reset()}: '
-            timings = self.finished_bundle_timings_per_worker.get(worker, [])
+            timings = self.finished_bundle_timings_per_worker.get(
+                worker, math_utils.NumericPopulation()
+            )
             count = len(timings)
-            qworker = None
+            qworker_median = None
+            qworker_p95 = None
             if count > 1:
-                qworker = numpy.quantile(timings, [0.5, 0.95])
-                ret += f' 💻p50: {qworker[0]:.1f}s, 💻p95: {qworker[1]:.1f}s\n'
+                qworker_median = timings.get_median()
+                qworker_p95 = timings.get_percentile(95)
+                ret += f' 💻p50: {qworker_median:.1f}s, 💻p95: {qworker_p95:.1f}s\n'
             else:
                 ret += '\n'
             if count > 0:
@@ -546,8 +554,8 @@ class RemoteExecutorStatus:
                         ret += f'       {details} setting up / copying data...'
                         sec = 0.0
 
-                    if qworker is not None:
-                        if sec > qworker[1]:
+                    if qworker_p95 is not None:
+                        if sec > qworker_p95:
                             ret += f'{bg("red")}>💻p95{reset()} '
                             if details is not None:
                                 details.slower_than_local_p95 = True