From: Scott Gasch Date: Mon, 4 Jul 2022 07:33:26 +0000 (-0700) Subject: Add inital cut of iter_utils with a coupel of iters I needed. Tweak X-Git-Url: https://wannabe.guru.org/gitweb/?a=commitdiff_plain;h=72d255e75695973843f05f5bd60fb415ff782ffe;p=python_utils.git Add inital cut of iter_utils with a coupel of iters I needed. Tweak math_utils population. --- diff --git a/iter_utils.py b/iter_utils.py new file mode 100644 index 0000000..897493d --- /dev/null +++ b/iter_utils.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python3 + +import random +from collections.abc import Iterator +from typing import Any, List, Optional + + +class PeekingIterator(Iterator): + """An iterator that lets you peek at the next item on deck. + + >>> p = PeekingIterator(iter(range(3))) + >>> p.__next__() + 0 + >>> p.peek() + 1 + >>> p.peek() + 1 + >>> p.__next__() + 1 + >>> p.__next__() + 2 + >>> p.peek() == None + True + >>> p.__next__() + Traceback (most recent call last): + ... + StopIteration + """ + + def __init__(self, source_iter: Iterator): + self.source_iter = source_iter + self.on_deck: List[Any] = [] + + def __iter__(self) -> Iterator: + return self + + def __next__(self) -> Any: + if len(self.on_deck) > 0: + return self.on_deck.pop() + else: + item = self.source_iter.__next__() + return item + + def peek(self) -> Optional[Any]: + if len(self.on_deck) > 0: + return self.on_deck[0] + try: + item = self.source_iter.__next__() + self.on_deck.append(item) + return self.peek() + except StopIteration: + return None + + +class SamplingIterator(Iterator): + """An iterator that simply echoes what source_iter produces but also + collects a random sample (of size sample_size) of the stream that can + be queried via get_random_sample() at any time. + + >>> import collections + >>> import random + + >>> random.seed(22) + >>> s = SamplingIterator(iter(range(100)), 10) + >>> s.__next__() + 0 + + >>> s.__next__() + 1 + + >>> s.resovoir + [0, 1] + + >>> collections.deque(s) + deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]) + + >>> s() + [78, 18, 47, 83, 93, 26, 25, 73, 94, 38] + + """ + + def __init__(self, source_iter: Iterator, sample_size: int): + self.source_iter = source_iter + self.sample_size = sample_size + self.resovoir: List[Any] = [] + self.stream_length_so_far = 0 + + def __iter__(self) -> Iterator: + return self + + def __next__(self) -> Any: + item = self.source_iter.__next__() # or raise + self.stream_length_so_far += 1 + + # Filling the resovoir + pop = len(self.resovoir) + if pop < self.sample_size: + self.resovoir.append(item) + if self.sample_size == (pop + 1): # just finished filling... + random.shuffle(self.resovoir) + + # Swap this item for one in the resovoir with probabilty + # sample_size / stream_length_so_far. See: + # + # https://en.wikipedia.org/wiki/Reservoir_sampling + else: + r = random.randint(0, self.stream_length_so_far) + if r < self.sample_size: + self.resovoir[r] = item + return item + + def __call__(self) -> List[Any]: + return self.resovoir + + +if __name__ == '__main__': + import doctest + + doctest.testmod() diff --git a/math_utils.py b/math_utils.py index 64cd9fb..ed9c2f4 100644 --- a/math_utils.py +++ b/math_utils.py @@ -41,6 +41,8 @@ class NumericPopulation(object): self.lowers, self.highers = [], [] self.aggregate = 0.0 self.sorted_copy: Optional[List[float]] = None + self.maximum = None + self.minimum = None def add_number(self, number: float): """Adds a number to the population. Runtime complexity of this @@ -52,6 +54,10 @@ class NumericPopulation(object): heappush(self.lowers, -number) # for lowers we need a max heap self.aggregate += number self._rebalance() + if not self.maximum or number > self.maximum: + self.maximum = number + if not self.minimum or number < self.minimum: + self.minimum = number def _rebalance(self): if len(self.lowers) - len(self.highers) > 1: