self.lowers, self.highers = [], []
self.aggregate = 0.0
self.sorted_copy: Optional[List[float]] = None
+ self.maximum = None
+ self.minimum = None
def add_number(self, number: float):
"""Adds a number to the population. Runtime complexity of this
heappush(self.lowers, -number) # for lowers we need a max heap
self.aggregate += number
self._rebalance()
+ if not self.maximum or number > self.maximum:
+ self.maximum = number
+ if not self.minimum or number < self.minimum:
+ self.minimum = number
def _rebalance(self):
if len(self.lowers) - len(self.highers) > 1:
variance += (n - mean) ** 2
for n in self.highers:
variance += (n - mean) ** 2
- count = len(self.lowers) + len(self.highers) - 1
+ count = len(self.lowers) + len(self.highers)
return math.sqrt(variance) / count
+ def _create_sorted_copy_if_needed(self, count: int):
+ if not self.sorted_copy or count != len(self.sorted_copy):
+ self.sorted_copy = []
+ for x in self.lowers:
+ self.sorted_copy.append(-x)
+ for x in self.highers:
+ self.sorted_copy.append(x)
+ self.sorted_copy = sorted(self.sorted_copy)
+
def get_percentile(self, n: float) -> float:
"""Returns the number at approximately pn% (i.e. the nth percentile)
of the distribution in O(n log n) time. Not thread-safe;
if n == 50:
return self.get_median()
count = len(self.lowers) + len(self.highers)
- if self.sorted_copy is not None:
- if count == len(self.sorted_copy):
- index = round(count * (n / 100.0))
- assert 0 <= index < count
- return self.sorted_copy[index]
- self.sorted_copy = [-x for x in self.lowers]
- for x in self.highers:
- self.sorted_copy.append(x)
- self.sorted_copy = sorted(self.sorted_copy)
+ self._create_sorted_copy_if_needed(count)
+ assert self.sorted_copy
index = round(count * (n / 100.0))
- assert 0 <= index < count
+ index = max(0, index)
+ index = min(count - 1, index)
return self.sorted_copy[index]