8ac6b10a6f9c258d382328959e58496b89cb327e
[pyutils.git] / src / pyutils / math_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """Helper utilities with a mathematical / statictical focus."""
6
7 import collections
8 import functools
9 import math
10 from heapq import heappop, heappush
11 from typing import Dict, List, Optional, Tuple
12
13 from pyutils import dict_utils
14
15
16 class NumericPopulation(object):
17     """This object *store* a numerical population in a way that enables relatively
18     fast addition of new numbers (:math:`O(2log_2 n)`) and instant access to the
19     median value in the population (:math:`O(1)`).  It also provides other population
20     summary statistics such as the :meth:`get_mode`, :meth:`get_percentile` and
21     :meth:`get_stdev`.
22
23     .. note::
24
25         Because this class stores a copy of all numbers added to it, it shouldn't
26         be used for very large populations.  Consider sampling.
27
28     >>> pop = NumericPopulation()
29     >>> pop.add_number(1)
30     >>> pop.add_number(10)
31     >>> pop.add_number(3)
32     >>> len(pop)
33     3
34     >>> pop.get_median()
35     3
36     >>> pop.add_number(7)
37     >>> pop.add_number(5)
38     >>> pop.get_median()
39     5
40     >>> pop.get_mean()
41     5.2
42     >>> round(pop.get_stdev(), 1)
43     3.1
44     >>> pop.get_percentile(20)
45     3
46     >>> pop.get_percentile(60)
47     7
48     """
49
50     def __init__(self):
51         self.lowers, self.highers = [], []
52         self.aggregate = 0.0
53         self.sorted_copy: Optional[List[float]] = None
54         self.maximum = None
55         self.minimum = None
56
57     def add_number(self, number: float):
58         """Adds a number to the population.  Runtime complexity of this
59         operation is :math:`O(2 log_2 n)`
60
61         Args:
62             number: the number to add_number to the population
63         """
64
65         if not self.highers or number > self.highers[0]:
66             heappush(self.highers, number)
67         else:
68             heappush(self.lowers, -number)  # for lowers we need a max heap
69         self.aggregate += number
70         self._rebalance()
71         if not self.maximum or number > self.maximum:
72             self.maximum = number
73         if not self.minimum or number < self.minimum:
74             self.minimum = number
75
76     def __len__(self):
77         """
78         Returns:
79             the population's current size.
80         """
81         n = 0
82         if self.highers:
83             n += len(self.highers)
84         if self.lowers:
85             n += len(self.lowers)
86         return n
87
88     def _rebalance(self):
89         """Internal helper for rebalancing the `lowers` and `highers` heaps"""
90         if len(self.lowers) - len(self.highers) > 1:
91             heappush(self.highers, -heappop(self.lowers))
92         elif len(self.highers) - len(self.lowers) > 1:
93             heappush(self.lowers, -heappop(self.highers))
94
95     def get_median(self) -> float:
96         """
97         Returns:
98             The median (p50) of the current population in :math:`O(1)` time.
99         """
100         if len(self.lowers) == len(self.highers):
101             return -self.lowers[0]
102         elif len(self.lowers) > len(self.highers):
103             return -self.lowers[0]
104         else:
105             return self.highers[0]
106
107     def get_mean(self) -> float:
108         """
109         Returns:
110             The mean (arithmetic mean) so far in :math:`O(1)` time.
111         """
112         count = len(self)
113         return self.aggregate / count
114
115     def get_mode(self) -> Tuple[float, int]:
116         """
117         Returns:
118             The population mode (most common member in the population)
119             in :math:`O(n)` time.
120         """
121         count: Dict[float, int] = collections.defaultdict(int)
122         for n in self.lowers:
123             count[-n] += 1
124         for n in self.highers:
125             count[n] += 1
126         return dict_utils.item_with_max_value(count)
127
128     def get_stdev(self) -> float:
129         """
130         Returns:
131             The stdev of the current population in :math:`O(n)` time.
132         """
133         mean = self.get_mean()
134         variance = 0.0
135         for n in self.lowers:
136             n = -n
137             variance += (n - mean) ** 2
138         for n in self.highers:
139             variance += (n - mean) ** 2
140         count = len(self.lowers) + len(self.highers)
141         return math.sqrt(variance / count)
142
143     def _create_sorted_copy_if_needed(self, count: int):
144         """Internal helper."""
145         if not self.sorted_copy or count != len(self.sorted_copy):
146             self.sorted_copy = []
147             for x in self.lowers:
148                 self.sorted_copy.append(-x)
149             for x in self.highers:
150                 self.sorted_copy.append(x)
151             self.sorted_copy = sorted(self.sorted_copy)
152
153     def get_percentile(self, n: float) -> float:
154         """
155         Returns: the number at approximately pn% in the population
156         (i.e. the nth percentile) in :math:`O(n log_2 n)` time (it
157         performs a full sort).  This is not the most efficient
158         algorithm.
159
160         Not thread-safe; does caching across multiple calls without
161         an invocation to :meth:`add_number` for perf reasons.
162
163         Args:
164             n: the percentile to compute
165         """
166         if n == 50:
167             return self.get_median()
168         count = len(self)
169         self._create_sorted_copy_if_needed(count)
170         assert self.sorted_copy
171         index = round(count * (n / 100.0))
172         index = max(0, index)
173         index = min(count - 1, index)
174         return self.sorted_copy[index]
175
176
177 def gcd_floats(a: float, b: float) -> float:
178     """
179     Returns:
180         The greatest common divisor of a and b.
181
182     Args:
183         a: first operand
184         b: second operatnd
185     """
186     if a < b:
187         return gcd_floats(b, a)
188
189     # base case
190     if abs(b) < 0.001:
191         return a
192     return gcd_floats(b, a - math.floor(a / b) * b)
193
194
195 def gcd_float_sequence(lst: List[float]) -> float:
196     """
197     Returns:
198         The greatest common divisor of a list of floats.
199
200     Args:
201         lst: a list of operands
202     """
203     if len(lst) <= 0:
204         raise ValueError("Need at least one number")
205     elif len(lst) == 1:
206         return lst[0]
207     assert len(lst) >= 2
208     gcd = gcd_floats(lst[0], lst[1])
209     for i in range(2, len(lst)):
210         gcd = gcd_floats(gcd, lst[i])
211     return gcd
212
213
214 def truncate_float(n: float, decimals: int = 2):
215     """
216     Returns:
217         A truncated float to a particular number of decimals.
218
219     Args:
220         n: the float to truncate
221
222     >>> truncate_float(3.1415927, 3)
223     3.141
224     """
225     assert 0 < decimals < 10
226     multiplier = 10**decimals
227     return int(n * multiplier) / multiplier
228
229
230 def percentage_to_multiplier(percent: float) -> float:
231     """Given a percentage that represents a return or percent change
232     (e.g. 155%), determine the factor (i.e.  multiplier) needed to
233     scale a number by that percentage (e.g. 2.55x)
234
235     Args:
236         percent: the return percent to scale by
237
238     >>> percentage_to_multiplier(155)
239     2.55
240     >>> percentage_to_multiplier(45)
241     1.45
242     >>> percentage_to_multiplier(-25)
243     0.75
244
245     """
246     multiplier = percent / 100
247     multiplier += 1.0
248     return multiplier
249
250
251 def multiplier_to_percent(multiplier: float) -> float:
252     """Convert a multiplicative factor into a percent change or return
253     percentage.
254
255     Args:
256         multiplier: the multiplier for which to compute the percent change
257
258     >>> multiplier_to_percent(0.75)
259     -25.0
260     >>> multiplier_to_percent(1.0)
261     0.0
262     >>> multiplier_to_percent(1.99)
263     99.0
264     """
265     percent = multiplier
266     if percent > 0.0:
267         percent -= 1.0
268     else:
269         percent = 1.0 - percent
270     percent *= 100.0
271     return percent
272
273
274 @functools.lru_cache(maxsize=1024, typed=True)
275 def is_prime(n: int) -> bool:
276     """
277     Args:
278         n: the number for which primeness is to be determined.
279
280     Returns:
281         True if n is prime and False otherwise.
282
283     .. note::
284
285          Obviously(?) very slow for very large input numbers until
286          we get quantum computers.
287
288     >>> is_prime(13)
289     True
290     >>> is_prime(22)
291     False
292     >>> is_prime(51602981)
293     True
294     """
295     if not isinstance(n, int):
296         raise TypeError("argument passed to is_prime is not of 'int' type")
297
298     # Corner cases
299     if n <= 1:
300         return False
301     if n <= 3:
302         return True
303
304     # This is checked so that we can skip middle five numbers in below
305     # loop
306     if n % 2 == 0 or n % 3 == 0:
307         return False
308
309     i = 5
310     while i * i <= n:
311         if n % i == 0 or n % (i + 2) == 0:
312             return False
313         i = i + 6
314     return True
315
316
317 if __name__ == "__main__":
318     import doctest
319
320     doctest.testmod()