Better logging + cleanup.
[python_utils.git] / histogram.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # © Copyright 2021-2022, Scott Gasch
5
6 """A text-based simple histogram helper class."""
7
8 import math
9 from dataclasses import dataclass
10 from typing import Dict, Generic, Iterable, List, Optional, Tuple, TypeVar
11
12 T = TypeVar("T", int, float)
13 Bound = int
14 Count = int
15
16
17 @dataclass
18 class BucketDetails:
19     """A collection of details about the internal histogram buckets."""
20
21     num_populated_buckets: int = 0
22     max_population: Optional[int] = None
23     last_bucket_start: Optional[int] = None
24     lowest_start: Optional[int] = None
25     highest_end: Optional[int] = None
26     max_label_width: Optional[int] = None
27
28
29 class SimpleHistogram(Generic[T]):
30     """A simple histogram."""
31
32     # Useful in defining wide open bottom/top bucket bounds:
33     POSITIVE_INFINITY = math.inf
34     NEGATIVE_INFINITY = -math.inf
35
36     def __init__(self, buckets: List[Tuple[Bound, Bound]]):
37         from math_utils import NumericPopulation
38
39         self.buckets: Dict[Tuple[Bound, Bound], Count] = {}
40         for start_end in buckets:
41             if self._get_bucket(start_end[0]) is not None:
42                 raise Exception("Buckets overlap?!")
43             self.buckets[start_end] = 0
44         self.sigma: float = 0.0
45         self.stats: NumericPopulation = NumericPopulation()
46         self.maximum: Optional[T] = None
47         self.minimum: Optional[T] = None
48         self.count: Count = 0
49
50     @staticmethod
51     def n_evenly_spaced_buckets(
52         min_bound: T,
53         max_bound: T,
54         n: int,
55     ) -> List[Tuple[int, int]]:
56         ret: List[Tuple[int, int]] = []
57         stride = int((max_bound - min_bound) / n)
58         if stride <= 0:
59             raise Exception("Min must be < Max")
60         imax = math.ceil(max_bound)
61         imin = math.floor(min_bound)
62         for bucket_start in range(imin, imax, stride):
63             ret.append((bucket_start, bucket_start + stride))
64         return ret
65
66     def _get_bucket(self, item: T) -> Optional[Tuple[int, int]]:
67         for start_end in self.buckets:
68             if start_end[0] <= item < start_end[1]:
69                 return start_end
70         return None
71
72     def add_item(self, item: T) -> bool:
73         bucket = self._get_bucket(item)
74         if bucket is None:
75             return False
76         self.count += 1
77         self.buckets[bucket] += 1
78         self.sigma += item
79         self.stats.add_number(item)
80         if self.maximum is None or item > self.maximum:
81             self.maximum = item
82         if self.minimum is None or item < self.minimum:
83             self.minimum = item
84         return True
85
86     def add_items(self, lst: Iterable[T]) -> bool:
87         all_true = True
88         for item in lst:
89             all_true = all_true and self.add_item(item)
90         return all_true
91
92     def get_bucket_details(self, label_formatter: str) -> BucketDetails:
93         details = BucketDetails()
94         for (start, end), pop in sorted(self.buckets.items(), key=lambda x: x[0]):
95             if pop > 0:
96                 details.num_populated_buckets += 1
97                 details.last_bucket_start = start
98                 if details.max_population is None or pop > details.max_population:
99                     details.max_population = pop
100                 if details.lowest_start is None or start < details.lowest_start:
101                     details.lowest_start = start
102                 if details.highest_end is None or end > details.highest_end:
103                     details.highest_end = end
104                 label = f'[{label_formatter}..{label_formatter}): ' % (start, end)
105                 label_width = len(label)
106                 if details.max_label_width is None or label_width > details.max_label_width:
107                     details.max_label_width = label_width
108         return details
109
110     def __repr__(self, *, width: int = 80, label_formatter: str = '%d') -> str:
111         from text_utils import bar_graph
112
113         details = self.get_bucket_details(label_formatter)
114         txt = ""
115         if details.num_populated_buckets == 0:
116             return txt
117         assert details.max_label_width is not None
118         assert details.lowest_start is not None
119         assert details.highest_end is not None
120         assert details.max_population is not None
121         sigma_label = f'[{label_formatter}..{label_formatter}): ' % (
122             details.lowest_start,
123             details.highest_end,
124         )
125         if len(sigma_label) > details.max_label_width:
126             details.max_label_width = len(sigma_label)
127         bar_width = width - (details.max_label_width + 17)
128
129         for (start, end), pop in sorted(self.buckets.items(), key=lambda x: x[0]):
130             if start < details.lowest_start:
131                 continue
132             label = f'[{label_formatter}..{label_formatter}): ' % (start, end)
133             bar = bar_graph(
134                 (pop / details.max_population),
135                 include_text=False,
136                 width=bar_width,
137                 left_end="",
138                 right_end="",
139             )
140             txt += label.rjust(details.max_label_width)
141             txt += bar
142             txt += f"({pop/self.count*100.0:5.2f}% n={pop})\n"
143             if start == details.last_bucket_start:
144                 break
145         txt += '-' * width + '\n'
146         txt += sigma_label.rjust(details.max_label_width)
147         txt += ' ' * (bar_width - 2)
148         txt += f'     pop(Σn)={self.count}\n'
149         txt += ' ' * (bar_width + details.max_label_width - 2)
150         txt += f'     mean(x̄)={self.stats.get_mean():.3f}\n'
151         txt += ' ' * (bar_width + details.max_label_width - 2)
152         txt += f' median(p50)={self.stats.get_median():.3f}\n'
153         txt += ' ' * (bar_width + details.max_label_width - 2)
154         txt += f'    mode(Mo)={self.stats.get_mode()[0]:.3f}\n'
155         txt += ' ' * (bar_width + details.max_label_width - 2)
156         txt += f'    stdev(σ)={self.stats.get_stdev():.3f}\n'
157         txt += '\n'
158         return txt