#!/usr/bin/env python3
+# © Copyright 2021-2022, Scott Gasch
+
+"""Mathematical helpers."""
+
+import collections
import functools
import math
-from typing import List
-from heapq import heappush, heappop
+from heapq import heappop, heappush
+from typing import Dict, List, Optional, Tuple
+
+import dict_utils
-class RunningMedian(object):
- """A running median computer.
+class NumericPopulation(object):
+ """A numeric population with some statistics such as median, mean, pN,
+ stdev, etc...
- >>> median = RunningMedian()
- >>> median.add_number(1)
- >>> median.add_number(10)
- >>> median.add_number(3)
- >>> median.get_median()
+ >>> pop = NumericPopulation()
+ >>> pop.add_number(1)
+ >>> pop.add_number(10)
+ >>> pop.add_number(3)
+ >>> pop.get_median()
3
- >>> median.add_number(7)
- >>> median.add_number(5)
- >>> median.get_median()
+ >>> pop.add_number(7)
+ >>> pop.add_number(5)
+ >>> pop.get_median()
5
+ >>> pop.get_mean()
+ 5.2
+ >>> round(pop.get_stdev(), 2)
+ 1.75
+ >>> pop.get_percentile(20)
+ 3
+ >>> pop.get_percentile(60)
+ 7
+
"""
def __init__(self):
self.lowers, self.highers = [], []
+ self.aggregate = 0.0
+ self.sorted_copy: Optional[List[float]] = None
+
+ def add_number(self, number: float):
+ """O(2 log2 n)"""
- def add_number(self, number):
if not self.highers or number > self.highers[0]:
heappush(self.highers, number)
else:
heappush(self.lowers, -number) # for lowers we need a max heap
- self.rebalance()
+ self.aggregate += number
+ self._rebalance()
- def rebalance(self):
+ def _rebalance(self):
if len(self.lowers) - len(self.highers) > 1:
heappush(self.highers, -heappop(self.lowers))
elif len(self.highers) - len(self.lowers) > 1:
heappush(self.lowers, -heappop(self.highers))
- def get_median(self):
+ def get_median(self) -> float:
+ """Returns the approximate median (p50) so far in O(1) time."""
+
if len(self.lowers) == len(self.highers):
- return (-self.lowers[0] + self.highers[0]) / 2
+ return -self.lowers[0]
elif len(self.lowers) > len(self.highers):
return -self.lowers[0]
else:
return self.highers[0]
+ def get_mean(self) -> float:
+ """Returns the mean (arithmetic mean) so far in O(1) time."""
+
+ count = len(self.lowers) + len(self.highers)
+ return self.aggregate / count
+
+ def get_mode(self) -> Tuple[float, int]:
+ """Returns the mode (most common member)."""
+
+ count: Dict[float, int] = collections.defaultdict(int)
+ for n in self.lowers:
+ count[-n] += 1
+ for n in self.highers:
+ count[n] += 1
+ return dict_utils.item_with_max_value(count)
+
+ def get_stdev(self) -> float:
+ """Returns the stdev so far in O(n) time."""
+
+ mean = self.get_mean()
+ variance = 0.0
+ for n in self.lowers:
+ n = -n
+ variance += (n - mean) ** 2
+ for n in self.highers:
+ variance += (n - mean) ** 2
+ count = len(self.lowers) + len(self.highers) - 1
+ return math.sqrt(variance) / count
+
+ def get_percentile(self, n: float) -> float:
+ """Returns the number at approximately pn% (i.e. the nth percentile)
+ of the distribution in O(n log n) time (expensive, requires a
+ complete sort). Not thread safe. Caching does across
+ multiple calls without an invocation to add_number.
+
+ """
+ if n == 50:
+ return self.get_median()
+ count = len(self.lowers) + len(self.highers)
+ if self.sorted_copy is not None:
+ if count == len(self.sorted_copy):
+ index = round(count * (n / 100.0))
+ assert 0 <= index < count
+ return self.sorted_copy[index]
+ self.sorted_copy = [-x for x in self.lowers]
+ for x in self.highers:
+ self.sorted_copy.append(x)
+ self.sorted_copy = sorted(self.sorted_copy)
+ index = round(count * (n / 100.0))
+ assert 0 <= index < count
+ return self.sorted_copy[index]
+
def gcd_floats(a: float, b: float) -> float:
if a < b:
3.141
"""
- assert decimals > 0 and decimals < 10
- multiplier = 10 ** decimals
+ assert 0 < decimals < 10
+ multiplier = 10**decimals
return int(n * multiplier) / multiplier