# Floating-point negative infinity, taken from math.inf.
# NOTE(review): presumably a sentinel for an unbounded lower bucket edge --
# confirm against the code that consumes it.
NEGATIVE_INFINITY = -math.inf
def __init__(self, buckets: List[Tuple[Bound, Bound]]):
    """Create one zeroed counter per bucket and reset the running totals.

    Args:
        buckets: (start, end) bound pairs.  Each pair becomes a key of
            ``self.buckets`` with an initial count of 0.

    Raises:
        Exception: if the same (start, end) pair appears more than once.
    """
    # Function-scope import, kept as in the original code.
    # NOTE(review): presumably this avoids an import cycle -- confirm.
    from math_utils import NumericPopulation

    self.buckets: Dict[Tuple[Bound, Bound], Count] = {}
    for start_end in buckets:
        # Only an exact repeat of a (start, end) pair is detected here;
        # partially overlapping ranges are not.
        if start_end in self.buckets:
            raise Exception("Buckets overlap?!")
        self.buckets[start_end] = 0
    self.sigma: float = 0.0
    self.stats: NumericPopulation = NumericPopulation()
    self.maximum: Optional[T] = None
    self.minimum: Optional[T] = None
    self.count: Count = 0
import functools
import math
from heapq import heappop, heappush
-from typing import List
+from typing import List, Optional
class NumericPopulation(object):
    """Running statistics (median, mean, stdev, percentiles) over numbers.

    Values are split across two heaps: ``lowers`` is a max-heap (values are
    stored negated) holding the smaller half, and ``highers`` is a min-heap
    holding the larger half.  Their sizes never differ by more than one, so
    the median is always at the top of one of them.

    >>> pop = NumericPopulation()
    >>> pop.add_number(1)
    >>> pop.add_number(10)
    >>> pop.add_number(3)
    >>> pop.get_median()
    3
    >>> pop.add_number(7)
    >>> pop.add_number(5)
    >>> pop.get_median()
    5
    >>> pop.get_mean()
    5.2
    >>> round(pop.get_stdev(), 2)
    6.99
    >>> pop.get_percentile(20)
    3
    >>> pop.get_percentile(60)
    7
    >>> pop.get_percentile(95)
    10
    """

    def __init__(self):
        self.lowers: List[float] = []  # max-heap; values stored negated
        self.highers: List[float] = []  # min-heap
        self.aggregate = 0.0  # running sum of every number added
        # Sorted snapshot of the population, built lazily by get_percentile.
        self.sorted_copy: Optional[List[float]] = None

    def add_number(self, number: float) -> None:
        """Add a number to the population in O(log n) time."""

        if not self.highers or number > self.highers[0]:
            heappush(self.highers, number)
        else:
            heappush(self.lowers, -number)  # for lowers we need a max heap
        self.aggregate += number
        self._rebalance()

    def _rebalance(self) -> None:
        """Move one item across so the heap sizes differ by at most one."""

        if len(self.lowers) - len(self.highers) > 1:
            heappush(self.highers, -heappop(self.lowers))
        elif len(self.highers) - len(self.lowers) > 1:
            heappush(self.lowers, -heappop(self.highers))

    def get_median(self) -> float:
        """Returns the approximate median (p50) so far in O(1) time.

        With an even number of items this is the lower of the two middle
        values, not their average.  Raises IndexError when no numbers have
        been added.
        """

        if len(self.lowers) >= len(self.highers):
            return -self.lowers[0]
        return self.highers[0]

    def get_mean(self) -> float:
        """Returns the mean (arithmetic mean) so far in O(1) time.

        Raises ZeroDivisionError when no numbers have been added.
        """

        count = len(self.lowers) + len(self.highers)
        return self.aggregate / count

    def get_stdev(self) -> float:
        """Returns the spread of the population so far in O(n) time.

        NOTE(review): this is the square root of the *sum* of squared
        deviations from the mean; the sum is not divided by the number of
        items.  That matches the documented example (6.99) and is kept for
        compatibility -- confirm whether callers expect a normalized
        standard deviation.
        """

        mean = self.get_mean()
        # lowers stores negated values, so flip the sign back before use.
        squared_deviations = sum((-x - mean) ** 2 for x in self.lowers)
        squared_deviations += sum((x - mean) ** 2 for x in self.highers)
        return math.sqrt(squared_deviations)

    def get_percentile(self, n: float) -> float:
        """Returns the number at approximately pn% (i.e. the nth percentile)
        of the distribution.

        The first call after the population has grown sorts a full copy of
        the data, which costs O(n log n).  That sorted copy is cached and
        reused by later calls until add_number changes the population size.
        Not thread safe.

        Args:
            n: the percentile to look up, between 0 and 100 inclusive.

        Raises:
            ValueError: if n is outside [0, 100] or the population is empty.
        """

        if n == 50:
            return self.get_median()
        if not 0 <= n <= 100:
            raise ValueError(f"percentile must be within [0, 100], got {n}")
        count = len(self.lowers) + len(self.highers)
        if count == 0:
            raise ValueError("cannot take a percentile of an empty population")

        # Numbers are only ever added, so a length mismatch is enough to
        # tell that the cached snapshot is stale.
        if self.sorted_copy is None or len(self.sorted_copy) != count:
            self.sorted_copy = sorted([-x for x in self.lowers] + self.highers)

        # round() can land on `count` for high percentiles (e.g. p100);
        # clamp so those map to the largest element.
        index = min(round(count * (n / 100.0)), count - 1)
        return self.sorted_copy[index]

def gcd_floats(a: float, b: float) -> float:
if a < b: