More cleanup, yey!
[python_utils.git] / histogram.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 """A text-based simple histogram helper class."""
5
6 import math
7 from typing import Dict, Generic, Iterable, List, Optional, Tuple, TypeVar
8
9 T = TypeVar("T", int, float)
10 Bound = int
11 Count = int
12
13
14 class SimpleHistogram(Generic[T]):
15     """A simple histogram."""
16
17     # Useful in defining wide open bottom/top bucket bounds:
18     POSITIVE_INFINITY = math.inf
19     NEGATIVE_INFINITY = -math.inf
20
21     def __init__(self, buckets: List[Tuple[Bound, Bound]]):
22         from math_utils import RunningMedian
23
24         self.buckets: Dict[Tuple[Bound, Bound], Count] = {}
25         for start_end in buckets:
26             if self._get_bucket(start_end[0]) is not None:
27                 raise Exception("Buckets overlap?!")
28             self.buckets[start_end] = 0
29         self.sigma: float = 0.0
30         self.median: RunningMedian = RunningMedian()
31         self.maximum: Optional[T] = None
32         self.minimum: Optional[T] = None
33         self.count: Count = 0
34
35     @staticmethod
36     def n_evenly_spaced_buckets(
37         min_bound: T,
38         max_bound: T,
39         n: int,
40     ) -> List[Tuple[int, int]]:
41         ret: List[Tuple[int, int]] = []
42         stride = int((max_bound - min_bound) / n)
43         if stride <= 0:
44             raise Exception("Min must be < Max")
45         imax = math.ceil(max_bound)
46         imin = math.floor(min_bound)
47         for bucket_start in range(imin, imax, stride):
48             ret.append((bucket_start, bucket_start + stride))
49         return ret
50
51     def _get_bucket(self, item: T) -> Optional[Tuple[int, int]]:
52         for start_end in self.buckets:
53             if start_end[0] <= item < start_end[1]:
54                 return start_end
55         return None
56
57     def add_item(self, item: T) -> bool:
58         bucket = self._get_bucket(item)
59         if bucket is None:
60             return False
61         self.count += 1
62         self.buckets[bucket] += 1
63         self.sigma += item
64         self.median.add_number(item)
65         if self.maximum is None or item > self.maximum:
66             self.maximum = item
67         if self.minimum is None or item < self.minimum:
68             self.minimum = item
69         return True
70
71     def add_items(self, lst: Iterable[T]) -> bool:
72         all_true = True
73         for item in lst:
74             all_true = all_true and self.add_item(item)
75         return all_true
76
77     def __repr__(self, *, width: int = 80, label_formatter: str = '%d') -> str:
78         from text_utils import bar_graph
79
80         txt = ""
81         max_population: Optional[int] = None
82         for bucket in self.buckets:
83             pop = self.buckets[bucket]
84             if pop > 0:
85                 last_bucket_start = bucket[0]  # beginning of range
86             if max_population is None or pop > max_population:
87                 max_population = pop  # bucket with max items
88         if max_population is None:
89             return txt
90
91         max_label_width: Optional[int] = None
92         lowest_start: Optional[int] = None
93         highest_end: Optional[int] = None
94         for bucket in sorted(self.buckets, key=lambda x: x[0]):
95             start = bucket[0]
96             if lowest_start is None:
97                 lowest_start = start
98             end = bucket[1]
99             if highest_end is None or end > highest_end:
100                 highest_end = end
101             label = f'[{label_formatter}..{label_formatter}): ' % (start, end)
102             label_width = len(label)
103             if max_label_width is None or label_width > max_label_width:
104                 max_label_width = label_width
105             if start == last_bucket_start:
106                 break
107         assert max_label_width is not None
108         assert lowest_start is not None
109         assert highest_end is not None
110
111         sigma_label = f'[{label_formatter}..{label_formatter}): ' % (
112             lowest_start,
113             highest_end,
114         )
115         if len(sigma_label) > max_label_width:
116             max_label_width = len(sigma_label)
117         bar_width = width - (max_label_width + 16)
118
119         for bucket in sorted(self.buckets, key=lambda x: x[0]):
120             start = bucket[0]
121             end = bucket[1]
122             label = f'[{label_formatter}..{label_formatter}): ' % (start, end)
123             pop = self.buckets[bucket]
124             bar = bar_graph(
125                 (pop / max_population),
126                 include_text=False,
127                 width=bar_width,
128                 left_end="",
129                 right_end="",
130             )
131             txt += label.rjust(max_label_width)
132             txt += bar
133             txt += f"({pop/self.count*100.0:5.2f}% n={pop})\n"
134             if start == last_bucket_start:
135                 break
136         txt += '-' * width + '\n'
137         txt += sigma_label.rjust(max_label_width)
138         txt += ' ' * (bar_width - 2)
139         txt += f'Σ=(100.00% n={self.count})\n'
140         return txt