From 61faaa42ace9ecae318dd93069db743b7d49a0c9 Mon Sep 17 00:00:00 2001 From: Scott Gasch Date: Thu, 10 Feb 2022 10:38:48 -0800 Subject: [PATCH] Add percentile and change name of RunningMedian class. --- histogram.py | 4 ++-- math_utils.py | 66 +++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 53 insertions(+), 17 deletions(-) diff --git a/histogram.py b/histogram.py index 9c07df9..dd47319 100644 --- a/histogram.py +++ b/histogram.py @@ -32,7 +32,7 @@ class SimpleHistogram(Generic[T]): NEGATIVE_INFINITY = -math.inf def __init__(self, buckets: List[Tuple[Bound, Bound]]): - from math_utils import RunningMedian + from math_utils import NumericPopulation self.buckets: Dict[Tuple[Bound, Bound], Count] = {} for start_end in buckets: @@ -40,7 +40,7 @@ class SimpleHistogram(Generic[T]): raise Exception("Buckets overlap?!") self.buckets[start_end] = 0 self.sigma: float = 0.0 - self.stats: RunningMedian = RunningMedian() + self.stats: NumericPopulation = NumericPopulation() self.maximum: Optional[T] = None self.minimum: Optional[T] = None self.count: Count = 0 diff --git a/math_utils.py b/math_utils.py index f77e0a1..188d323 100644 --- a/math_utils.py +++ b/math_utils.py @@ -5,59 +5,72 @@ import functools import math from heapq import heappop, heappush -from typing import List +from typing import List, Optional -class RunningMedian(object): +class NumericPopulation(object): """A running median computer. - >>> median = RunningMedian() - >>> median.add_number(1) - >>> median.add_number(10) - >>> median.add_number(3) - >>> median.get_median() + >>> pop = NumericPopulation() + >>> pop.add_number(1) + >>> pop.add_number(10) + >>> pop.add_number(3) + >>> pop.get_median() 3 - >>> median.add_number(7) - >>> median.add_number(5) - >>> median.get_median() + >>> pop.add_number(7) + >>> pop.add_number(5) + >>> pop.get_median() 5 - >>> median.get_mean() + >>> pop.get_mean() 5.2 - >>> round(median.get_stdev(), 2) + >>> round(pop.get_stdev(), 2) 6.99 + >>> pop.get_percentile(20) + 3 + >>> pop.get_percentile(60) + 7 """ def __init__(self): self.lowers, self.highers = [], [] self.aggregate = 0.0 + self.sorted_copy: Optional[List[float]] = None def add_number(self, number: float): + """O(2 log2 n)""" + if not self.highers or number > self.highers[0]: heappush(self.highers, number) else: heappush(self.lowers, -number) # for lowers we need a max heap self.aggregate += number - self.rebalance() + self._rebalance() - def rebalance(self): + def _rebalance(self): if len(self.lowers) - len(self.highers) > 1: heappush(self.highers, -heappop(self.lowers)) elif len(self.highers) - len(self.lowers) > 1: heappush(self.lowers, -heappop(self.highers)) def get_median(self) -> float: + """Returns the approximate median (p50) so far in O(1) time.""" + if len(self.lowers) == len(self.highers): - return (-self.lowers[0] + self.highers[0]) / 2 + return -self.lowers[0] elif len(self.lowers) > len(self.highers): return -self.lowers[0] else: return self.highers[0] def get_mean(self) -> float: + """Returns the mean (arithmetic mean) so far in O(1) time.""" + count = len(self.lowers) + len(self.highers) return self.aggregate / count def get_stdev(self) -> float: + """Returns the stdev so far in O(n) time.""" + mean = self.get_mean() variance = 0.0 for n in self.lowers: @@ -67,6 +80,29 @@ class RunningMedian(object): variance += (n - mean) ** 2 return math.sqrt(variance) + def get_percentile(self, n: float) -> float: + """Returns the number at approximately pn% (i.e. the nth percentile) + of the distribution in O(n log n) time (expensive, requires a + complete sort). Not thread safe. Caching does across + multiple calls without an invocation to add_number. + + """ + if n == 50: + return self.get_median() + count = len(self.lowers) + len(self.highers) + if self.sorted_copy is not None: + if count == len(self.sorted_copy): + index = round(count * (n / 100.0)) + assert 0 <= index < count + return self.sorted_copy[index] + self.sorted_copy = [-x for x in self.lowers] + for x in self.highers: + self.sorted_copy.append(x) + self.sorted_copy = sorted(self.sorted_copy) + index = round(count * (n / 100.0)) + assert 0 <= index < count + return self.sorted_copy[index] + def gcd_floats(a: float, b: float) -> float: if a < b: -- 2.45.2