Add percentile and change name of RunningMedian class.
[python_utils.git] / math_utils.py
index f77e0a1f2514f5a0201c81f57e9740d36d2ede47..188d3234986c5f6dd5b0474512402c9adc98cbf1 100644 (file)
@@ -5,59 +5,72 @@
 import functools
 import math
 from heapq import heappop, heappush
-from typing import List
+from typing import List, Optional
 
 
-class RunningMedian(object):
+class NumericPopulation(object):
     """A running median computer.
 
-    >>> median = RunningMedian()
-    >>> median.add_number(1)
-    >>> median.add_number(10)
-    >>> median.add_number(3)
-    >>> median.get_median()
+    >>> pop = NumericPopulation()
+    >>> pop.add_number(1)
+    >>> pop.add_number(10)
+    >>> pop.add_number(3)
+    >>> pop.get_median()
     3
-    >>> median.add_number(7)
-    >>> median.add_number(5)
-    >>> median.get_median()
+    >>> pop.add_number(7)
+    >>> pop.add_number(5)
+    >>> pop.get_median()
     5
-    >>> median.get_mean()
+    >>> pop.get_mean()
     5.2
-    >>> round(median.get_stdev(), 2)
+    >>> round(pop.get_stdev(), 2)
     6.99
+    >>> pop.get_percentile(20)
+    3
+    >>> pop.get_percentile(60)
+    7
     """
 
     def __init__(self):
         self.lowers, self.highers = [], []
         self.aggregate = 0.0
+        self.sorted_copy: Optional[List[float]] = None
 
     def add_number(self, number: float):
+        """O(2 log2 n)"""
+
         if not self.highers or number > self.highers[0]:
             heappush(self.highers, number)
         else:
             heappush(self.lowers, -number)  # for lowers we need a max heap
         self.aggregate += number
-        self.rebalance()
+        self._rebalance()
 
-    def rebalance(self):
+    def _rebalance(self):
         if len(self.lowers) - len(self.highers) > 1:
             heappush(self.highers, -heappop(self.lowers))
         elif len(self.highers) - len(self.lowers) > 1:
             heappush(self.lowers, -heappop(self.highers))
 
     def get_median(self) -> float:
+        """Returns the approximate median (p50) so far in O(1) time."""
+
         if len(self.lowers) == len(self.highers):
-            return (-self.lowers[0] + self.highers[0]) / 2
+            return -self.lowers[0]
         elif len(self.lowers) > len(self.highers):
             return -self.lowers[0]
         else:
             return self.highers[0]
 
     def get_mean(self) -> float:
+        """Returns the mean (arithmetic mean) so far in O(1) time."""
+
         count = len(self.lowers) + len(self.highers)
         return self.aggregate / count
 
     def get_stdev(self) -> float:
+        """Returns the stdev so far in O(n) time."""
+
         mean = self.get_mean()
         variance = 0.0
         for n in self.lowers:
@@ -67,6 +80,29 @@ class RunningMedian(object):
             variance += (n - mean) ** 2
         return math.sqrt(variance)
 
+    def get_percentile(self, n: float) -> float:
+        """Returns the number at approximately pn% (i.e. the nth percentile)
+        of the distribution in O(n log n) time (expensive, requires a
+        complete sort).  Not thread safe.  Caching does across
+        multiple calls without an invocation to add_number.
+
+        """
+        if n == 50:
+            return self.get_median()
+        count = len(self.lowers) + len(self.highers)
+        if self.sorted_copy is not None:
+            if count == len(self.sorted_copy):
+                index = round(count * (n / 100.0))
+                assert 0 <= index < count
+                return self.sorted_copy[index]
+        self.sorted_copy = [-x for x in self.lowers]
+        for x in self.highers:
+            self.sorted_copy.append(x)
+        self.sorted_copy = sorted(self.sorted_copy)
+        index = round(count * (n / 100.0))
+        assert 0 <= index < count
+        return self.sorted_copy[index]
+
 
 def gcd_floats(a: float, b: float) -> float:
     if a < b: