Fix typo in README.
[pyutils.git] / src / pyutils / dict_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """This module contains helper functions for dealing with Python dictionaries."""
6
7 from itertools import islice
8 from typing import Any, Callable, Dict, Iterator, List, Tuple
9
10 from pyutils import dataclass_utils
11
12
13 def init_or_inc(
14     d: Dict[Any, Any],
15     key: Any,
16     *,
17     init_value: Any = 1,
18     inc_function: Callable[..., Any] = lambda x: x + 1,
19 ) -> bool:
20     """
21     Initialize a dict value (if it doesn't exist) or increments it (using the
22     inc_function, which is customizable) if it already does exist.
23
24     Args:
25         d: the dict to increment or initialize a value in
26         key: the key to increment or initialize
27         init_value: default initial value
28         inc_function: Callable use to increment a value
29
30     Returns:
31         True if the key already existed or False otherwise
32
33     See also: :py:class:`collections.defaultdict` and
34     :py:class:`collections.Counter`.
35
36     >>> d = {}
37     >>> init_or_inc(d, "test")
38     False
39     >>> init_or_inc(d, "test")
40     True
41     >>> init_or_inc(d, 'ing')
42     False
43     >>> d
44     {'test': 2, 'ing': 1}
45     """
46     if key in d.keys():
47         d[key] = inc_function(d[key])
48         return True
49     d[key] = init_value
50     return False
51
52
53 def shard(d: Dict[Any, Any], size: int) -> Iterator[Dict[Any, Any]]:
54     """
55     Shards (i.e. splits) a dict into N subdicts which, together,
56     contain all keys/values from the original unsharded dict.
57
58     Args:
59         d: the input dict to be sharded (split)
60         size: the ideal shard size (number of elements per shard)
61
62     Returns:
63         A generator that yields subsequent shards.
64
65     .. note::
66
67         If `len(d)` is not an even multiple of `size` then the last
68         shard will not have `size` items in it.  It will have
69         `len(d) % size` items instead.
70
71     >>> d = {
72     ...     'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5, 'f': 6,
73     ...     'g': 7, 'h': 8, 'i': 9, 'j': 10, 'k': 11, 'l': 12,
74     ... }
75     >>> for r in shard(d, 5):
76     ...     r
77     {'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5}
78     {'f': 6, 'g': 7, 'h': 8, 'i': 9, 'j': 10}
79     {'k': 11, 'l': 12}
80     """
81     items = d.items()
82     for x in range(0, len(d), size):
83         yield dict(islice(items, x, x + size))
84
85
86 def coalesce_by_creating_list(_, new_value, old_value):
87     """Helper for use with :meth:`coalesce` that creates a list on
88     collision."""
89     from pyutils.list_utils import flatten
90
91     return flatten([new_value, old_value])
92
93
94 def coalesce_by_creating_set(key, new_value, old_value):
95     """Helper for use with :meth:`coalesce` that creates a set on
96     collision."""
97     return set(coalesce_by_creating_list(key, new_value, old_value))
98
99
100 def coalesce_last_write_wins(_, new_value, discarded_old_value):
101     """Helper for use with :meth:`coalsce` that klobbers the old
102     with the new one on collision."""
103     return new_value
104
105
106 def coalesce_first_write_wins(_, discarded_new_value, old_value):
107     """Helper for use with :meth:`coalsce` that preserves the old
108     value and discards the new one on collision."""
109     return old_value
110
111
112 def raise_on_duplicated_keys(key, new_value, old_value):
113     """Helper for use with :meth:`coalesce` that raises an exception
114     when a collision is detected.
115     """
116     raise Exception(f'Key {key} is duplicated in more than one input dict.')
117
118
119 def coalesce(
120     inputs: Iterator[Dict[Any, Any]],
121     *,
122     aggregation_function: Callable[[Any, Any, Any], Any] = coalesce_by_creating_list,
123 ) -> Dict[Any, Any]:
124     """Coalesce (i.e. combine) N input dicts into one output dict
125     ontaining the union of all keys / values in every input dict.
126     When keys collide, apply the aggregation_function which, by
127     default, creates a list of values with the same key in the output
128     dict.
129
130     Args:
131         inputs: an iterable set of dicts to coalesce
132         aggregation_function: a Callable to deal with key collisions; one of
133             the below functions already defined or your own strategy:
134
135             * :meth:`coalesce_by_creating_list` creates a list of values
136               with the same key in the output dict.
137             * :meth:`coalesce_by_creating_set` creates a set of values with
138               the same key in the output dict.
139             * :meth:`coalesce_first_write_wins` only preserves the first
140               value with a duplicated key.  Others are dropped silently.
141             * :meth:`coalesce_last_write_wins` only preserves the last
142               value with a duplicated key.  Others are dropped silently.
143             * :meth:`raise_on_duplicated_keys` raises an Exception on
144               duplicated keys; use when keys should never collide.
145             * Your own strategy; Callables will be passed the key and
146               two values and can return whatever they want which will
147               be stored in the output dict.
148
149     Returns:
150         The coalesced output dict.
151
152     >>> a = {'a': 1, 'b': 2}
153     >>> b = {'b': 1, 'c': 2, 'd': 3}
154     >>> c = {'c': 1, 'd': 2}
155     >>> coalesce([a, b, c])
156     {'a': 1, 'b': [1, 2], 'c': [1, 2], 'd': [2, 3]}
157
158     >>> coalesce([a, b, c], aggregation_function=coalesce_last_write_wins)
159     {'a': 1, 'b': 1, 'c': 1, 'd': 2}
160
161     >>> coalesce([a, b, c], aggregation_function=raise_on_duplicated_keys)
162     Traceback (most recent call last):
163     ...
164     Exception: Key b is duplicated in more than one input dict.
165     """
166     out: Dict[Any, Any] = {}
167     for d in inputs:
168         for key in d:
169             if key in out:
170                 value = aggregation_function(key, d[key], out[key])
171             else:
172                 value = d[key]
173             out[key] = value
174     return out
175
176
177 def item_with_max_value(d: Dict[Any, Any]) -> Tuple[Any, Any]:
178     """
179     Args:
180         d: a dict with comparable values
181
182     Returns:
183         The key and value of the item with the highest value in a
184         dict as a `Tuple[key, value]`.
185
186     >>> d = {'a': 1, 'b': 2, 'c': 3}
187     >>> item_with_max_value(d)
188     ('c', 3)
189     >>> item_with_max_value({})
190     Traceback (most recent call last):
191     ...
192     ValueError: max() arg is an empty sequence
193
194     """
195     return max(d.items(), key=lambda _: _[1])
196
197
198 def item_with_min_value(d: Dict[Any, Any]) -> Tuple[Any, Any]:
199     """
200     Args:
201         d: a dict with comparable values
202
203     Returns:
204         The key and value of the item with the lowest value in a
205         dict as a `Tuple[key, value]`.
206
207     >>> d = {'a': 1, 'b': 2, 'c': 3}
208     >>> item_with_min_value(d)
209     ('a', 1)
210
211     """
212     return min(d.items(), key=lambda _: _[1])
213
214
215 def key_with_max_value(d: Dict[Any, Any]) -> Any:
216     """
217     Args:
218         d: a dict with comparable keys
219
220     Returns:
221         The maximum key in the dict when comparing the keys with
222         each other.
223
224     .. note:: This code totally ignores values; it is comparing key
225         against key to find the maximum key in the keyspace.
226
227     >>> d = {'a': 1, 'b': 2, 'c': 3}
228     >>> key_with_max_value(d)
229     'c'
230
231     """
232     return item_with_max_value(d)[0]
233
234
235 def key_with_min_value(d: Dict[Any, Any]) -> Any:
236     """
237     Args:
238         d: a dict with comparable keys
239
240     Returns:
241         The minimum key in the dict when comparing the keys with
242         each other.
243
244     .. note:: This code totally ignores values; it is comparing key
245         against key to find the minimum key in the keyspace.
246
247     >>> d = {'a': 1, 'b': 2, 'c': 3}
248     >>> key_with_min_value(d)
249     'a'
250
251     """
252     return item_with_min_value(d)[0]
253
254
255 def max_value(d: Dict[Any, Any]) -> Any:
256     """
257     Args:
258         d: a dict with compatable values
259
260     Returns:
261         The maximum value in the dict *without its key*.
262
263     >>> d = {'a': 1, 'b': 2, 'c': 3}
264     >>> max_value(d)
265     3
266     """
267     return item_with_max_value(d)[1]
268
269
270 def min_value(d: Dict[Any, Any]) -> Any:
271     """
272     Args:
273         d: a dict with comparable values
274
275     Returns:
276         The minimum value in the dict *without its key*.
277
278     >>> d = {'a': 1, 'b': 2, 'c': 3}
279     >>> min_value(d)
280     1
281     """
282     return item_with_min_value(d)[1]
283
284
285 def max_key(d: Dict[Any, Any]) -> Any:
286     """
287     Args:
288         d: a dict with comparable keys
289
290     Returns:
291         The maximum key in dict (ignoring values totally)
292
293     .. note:: This code totally ignores values; it is comparing key
294         against key to find the maximum key in the keyspace.
295
296     >>> d = {'a': 3, 'b': 2, 'c': 1}
297     >>> max_key(d)
298     'c'
299     """
300     return max(d.keys())
301
302
303 def min_key(d: Dict[Any, Any]) -> Any:
304     """
305     Args:
306         d: a dict with comparable keys
307
308     Returns:
309         The minimum key in dict (ignoring values totally)
310
311     .. note:: This code totally ignores values; it is comparing key
312         against key to find the minimum key in the keyspace.
313
314     >>> d = {'a': 3, 'b': 2, 'c': 1}
315     >>> min_key(d)
316     'a'
317     """
318     return min(d.keys())
319
320
321 def parallel_lists_to_dict(keys: List[Any], values: List[Any]) -> Dict[Any, Any]:
322     """Given two parallel lists (keys and values), create and return
323     a dict.
324
325     Args:
326         keys: list containing keys and no duplicated keys
327         values: a parallel list (to keys) containing values
328
329     Returns:
330         A dict composed of zipping the keys list and values list together.
331
332     >>> k = ['name', 'phone', 'address', 'zip']
333     >>> v = ['scott', '555-1212', '123 main st.', '12345']
334     >>> parallel_lists_to_dict(k, v)
335     {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
336     """
337     if len(keys) != len(values):
338         raise Exception("Parallel keys and values lists must have the same length")
339     return dict(zip(keys, values))
340
341
342 def dict_to_key_value_lists(d: Dict[Any, Any]) -> Tuple[List[Any], List[Any]]:
343     """Given a dict, decompose it into a list of keys and values.
344
345     Args:
346         d: a dict
347
348     Returns:
349         A tuple of two elements: the first is the keys list and the second
350         is the values list.
351
352     >>> d = {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
353     >>> (k, v) = dict_to_key_value_lists(d)
354     >>> k
355     ['name', 'phone', 'address', 'zip']
356     >>> v
357     ['scott', '555-1212', '123 main st.', '12345']
358     """
359     r: Tuple[List[Any], List[Any]] = ([], [])
360     for (k, v) in d.items():
361         r[0].append(k)
362         r[1].append(v)
363     return r
364
365
366 dict_to_dataclass = dataclass_utils.dataclass_from_dict
367
368 dict_from_dataclass = dataclass_utils.dataclass_to_dict
369
370
371 if __name__ == '__main__':
372     import doctest
373
374     doctest.testmod()