Optional hooks for system wide non-zero exit log and unhandled top
[pyutils.git] / src / pyutils / dict_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2023, Scott Gasch
4
5 """This module contains helper functions for dealing with Python dictionaries."""
6
7 from itertools import islice
8 from typing import Any, Callable, Dict, Hashable, Iterator, List, Tuple
9
10 from pyutils import dataclass_utils
11 from pyutils.typez.typing import Comparable
12
13 AnyDict = Dict[Hashable, Any]
14 DictWithComparableKeys = Dict[Comparable, Any]
15
16
17 def init_or_inc(
18     d: AnyDict,
19     key: Hashable,
20     *,
21     init_value: Any = 1,
22     inc_function: Callable[..., Any] = lambda x: x + 1,
23 ) -> bool:
24     """
25     Initialize a dict value (if it doesn't exist) or increments it (using the
26     inc_function, which is customizable) if it already does exist.
27
28     Args:
29         d: the dict to increment or initialize a value in
30         key: the key to increment or initialize
31         init_value: default initial value
32         inc_function: Callable use to increment a value
33
34     Returns:
35         True if the key already existed or False otherwise
36
37     See also: :py:class:`collections.defaultdict` and
38     :py:class:`collections.Counter`.
39
40     >>> d = {}
41     >>> init_or_inc(d, "test")
42     False
43     >>> init_or_inc(d, "test")
44     True
45     >>> init_or_inc(d, 'ing')
46     False
47     >>> d
48     {'test': 2, 'ing': 1}
49     """
50     if key in d.keys():
51         d[key] = inc_function(d[key])
52         return True
53     d[key] = init_value
54     return False
55
56
57 def shard(d: AnyDict, size: int) -> Iterator[AnyDict]:
58     """
59     Shards (i.e. splits) a dict into N subdicts which, together,
60     contain all keys/values from the original unsharded dict.
61
62     Args:
63         d: the input dict to be sharded (split)
64         size: the ideal shard size (number of elements per shard)
65
66     Returns:
67         A generator that yields subsequent shards.
68
69     .. note::
70
71         If `len(d)` is not an even multiple of `size` then the last
72         shard will not have `size` items in it.  It will have
73         `len(d) % size` items instead.
74
75     >>> d = {
76     ...     'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5, 'f': 6,
77     ...     'g': 7, 'h': 8, 'i': 9, 'j': 10, 'k': 11, 'l': 12,
78     ... }
79     >>> for r in shard(d, 5):
80     ...     r
81     {'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5}
82     {'f': 6, 'g': 7, 'h': 8, 'i': 9, 'j': 10}
83     {'k': 11, 'l': 12}
84     """
85     items = d.items()
86     for x in range(0, len(d), size):
87         yield dict(islice(items, x, x + size))
88
89
90 def coalesce_by_creating_list(_, new_value, old_value):
91     """Helper for use with :meth:`coalesce` that creates a list on
92     collision."""
93     from pyutils.list_utils import flatten
94
95     return flatten([new_value, old_value])
96
97
98 def coalesce_by_creating_set(key, new_value, old_value):
99     """Helper for use with :meth:`coalesce` that creates a set on
100     collision."""
101     return set(coalesce_by_creating_list(key, new_value, old_value))
102
103
104 def coalesce_last_write_wins(_, new_value, discarded_old_value):
105     """Helper for use with :meth:`coalsce` that klobbers the old
106     with the new one on collision."""
107     return new_value
108
109
110 def coalesce_first_write_wins(_, discarded_new_value, old_value):
111     """Helper for use with :meth:`coalsce` that preserves the old
112     value and discards the new one on collision."""
113     return old_value
114
115
116 def raise_on_duplicated_keys(key, new_value, old_value):
117     """Helper for use with :meth:`coalesce` that raises an exception
118     when a collision is detected.
119     """
120     raise Exception(f'Key {key} is duplicated in more than one input dict.')
121
122
123 def coalesce(
124     inputs: Iterator[AnyDict],
125     *,
126     aggregation_function: Callable[[Any, Any, Any], Any] = coalesce_by_creating_list,
127 ) -> AnyDict:
128     """Coalesce (i.e. combine) N input dicts into one output dict
129     ontaining the union of all keys / values in every input dict.
130     When keys collide, apply the aggregation_function which, by
131     default, creates a list of values with the same key in the output
132     dict.
133
134     Args:
135         inputs: an iterable set of dicts to coalesce
136         aggregation_function: a Callable to deal with key collisions; one of
137             the below functions already defined or your own strategy:
138
139             * :meth:`coalesce_by_creating_list` creates a list of values
140               with the same key in the output dict.
141             * :meth:`coalesce_by_creating_set` creates a set of values with
142               the same key in the output dict.
143             * :meth:`coalesce_first_write_wins` only preserves the first
144               value with a duplicated key.  Others are dropped silently.
145             * :meth:`coalesce_last_write_wins` only preserves the last
146               value with a duplicated key.  Others are dropped silently.
147             * :meth:`raise_on_duplicated_keys` raises an Exception on
148               duplicated keys; use when keys should never collide.
149             * Your own strategy; Callables will be passed the key and
150               two values and can return whatever they want which will
151               be stored in the output dict.
152
153     Returns:
154         The coalesced output dict.
155
156     >>> a = {'a': 1, 'b': 2}
157     >>> b = {'b': 1, 'c': 2, 'd': 3}
158     >>> c = {'c': 1, 'd': 2}
159     >>> coalesce([a, b, c])
160     {'a': 1, 'b': [1, 2], 'c': [1, 2], 'd': [2, 3]}
161
162     >>> coalesce([a, b, c], aggregation_function=coalesce_last_write_wins)
163     {'a': 1, 'b': 1, 'c': 1, 'd': 2}
164
165     >>> coalesce([a, b, c], aggregation_function=raise_on_duplicated_keys)
166     Traceback (most recent call last):
167     ...
168     Exception: Key b is duplicated in more than one input dict.
169     """
170     out: AnyDict = {}
171     for d in inputs:
172         for key in d:
173             if key in out:
174                 value = aggregation_function(key, d[key], out[key])
175             else:
176                 value = d[key]
177             out[key] = value
178     return out
179
180
181 def item_with_max_value(d: AnyDict) -> Tuple[Hashable, Any]:
182     """
183     Args:
184         d: a dict with comparable values
185
186     Returns:
187         The key and value of the item with the highest value in a
188         dict as a `Tuple[key, value]`.
189
190     >>> d = {'a': 1, 'b': 2, 'c': 3}
191     >>> item_with_max_value(d)
192     ('c', 3)
193     >>> item_with_max_value({})
194     Traceback (most recent call last):
195     ...
196     ValueError: max() arg is an empty sequence
197
198     """
199     return max(d.items(), key=lambda _: _[1])
200
201
202 def item_with_min_value(d: AnyDict) -> Tuple[Hashable, Any]:
203     """
204     Args:
205         d: a dict with comparable values
206
207     Returns:
208         The key and value of the item with the lowest value in a
209         dict as a `Tuple[key, value]`.
210
211     >>> d = {'a': 1, 'b': 2, 'c': 3}
212     >>> item_with_min_value(d)
213     ('a', 1)
214
215     """
216     return min(d.items(), key=lambda _: _[1])
217
218
219 def key_with_max_value(d: AnyDict) -> Hashable:
220     """
221     Args:
222         d: a dict with comparable keys
223
224     Returns:
225         The maximum key in the dict when comparing the keys with
226         each other.
227
228     .. note:: This code totally ignores values; it is comparing key
229         against key to find the maximum key in the keyspace.
230
231     >>> d = {'a': 1, 'b': 2, 'c': 3}
232     >>> key_with_max_value(d)
233     'c'
234
235     """
236     return item_with_max_value(d)[0]
237
238
239 def key_with_min_value(d: AnyDict) -> Hashable:
240     """
241     Args:
242         d: a dict with comparable keys
243
244     Returns:
245         The minimum key in the dict when comparing the keys with
246         each other.
247
248     .. note:: This code totally ignores values; it is comparing key
249         against key to find the minimum key in the keyspace.
250
251     >>> d = {'a': 1, 'b': 2, 'c': 3}
252     >>> key_with_min_value(d)
253     'a'
254
255     """
256     return item_with_min_value(d)[0]
257
258
259 def max_value(d: AnyDict) -> Any:
260     """
261     Args:
262         d: a dict with compatable values
263
264     Returns:
265         The maximum value in the dict *without its key*.
266
267     >>> d = {'a': 1, 'b': 2, 'c': 3}
268     >>> max_value(d)
269     3
270     """
271     return item_with_max_value(d)[1]
272
273
274 def min_value(d: AnyDict) -> Any:
275     """
276     Args:
277         d: a dict with comparable values
278
279     Returns:
280         The minimum value in the dict *without its key*.
281
282     >>> d = {'a': 1, 'b': 2, 'c': 3}
283     >>> min_value(d)
284     1
285     """
286     return item_with_min_value(d)[1]
287
288
289 def max_key(d: DictWithComparableKeys) -> Comparable:
290     """
291     Args:
292         d: a dict with comparable keys
293
294     Returns:
295         The maximum key in dict (ignoring values totally)
296
297     .. note:: This code totally ignores values; it is comparing key
298         against key to find the maximum key in the keyspace.
299
300     >>> d = {'a': 3, 'b': 2, 'c': 1}
301     >>> max_key(d)
302     'c'
303     """
304     return max(d.keys())
305
306
307 def min_key(d: DictWithComparableKeys) -> Comparable:
308     """
309     Args:
310         d: a dict with comparable keys
311
312     Returns:
313         The minimum key in dict (ignoring values totally)
314
315     .. note:: This code totally ignores values; it is comparing key
316         against key to find the minimum key in the keyspace.
317
318     >>> d = {'a': 3, 'b': 2, 'c': 1}
319     >>> min_key(d)
320     'a'
321     """
322     return min(d.keys())
323
324
325 def parallel_lists_to_dict(keys: List[Hashable], values: List[Any]) -> AnyDict:
326     """Given two parallel lists (keys and values), create and return
327     a dict.
328
329     Args:
330         keys: list containing keys and no duplicated keys
331         values: a parallel list (to keys) containing values
332
333     Returns:
334         A dict composed of zipping the keys list and values list together.
335
336     >>> k = ['name', 'phone', 'address', 'zip']
337     >>> v = ['scott', '555-1212', '123 main st.', '12345']
338     >>> parallel_lists_to_dict(k, v)
339     {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
340     """
341     if len(keys) != len(values):
342         raise Exception("Parallel keys and values lists must have the same length")
343     return dict(zip(keys, values))
344
345
346 def dict_to_key_value_lists(d: AnyDict) -> Tuple[List[Hashable], List[Any]]:
347     """Given a dict, decompose it into a list of keys and values.
348
349     Args:
350         d: a dict
351
352     Returns:
353         A tuple of two elements: the first is the keys list and the second
354         is the values list.
355
356     >>> d = {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
357     >>> (k, v) = dict_to_key_value_lists(d)
358     >>> k
359     ['name', 'phone', 'address', 'zip']
360     >>> v
361     ['scott', '555-1212', '123 main st.', '12345']
362     """
363     r: Tuple[List[Any], List[Any]] = ([], [])
364     for (k, v) in d.items():
365         r[0].append(k)
366         r[1].append(v)
367     return r
368
369
370 dict_to_dataclass = dataclass_utils.dataclass_from_dict
371
372 dict_from_dataclass = dataclass_utils.dataclass_to_dict
373
374
375 if __name__ == '__main__':
376     import doctest
377
378     doctest.testmod()