Comparable.. and better type hints on dict_utils.
[pyutils.git] / src / pyutils / dict_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2023, Scott Gasch
4
5 """This module contains helper functions for dealing with Python dictionaries."""
6
7 from itertools import islice
8 from typing import Any, Callable, Dict, Hashable, Iterator, List, Tuple
9
10 from pyutils import dataclass_utils
11 from pyutils.typez.typing import Comparable
12
13 AnyDict = Dict[Hashable, Any]
14
15
16 def init_or_inc(
17     d: AnyDict,
18     key: Hashable,
19     *,
20     init_value: Any = 1,
21     inc_function: Callable[..., Any] = lambda x: x + 1,
22 ) -> bool:
23     """
24     Initialize a dict value (if it doesn't exist) or increments it (using the
25     inc_function, which is customizable) if it already does exist.
26
27     Args:
28         d: the dict to increment or initialize a value in
29         key: the key to increment or initialize
30         init_value: default initial value
31         inc_function: Callable use to increment a value
32
33     Returns:
34         True if the key already existed or False otherwise
35
36     See also: :py:class:`collections.defaultdict` and
37     :py:class:`collections.Counter`.
38
39     >>> d = {}
40     >>> init_or_inc(d, "test")
41     False
42     >>> init_or_inc(d, "test")
43     True
44     >>> init_or_inc(d, 'ing')
45     False
46     >>> d
47     {'test': 2, 'ing': 1}
48     """
49     if key in d.keys():
50         d[key] = inc_function(d[key])
51         return True
52     d[key] = init_value
53     return False
54
55
56 def shard(d: AnyDict, size: int) -> Iterator[AnyDict]:
57     """
58     Shards (i.e. splits) a dict into N subdicts which, together,
59     contain all keys/values from the original unsharded dict.
60
61     Args:
62         d: the input dict to be sharded (split)
63         size: the ideal shard size (number of elements per shard)
64
65     Returns:
66         A generator that yields subsequent shards.
67
68     .. note::
69
70         If `len(d)` is not an even multiple of `size` then the last
71         shard will not have `size` items in it.  It will have
72         `len(d) % size` items instead.
73
74     >>> d = {
75     ...     'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5, 'f': 6,
76     ...     'g': 7, 'h': 8, 'i': 9, 'j': 10, 'k': 11, 'l': 12,
77     ... }
78     >>> for r in shard(d, 5):
79     ...     r
80     {'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5}
81     {'f': 6, 'g': 7, 'h': 8, 'i': 9, 'j': 10}
82     {'k': 11, 'l': 12}
83     """
84     items = d.items()
85     for x in range(0, len(d), size):
86         yield dict(islice(items, x, x + size))
87
88
89 def coalesce_by_creating_list(_, new_value, old_value):
90     """Helper for use with :meth:`coalesce` that creates a list on
91     collision."""
92     from pyutils.list_utils import flatten
93
94     return flatten([new_value, old_value])
95
96
97 def coalesce_by_creating_set(key, new_value, old_value):
98     """Helper for use with :meth:`coalesce` that creates a set on
99     collision."""
100     return set(coalesce_by_creating_list(key, new_value, old_value))
101
102
103 def coalesce_last_write_wins(_, new_value, discarded_old_value):
104     """Helper for use with :meth:`coalsce` that klobbers the old
105     with the new one on collision."""
106     return new_value
107
108
109 def coalesce_first_write_wins(_, discarded_new_value, old_value):
110     """Helper for use with :meth:`coalsce` that preserves the old
111     value and discards the new one on collision."""
112     return old_value
113
114
115 def raise_on_duplicated_keys(key, new_value, old_value):
116     """Helper for use with :meth:`coalesce` that raises an exception
117     when a collision is detected.
118     """
119     raise Exception(f'Key {key} is duplicated in more than one input dict.')
120
121
122 def coalesce(
123     inputs: Iterator[AnyDict],
124     *,
125     aggregation_function: Callable[[Any, Any, Any], Any] = coalesce_by_creating_list,
126 ) -> AnyDict:
127     """Coalesce (i.e. combine) N input dicts into one output dict
128     ontaining the union of all keys / values in every input dict.
129     When keys collide, apply the aggregation_function which, by
130     default, creates a list of values with the same key in the output
131     dict.
132
133     Args:
134         inputs: an iterable set of dicts to coalesce
135         aggregation_function: a Callable to deal with key collisions; one of
136             the below functions already defined or your own strategy:
137
138             * :meth:`coalesce_by_creating_list` creates a list of values
139               with the same key in the output dict.
140             * :meth:`coalesce_by_creating_set` creates a set of values with
141               the same key in the output dict.
142             * :meth:`coalesce_first_write_wins` only preserves the first
143               value with a duplicated key.  Others are dropped silently.
144             * :meth:`coalesce_last_write_wins` only preserves the last
145               value with a duplicated key.  Others are dropped silently.
146             * :meth:`raise_on_duplicated_keys` raises an Exception on
147               duplicated keys; use when keys should never collide.
148             * Your own strategy; Callables will be passed the key and
149               two values and can return whatever they want which will
150               be stored in the output dict.
151
152     Returns:
153         The coalesced output dict.
154
155     >>> a = {'a': 1, 'b': 2}
156     >>> b = {'b': 1, 'c': 2, 'd': 3}
157     >>> c = {'c': 1, 'd': 2}
158     >>> coalesce([a, b, c])
159     {'a': 1, 'b': [1, 2], 'c': [1, 2], 'd': [2, 3]}
160
161     >>> coalesce([a, b, c], aggregation_function=coalesce_last_write_wins)
162     {'a': 1, 'b': 1, 'c': 1, 'd': 2}
163
164     >>> coalesce([a, b, c], aggregation_function=raise_on_duplicated_keys)
165     Traceback (most recent call last):
166     ...
167     Exception: Key b is duplicated in more than one input dict.
168     """
169     out: AnyDict = {}
170     for d in inputs:
171         for key in d:
172             if key in out:
173                 value = aggregation_function(key, d[key], out[key])
174             else:
175                 value = d[key]
176             out[key] = value
177     return out
178
179
180 def item_with_max_value(d: AnyDict) -> Tuple[Hashable, Any]:
181     """
182     Args:
183         d: a dict with comparable values
184
185     Returns:
186         The key and value of the item with the highest value in a
187         dict as a `Tuple[key, value]`.
188
189     >>> d = {'a': 1, 'b': 2, 'c': 3}
190     >>> item_with_max_value(d)
191     ('c', 3)
192     >>> item_with_max_value({})
193     Traceback (most recent call last):
194     ...
195     ValueError: max() arg is an empty sequence
196
197     """
198     return max(d.items(), key=lambda _: _[1])
199
200
201 def item_with_min_value(d: AnyDict) -> Tuple[Hashable, Any]:
202     """
203     Args:
204         d: a dict with comparable values
205
206     Returns:
207         The key and value of the item with the lowest value in a
208         dict as a `Tuple[key, value]`.
209
210     >>> d = {'a': 1, 'b': 2, 'c': 3}
211     >>> item_with_min_value(d)
212     ('a', 1)
213
214     """
215     return min(d.items(), key=lambda _: _[1])
216
217
218 def key_with_max_value(d: AnyDict) -> Hashable:
219     """
220     Args:
221         d: a dict with comparable keys
222
223     Returns:
224         The maximum key in the dict when comparing the keys with
225         each other.
226
227     .. note:: This code totally ignores values; it is comparing key
228         against key to find the maximum key in the keyspace.
229
230     >>> d = {'a': 1, 'b': 2, 'c': 3}
231     >>> key_with_max_value(d)
232     'c'
233
234     """
235     return item_with_max_value(d)[0]
236
237
238 def key_with_min_value(d: AnyDict) -> Hashable:
239     """
240     Args:
241         d: a dict with comparable keys
242
243     Returns:
244         The minimum key in the dict when comparing the keys with
245         each other.
246
247     .. note:: This code totally ignores values; it is comparing key
248         against key to find the minimum key in the keyspace.
249
250     >>> d = {'a': 1, 'b': 2, 'c': 3}
251     >>> key_with_min_value(d)
252     'a'
253
254     """
255     return item_with_min_value(d)[0]
256
257
258 def max_value(d: AnyDict) -> Any:
259     """
260     Args:
261         d: a dict with compatable values
262
263     Returns:
264         The maximum value in the dict *without its key*.
265
266     >>> d = {'a': 1, 'b': 2, 'c': 3}
267     >>> max_value(d)
268     3
269     """
270     return item_with_max_value(d)[1]
271
272
273 def min_value(d: AnyDict) -> Any:
274     """
275     Args:
276         d: a dict with comparable values
277
278     Returns:
279         The minimum value in the dict *without its key*.
280
281     >>> d = {'a': 1, 'b': 2, 'c': 3}
282     >>> min_value(d)
283     1
284     """
285     return item_with_min_value(d)[1]
286
287
288 def max_key(d: Dict[Comparable, Any]) -> Comparable:
289     """
290     Args:
291         d: a dict with comparable keys
292
293     Returns:
294         The maximum key in dict (ignoring values totally)
295
296     .. note:: This code totally ignores values; it is comparing key
297         against key to find the maximum key in the keyspace.
298
299     >>> d = {'a': 3, 'b': 2, 'c': 1}
300     >>> max_key(d)
301     'c'
302     """
303     return max(d.keys())
304
305
306 def min_key(d: Dict[Comparable, Any]) -> Comparable:
307     """
308     Args:
309         d: a dict with comparable keys
310
311     Returns:
312         The minimum key in dict (ignoring values totally)
313
314     .. note:: This code totally ignores values; it is comparing key
315         against key to find the minimum key in the keyspace.
316
317     >>> d = {'a': 3, 'b': 2, 'c': 1}
318     >>> min_key(d)
319     'a'
320     """
321     return min(d.keys())
322
323
324 def parallel_lists_to_dict(keys: List[Hashable], values: List[Any]) -> AnyDict:
325     """Given two parallel lists (keys and values), create and return
326     a dict.
327
328     Args:
329         keys: list containing keys and no duplicated keys
330         values: a parallel list (to keys) containing values
331
332     Returns:
333         A dict composed of zipping the keys list and values list together.
334
335     >>> k = ['name', 'phone', 'address', 'zip']
336     >>> v = ['scott', '555-1212', '123 main st.', '12345']
337     >>> parallel_lists_to_dict(k, v)
338     {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
339     """
340     if len(keys) != len(values):
341         raise Exception("Parallel keys and values lists must have the same length")
342     return dict(zip(keys, values))
343
344
345 def dict_to_key_value_lists(d: AnyDict) -> Tuple[List[Hashable], List[Any]]:
346     """Given a dict, decompose it into a list of keys and values.
347
348     Args:
349         d: a dict
350
351     Returns:
352         A tuple of two elements: the first is the keys list and the second
353         is the values list.
354
355     >>> d = {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
356     >>> (k, v) = dict_to_key_value_lists(d)
357     >>> k
358     ['name', 'phone', 'address', 'zip']
359     >>> v
360     ['scott', '555-1212', '123 main st.', '12345']
361     """
362     r: Tuple[List[Any], List[Any]] = ([], [])
363     for (k, v) in d.items():
364         r[0].append(k)
365         r[1].append(v)
366     return r
367
368
369 dict_to_dataclass = dataclass_utils.dataclass_from_dict
370
371 dict_from_dataclass = dataclass_utils.dataclass_to_dict
372
373
374 if __name__ == '__main__':
375     import doctest
376
377     doctest.testmod()