Easier and more self documenting patterns for loading/saving Persistent
[python_utils.git] / dict_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """Helper functions for dealing with dictionaries."""
6
7 from itertools import islice
8 from typing import Any, Callable, Dict, Iterator, List, Tuple
9
10
11 def init_or_inc(
12     d: Dict[Any, Any],
13     key: Any,
14     *,
15     init_value: Any = 1,
16     inc_function: Callable[..., Any] = lambda x: x + 1,
17 ) -> bool:
18     """
19     Initialize a dict value (if it doesn't exist) or increments it (using the
20     inc_function, which is customizable) if it already does exist.  Returns
21     True if the key already existed or False otherwise.
22
23     >>> d = {}
24     >>> init_or_inc(d, "test")
25     False
26     >>> init_or_inc(d, "test")
27     True
28     >>> init_or_inc(d, 'ing')
29     False
30     >>> d
31     {'test': 2, 'ing': 1}
32
33     """
34     if key in d.keys():
35         d[key] = inc_function(d[key])
36         return True
37     d[key] = init_value
38     return False
39
40
41 def shard(d: Dict[Any, Any], size: int) -> Iterator[Dict[Any, Any]]:
42     """
43     Shards a dict into N subdicts which, together, contain all keys/values
44     from the original unsharded dict.
45     """
46     items = d.items()
47     for x in range(0, len(d), size):
48         yield dict(islice(items, x, x + size))
49
50
51 def coalesce_by_creating_list(_, new_value, old_value):
52     """Helper for use with :meth:`coalesce` that creates a list on
53     collision."""
54     from list_utils import flatten
55
56     return flatten([new_value, old_value])
57
58
59 def coalesce_by_creating_set(key, new_value, old_value):
60     """Helper for use with :meth:`coalesce` that creates a set on
61     collision."""
62     return set(coalesce_by_creating_list(key, new_value, old_value))
63
64
65 def coalesce_last_write_wins(_, new_value, discarded_old_value):
66     """Helper for use with :meth:`coalsce` that klobbers the old
67     with the new one on collision."""
68     return new_value
69
70
71 def coalesce_first_write_wins(_, discarded_new_value, old_value):
72     """Helper for use with :meth:`coalsce` that preserves the old
73     value and discards the new one on collision."""
74     return old_value
75
76
77 def raise_on_duplicated_keys(key, new_value, old_value):
78     """Helper for use with :meth:`coalesce` that raises an exception
79     when a collision is detected.
80     """
81     raise Exception(f'Key {key} is duplicated in more than one input dict.')
82
83
84 def coalesce(
85     inputs: Iterator[Dict[Any, Any]],
86     *,
87     aggregation_function: Callable[[Any, Any, Any], Any] = coalesce_by_creating_list,
88 ) -> Dict[Any, Any]:
89     """Merge N dicts into one dict containing the union of all keys /
90     values in the input dicts.  When keys collide, apply the
91     aggregation_function which, by default, creates a list of values.
92     See also several other alternative functions for coalescing values:
93
94         * :meth:`coalesce_by_creating_set`
95         * :meth:`coalesce_first_write_wins`
96         * :meth:`coalesce_last_write_wins`
97         * :meth:`raise_on_duplicated_keys`
98         * or provive your own collision resolution code.
99
100     >>> a = {'a': 1, 'b': 2}
101     >>> b = {'b': 1, 'c': 2, 'd': 3}
102     >>> c = {'c': 1, 'd': 2}
103     >>> coalesce([a, b, c])
104     {'a': 1, 'b': [1, 2], 'c': [1, 2], 'd': [2, 3]}
105
106     >>> coalesce([a, b, c], aggregation_function=coalesce_last_write_wins)
107     {'a': 1, 'b': 1, 'c': 1, 'd': 2}
108
109     >>> coalesce([a, b, c], aggregation_function=raise_on_duplicated_keys)
110     Traceback (most recent call last):
111     ...
112     Exception: Key b is duplicated in more than one input dict.
113
114     """
115     out: Dict[Any, Any] = {}
116     for d in inputs:
117         for key in d:
118             if key in out:
119                 value = aggregation_function(key, d[key], out[key])
120             else:
121                 value = d[key]
122             out[key] = value
123     return out
124
125
126 def item_with_max_value(d: Dict[Any, Any]) -> Tuple[Any, Any]:
127     """Returns the key and value of the item with the max value in a dict.
128
129     >>> d = {'a': 1, 'b': 2, 'c': 3}
130     >>> item_with_max_value(d)
131     ('c', 3)
132     >>> item_with_max_value({})
133     Traceback (most recent call last):
134     ...
135     ValueError: max() arg is an empty sequence
136
137     """
138     return max(d.items(), key=lambda _: _[1])
139
140
141 def item_with_min_value(d: Dict[Any, Any]) -> Tuple[Any, Any]:
142     """Returns the key and value of the item with the min value in a dict.
143
144     >>> d = {'a': 1, 'b': 2, 'c': 3}
145     >>> item_with_min_value(d)
146     ('a', 1)
147
148     """
149     return min(d.items(), key=lambda _: _[1])
150
151
152 def key_with_max_value(d: Dict[Any, Any]) -> Any:
153     """Returns the key with the max value in the dict.
154
155     >>> d = {'a': 1, 'b': 2, 'c': 3}
156     >>> key_with_max_value(d)
157     'c'
158
159     """
160     return item_with_max_value(d)[0]
161
162
163 def key_with_min_value(d: Dict[Any, Any]) -> Any:
164     """Returns the key with the min value in the dict.
165
166     >>> d = {'a': 1, 'b': 2, 'c': 3}
167     >>> key_with_min_value(d)
168     'a'
169
170     """
171     return item_with_min_value(d)[0]
172
173
174 def max_value(d: Dict[Any, Any]) -> Any:
175     """Returns the maximum value in the dict.
176
177     >>> d = {'a': 1, 'b': 2, 'c': 3}
178     >>> max_value(d)
179     3
180
181     """
182     return item_with_max_value(d)[1]
183
184
185 def min_value(d: Dict[Any, Any]) -> Any:
186     """Returns the minimum value in the dict.
187
188     >>> d = {'a': 1, 'b': 2, 'c': 3}
189     >>> min_value(d)
190     1
191
192     """
193     return item_with_min_value(d)[1]
194
195
196 def max_key(d: Dict[Any, Any]) -> Any:
197     """Returns the maximum key in dict (ignoring values totally)
198
199     >>> d = {'a': 3, 'b': 2, 'c': 1}
200     >>> max_key(d)
201     'c'
202
203     """
204     return max(d.keys())
205
206
207 def min_key(d: Dict[Any, Any]) -> Any:
208     """Returns the minimum key in dict (ignoring values totally)
209
210     >>> d = {'a': 3, 'b': 2, 'c': 1}
211     >>> min_key(d)
212     'a'
213
214     """
215     return min(d.keys())
216
217
218 def parallel_lists_to_dict(keys: List[Any], values: List[Any]) -> Dict[Any, Any]:
219     """Given two parallel lists (keys and values), create and return
220     a dict.
221
222     >>> k = ['name', 'phone', 'address', 'zip']
223     >>> v = ['scott', '555-1212', '123 main st.', '12345']
224     >>> parallel_lists_to_dict(k, v)
225     {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
226
227     """
228     if len(keys) != len(values):
229         raise Exception("Parallel keys and values lists must have the same length")
230     return dict(zip(keys, values))
231
232
233 def dict_to_key_value_lists(d: Dict[Any, Any]) -> Tuple[List[Any], List[Any]]:
234     """
235     >>> d = {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
236     >>> (k, v) = dict_to_key_value_lists(d)
237     >>> k
238     ['name', 'phone', 'address', 'zip']
239     >>> v
240     ['scott', '555-1212', '123 main st.', '12345']
241
242     """
243     r: Tuple[List[Any], List[Any]] = ([], [])
244     for (k, v) in d.items():
245         r[0].append(k)
246         r[1].append(v)
247     return r
248
249
250 if __name__ == '__main__':
251     import doctest
252
253     doctest.testmod()