More type annotations.
[python_utils.git] / histogram.py
1 #!/usr/bin/env python3
2
3 import math
4 from numbers import Number
5 from typing import Dict, Generic, Iterable, List, Optional, Tuple, TypeVar
6
7 T = TypeVar("T", int, float)
8 Bound = int
9 Count = int
10
11
12 class SimpleHistogram(Generic[T]):
13     # Useful in defining wide open bottom/top bucket bounds:
14     POSITIVE_INFINITY = math.inf
15     NEGATIVE_INFINITY = -math.inf
16
17     def __init__(self, buckets: List[Tuple[Bound, Bound]]):
18         from math_utils import RunningMedian
19
20         self.buckets: Dict[Tuple[Bound, Bound], Count] = {}
21         for start_end in buckets:
22             if self._get_bucket(start_end[0]) is not None:
23                 raise Exception("Buckets overlap?!")
24             self.buckets[start_end] = 0
25         self.sigma: float = 0.0
26         self.median: RunningMedian = RunningMedian()
27         self.maximum: Optional[T] = None
28         self.minimum: Optional[T] = None
29         self.count: Count = 0
30
31     @staticmethod
32     def n_evenly_spaced_buckets(
33         min_bound: T,
34         max_bound: T,
35         n: int,
36     ) -> List[Tuple[int, int]]:
37         ret: List[Tuple[int, int]] = []
38         stride = int((max_bound - min_bound) / n)
39         if stride <= 0:
40             raise Exception("Min must be < Max")
41         imax = math.ceil(max_bound)
42         imin = math.floor(min_bound)
43         for bucket_start in range(imin, imax, stride):
44             ret.append((bucket_start, bucket_start + stride))
45         return ret
46
47     def _get_bucket(self, item: T) -> Optional[Tuple[int, int]]:
48         for start_end in self.buckets:
49             if start_end[0] <= item < start_end[1]:
50                 return start_end
51         return None
52
53     def add_item(self, item: T) -> bool:
54         bucket = self._get_bucket(item)
55         if bucket is None:
56             return False
57         self.count += 1
58         self.buckets[bucket] += 1
59         self.sigma += item
60         self.median.add_number(item)
61         if self.maximum is None or item > self.maximum:
62             self.maximum = item
63         if self.minimum is None or item < self.minimum:
64             self.minimum = item
65         return True
66
67     def add_items(self, lst: Iterable[T]) -> bool:
68         all_true = True
69         for item in lst:
70             all_true = all_true and self.add_item(item)
71         return all_true
72
73     def __repr__(self, *, width: int = 80, label_formatter: str = '%d') -> str:
74         from text_utils import bar_graph
75
76         txt = ""
77         max_population: Optional[int] = None
78         for bucket in self.buckets:
79             pop = self.buckets[bucket]
80             if pop > 0:
81                 last_bucket_start = bucket[0]  # beginning of range
82             if max_population is None or pop > max_population:
83                 max_population = pop  # bucket with max items
84         if max_population is None:
85             return txt
86
87         max_label_width: Optional[int] = None
88         lowest_start: Optional[int] = None
89         highest_end: Optional[int] = None
90         for bucket in sorted(self.buckets, key=lambda x: x[0]):
91             start = bucket[0]
92             if lowest_start is None:
93                 lowest_start = start
94             end = bucket[1]
95             if highest_end is None or end > highest_end:
96                 highest_end = end
97             label = f'[{label_formatter}..{label_formatter}): ' % (start, end)
98             label_width = len(label)
99             if max_label_width is None or label_width > max_label_width:
100                 max_label_width = label_width
101             if start == last_bucket_start:
102                 break
103         assert max_label_width
104         assert lowest_start
105         assert highest_end
106
107         sigma_label = f'[{label_formatter}..{label_formatter}): ' % (
108             lowest_start,
109             highest_end,
110         )
111         if len(sigma_label) > max_label_width:
112             max_label_width = len(sigma_label)
113         bar_width = width - (max_label_width + 16)
114
115         for bucket in sorted(self.buckets, key=lambda x: x[0]):
116             start = bucket[0]
117             end = bucket[1]
118             label = f'[{label_formatter}..{label_formatter}): ' % (start, end)
119             pop = self.buckets[bucket]
120             bar = bar_graph(
121                 (pop / max_population),
122                 include_text=False,
123                 width=bar_width,
124                 left_end="",
125                 right_end="",
126             )
127             txt += label.rjust(max_label_width)
128             txt += bar
129             txt += f"({pop/self.count*100.0:5.2f}% n={pop})\n"
130             if start == last_bucket_start:
131                 break
132         txt += '-' * width + '\n'
133         txt += sigma_label.rjust(max_label_width)
134         txt += ' ' * (bar_width - 2)
135         txt += f'Σ=(100.00% n={self.count})\n'
136         return txt