Easier and more self documenting patterns for loading/saving Persistent
[python_utils.git] / histogram.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3
4 # © Copyright 2021-2022, Scott Gasch
5
6 """A text-based simple histogram helper class."""
7
8 import math
9 from dataclasses import dataclass
10 from typing import Dict, Generic, Iterable, List, Optional, Tuple, TypeVar
11
12 T = TypeVar("T", int, float)
13 Bound = int
14 Count = int
15
16
17 @dataclass
18 class BucketDetails:
19     """A collection of details about the internal histogram buckets."""
20
21     num_populated_buckets: int = 0
22     """Count of populated buckets"""
23
24     max_population: Optional[int] = None
25     """The max population in a bucket currently"""
26
27     last_bucket_start: Optional[int] = None
28     """The last bucket starting point"""
29
30     lowest_start: Optional[int] = None
31     """The lowest populated bucket's starting point"""
32
33     highest_end: Optional[int] = None
34     """The highest populated bucket's ending point"""
35
36     max_label_width: Optional[int] = None
37     """The maximum label width (for display purposes)"""
38
39
40 class SimpleHistogram(Generic[T]):
41     """A simple histogram."""
42
43     # Useful in defining wide open bottom/top bucket bounds:
44     POSITIVE_INFINITY = math.inf
45     NEGATIVE_INFINITY = -math.inf
46
47     def __init__(self, buckets: List[Tuple[Bound, Bound]]):
48         """C'tor.
49
50         Args:
51             buckets: a list of [start..end] tuples that define the
52                 buckets we are counting population in.  See also
53                 :meth:`n_evenly_spaced_buckets` to generate these
54                 buckets more easily.
55         """
56         from math_utils import NumericPopulation
57
58         self.buckets: Dict[Tuple[Bound, Bound], Count] = {}
59         for start_end in buckets:
60             if self._get_bucket(start_end[0]) is not None:
61                 raise Exception("Buckets overlap?!")
62             self.buckets[start_end] = 0
63         self.sigma: float = 0.0
64         self.stats: NumericPopulation = NumericPopulation()
65         self.maximum: Optional[T] = None
66         self.minimum: Optional[T] = None
67         self.count: Count = 0
68
69     @staticmethod
70     def n_evenly_spaced_buckets(
71         min_bound: T,
72         max_bound: T,
73         n: int,
74     ) -> List[Tuple[int, int]]:
75         """A helper method for generating the buckets argument to
76         our c'tor provided that you want N evenly spaced buckets.
77
78         Args:
79             min_bound: the minimum possible value
80             max_bound: the maximum possible value
81             n: how many buckets to create
82
83         Returns:
84             A list of bounds that define N evenly spaced buckets
85         """
86         ret: List[Tuple[int, int]] = []
87         stride = int((max_bound - min_bound) / n)
88         if stride <= 0:
89             raise Exception("Min must be < Max")
90         imax = math.ceil(max_bound)
91         imin = math.floor(min_bound)
92         for bucket_start in range(imin, imax, stride):
93             ret.append((bucket_start, bucket_start + stride))
94         return ret
95
96     def _get_bucket(self, item: T) -> Optional[Tuple[int, int]]:
97         """Given an item, what bucket is it in?"""
98         for start_end in self.buckets:
99             if start_end[0] <= item < start_end[1]:
100                 return start_end
101         return None
102
103     def add_item(self, item: T) -> bool:
104         """Adds a single item to the histogram (reculting in us incrementing
105         the population in the correct bucket.
106
107         Args:
108             item: the item to be added
109
110         Returns:
111             True if the item was successfully added or False if the item
112             is not within the bounds established during class construction.
113         """
114         bucket = self._get_bucket(item)
115         if bucket is None:
116             return False
117         self.count += 1
118         self.buckets[bucket] += 1
119         self.sigma += item
120         self.stats.add_number(item)
121         if self.maximum is None or item > self.maximum:
122             self.maximum = item
123         if self.minimum is None or item < self.minimum:
124             self.minimum = item
125         return True
126
127     def add_items(self, lst: Iterable[T]) -> bool:
128         """Adds a collection of items to the histogram and increments
129         the correct bucket's population for each item.
130
131         Args:
132             lst: An iterable of items to be added
133
134         Returns:
135             True if all items were added successfully or False if any
136             item was not able to be added because it was not within the
137             bounds established at object construction.
138         """
139         all_true = True
140         for item in lst:
141             all_true = all_true and self.add_item(item)
142         return all_true
143
144     def _get_bucket_details(self, label_formatter: str) -> BucketDetails:
145         """Get the details about one bucket."""
146         details = BucketDetails()
147         for (start, end), pop in sorted(self.buckets.items(), key=lambda x: x[0]):
148             if pop > 0:
149                 details.num_populated_buckets += 1
150                 details.last_bucket_start = start
151                 if details.max_population is None or pop > details.max_population:
152                     details.max_population = pop
153                 if details.lowest_start is None or start < details.lowest_start:
154                     details.lowest_start = start
155                 if details.highest_end is None or end > details.highest_end:
156                     details.highest_end = end
157                 label = f'[{label_formatter}..{label_formatter}): ' % (start, end)
158                 label_width = len(label)
159                 if details.max_label_width is None or label_width > details.max_label_width:
160                     details.max_label_width = label_width
161         return details
162
163     def __repr__(self, *, width: int = 80, label_formatter: str = '%d') -> str:
164         """Returns a pretty (text) representation of the histogram and
165         some vital stats about the population in it (min, max, mean,
166         median, mode, stdev, etc...)
167         """
168         from text_utils import BarGraphText, bar_graph_string
169
170         details = self._get_bucket_details(label_formatter)
171         txt = ""
172         if details.num_populated_buckets == 0:
173             return txt
174         assert details.max_label_width is not None
175         assert details.lowest_start is not None
176         assert details.highest_end is not None
177         assert details.max_population is not None
178         sigma_label = f'[{label_formatter}..{label_formatter}): ' % (
179             details.lowest_start,
180             details.highest_end,
181         )
182         if len(sigma_label) > details.max_label_width:
183             details.max_label_width = len(sigma_label)
184         bar_width = width - (details.max_label_width + 17)
185
186         for (start, end), pop in sorted(self.buckets.items(), key=lambda x: x[0]):
187             if start < details.lowest_start:
188                 continue
189             label = f'[{label_formatter}..{label_formatter}): ' % (start, end)
190             bar = bar_graph_string(
191                 pop,
192                 details.max_population,
193                 text=BarGraphText.NONE,
194                 width=bar_width,
195                 left_end="",
196                 right_end="",
197             )
198             txt += label.rjust(details.max_label_width)
199             txt += bar
200             txt += f"({pop/self.count*100.0:5.2f}% n={pop})\n"
201             if start == details.last_bucket_start:
202                 break
203         txt += '-' * width + '\n'
204         txt += sigma_label.rjust(details.max_label_width)
205         txt += ' ' * (bar_width - 2)
206         txt += f'     pop(Σn)={self.count}\n'
207         txt += ' ' * (bar_width + details.max_label_width - 2)
208         txt += f'     mean(x̄)={self.stats.get_mean():.3f}\n'
209         txt += ' ' * (bar_width + details.max_label_width - 2)
210         txt += f' median(p50)={self.stats.get_median():.3f}\n'
211         txt += ' ' * (bar_width + details.max_label_width - 2)
212         txt += f'    mode(Mo)={self.stats.get_mode()[0]:.3f}\n'
213         txt += ' ' * (bar_width + details.max_label_width - 2)
214         txt += f'    stdev(σ)={self.stats.get_stdev():.3f}\n'
215         txt += '\n'
216         return txt