Add coding comments for files with utf8 characters in there.
[python_utils.git] / histogram.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 import math
5 from numbers import Number
6 from typing import Dict, Generic, Iterable, List, Optional, Tuple, TypeVar
7
8 T = TypeVar("T", int, float)
9 Bound = int
10 Count = int
11
12
13 class SimpleHistogram(Generic[T]):
14     # Useful in defining wide open bottom/top bucket bounds:
15     POSITIVE_INFINITY = math.inf
16     NEGATIVE_INFINITY = -math.inf
17
18     def __init__(self, buckets: List[Tuple[Bound, Bound]]):
19         from math_utils import RunningMedian
20
21         self.buckets: Dict[Tuple[Bound, Bound], Count] = {}
22         for start_end in buckets:
23             if self._get_bucket(start_end[0]) is not None:
24                 raise Exception("Buckets overlap?!")
25             self.buckets[start_end] = 0
26         self.sigma: float = 0.0
27         self.median: RunningMedian = RunningMedian()
28         self.maximum: Optional[T] = None
29         self.minimum: Optional[T] = None
30         self.count: Count = 0
31
32     @staticmethod
33     def n_evenly_spaced_buckets(
34         min_bound: T,
35         max_bound: T,
36         n: int,
37     ) -> List[Tuple[int, int]]:
38         ret: List[Tuple[int, int]] = []
39         stride = int((max_bound - min_bound) / n)
40         if stride <= 0:
41             raise Exception("Min must be < Max")
42         imax = math.ceil(max_bound)
43         imin = math.floor(min_bound)
44         for bucket_start in range(imin, imax, stride):
45             ret.append((bucket_start, bucket_start + stride))
46         return ret
47
48     def _get_bucket(self, item: T) -> Optional[Tuple[int, int]]:
49         for start_end in self.buckets:
50             if start_end[0] <= item < start_end[1]:
51                 return start_end
52         return None
53
54     def add_item(self, item: T) -> bool:
55         bucket = self._get_bucket(item)
56         if bucket is None:
57             return False
58         self.count += 1
59         self.buckets[bucket] += 1
60         self.sigma += item
61         self.median.add_number(item)
62         if self.maximum is None or item > self.maximum:
63             self.maximum = item
64         if self.minimum is None or item < self.minimum:
65             self.minimum = item
66         return True
67
68     def add_items(self, lst: Iterable[T]) -> bool:
69         all_true = True
70         for item in lst:
71             all_true = all_true and self.add_item(item)
72         return all_true
73
74     def __repr__(self, *, width: int = 80, label_formatter: str = '%d') -> str:
75         from text_utils import bar_graph
76
77         txt = ""
78         max_population: Optional[int] = None
79         for bucket in self.buckets:
80             pop = self.buckets[bucket]
81             if pop > 0:
82                 last_bucket_start = bucket[0]  # beginning of range
83             if max_population is None or pop > max_population:
84                 max_population = pop  # bucket with max items
85         if max_population is None:
86             return txt
87
88         max_label_width: Optional[int] = None
89         lowest_start: Optional[int] = None
90         highest_end: Optional[int] = None
91         for bucket in sorted(self.buckets, key=lambda x: x[0]):
92             start = bucket[0]
93             if lowest_start is None:
94                 lowest_start = start
95             end = bucket[1]
96             if highest_end is None or end > highest_end:
97                 highest_end = end
98             label = f'[{label_formatter}..{label_formatter}): ' % (start, end)
99             label_width = len(label)
100             if max_label_width is None or label_width > max_label_width:
101                 max_label_width = label_width
102             if start == last_bucket_start:
103                 break
104         assert max_label_width is not None
105         assert lowest_start is not None
106         assert highest_end is not None
107
108         sigma_label = f'[{label_formatter}..{label_formatter}): ' % (
109             lowest_start,
110             highest_end,
111         )
112         if len(sigma_label) > max_label_width:
113             max_label_width = len(sigma_label)
114         bar_width = width - (max_label_width + 16)
115
116         for bucket in sorted(self.buckets, key=lambda x: x[0]):
117             start = bucket[0]
118             end = bucket[1]
119             label = f'[{label_formatter}..{label_formatter}): ' % (start, end)
120             pop = self.buckets[bucket]
121             bar = bar_graph(
122                 (pop / max_population),
123                 include_text=False,
124                 width=bar_width,
125                 left_end="",
126                 right_end="",
127             )
128             txt += label.rjust(max_label_width)
129             txt += bar
130             txt += f"({pop/self.count*100.0:5.2f}% n={pop})\n"
131             if start == last_bucket_start:
132                 break
133         txt += '-' * width + '\n'
134         txt += sigma_label.rjust(max_label_width)
135         txt += ' ' * (bar_width - 2)
136         txt += f'Σ=(100.00% n={self.count})\n'
137         return txt