Convert comment into doctest.
[python_utils.git] / dict_utils.py
1 #!/usr/bin/env python3
2
3 # © Copyright 2021-2022, Scott Gasch
4
5 """Helper functions for dealing with dictionaries."""
6
7 from itertools import islice
8 from typing import Any, Callable, Dict, Iterator, List, Tuple
9
10
11 def init_or_inc(
12     d: Dict[Any, Any],
13     key: Any,
14     *,
15     init_value: Any = 1,
16     inc_function: Callable[..., Any] = lambda x: x + 1,
17 ) -> bool:
18     """
19     Initialize a dict value (if it doesn't exist) or increments it (using the
20     inc_function, which is customizable) if it already does exist.  Returns
21     True if the key already existed or False otherwise.
22
23     >>> d = {}
24     >>> init_or_inc(d, "test")
25     False
26     >>> init_or_inc(d, "test")
27     True
28     >>> init_or_inc(d, 'ing')
29     False
30     >>> d
31     {'test': 2, 'ing': 1}
32
33     """
34     if key in d.keys():
35         d[key] = inc_function(d[key])
36         return True
37     d[key] = init_value
38     return False
39
40
41 def shard(d: Dict[Any, Any], size: int) -> Iterator[Dict[Any, Any]]:
42     """
43     Shards a dict into N subdicts which, together, contain all keys/values
44     from the original unsharded dict.
45
46     """
47     items = d.items()
48     for x in range(0, len(d), size):
49         yield dict(islice(items, x, x + size))
50
51
52 def coalesce_by_creating_list(_, new_value, old_value):
53     from list_utils import flatten
54
55     return flatten([new_value, old_value])
56
57
58 def coalesce_by_creating_set(key, new_value, old_value):
59     return set(coalesce_by_creating_list(key, new_value, old_value))
60
61
62 def coalesce_last_write_wins(_, new_value, discarded_old_value):
63     return new_value
64
65
66 def coalesce_first_write_wins(_, discarded_new_value, old_value):
67     return old_value
68
69
70 def raise_on_duplicated_keys(key, new_value, old_value):
71     raise Exception(f'Key {key} is duplicated in more than one input dict.')
72
73
74 def coalesce(
75     inputs: Iterator[Dict[Any, Any]],
76     *,
77     aggregation_function: Callable[[Any, Any, Any], Any] = coalesce_by_creating_list,
78 ) -> Dict[Any, Any]:
79     """Merge N dicts into one dict containing the union of all keys /
80     values in the input dicts.  When keys collide, apply the
81     aggregation_function which, by default, creates a list of values.
82     See also several other alternative functions for coalescing values
83     (coalesce_by_creating_set, coalesce_first_write_wins,
84     coalesce_last_write_wins, raise_on_duplicated_keys) or provide a
85     custom helper function.
86
87     >>> a = {'a': 1, 'b': 2}
88     >>> b = {'b': 1, 'c': 2, 'd': 3}
89     >>> c = {'c': 1, 'd': 2}
90     >>> coalesce([a, b, c])
91     {'a': 1, 'b': [1, 2], 'c': [1, 2], 'd': [2, 3]}
92
93     >>> coalesce([a, b, c], aggregation_function=coalesce_last_write_wins)
94     {'a': 1, 'b': 1, 'c': 1, 'd': 2}
95
96     >>> coalesce([a, b, c], aggregation_function=raise_on_duplicated_keys)
97     Traceback (most recent call last):
98     ...
99     Exception: Key b is duplicated in more than one input dict.
100
101     """
102     out: Dict[Any, Any] = {}
103     for d in inputs:
104         for key in d:
105             if key in out:
106                 value = aggregation_function(key, d[key], out[key])
107             else:
108                 value = d[key]
109             out[key] = value
110     return out
111
112
113 def item_with_max_value(d: Dict[Any, Any]) -> Tuple[Any, Any]:
114     """Returns the key and value with the max value in a dict.
115
116     >>> d = {'a': 1, 'b': 2, 'c': 3}
117     >>> item_with_max_value(d)
118     ('c', 3)
119     >>> item_with_max_value({})
120     Traceback (most recent call last):
121     ...
122     ValueError: max() arg is an empty sequence
123
124     """
125     return max(d.items(), key=lambda _: _[1])
126
127
128 def item_with_min_value(d: Dict[Any, Any]) -> Tuple[Any, Any]:
129     """Returns the key and value with the min value in a dict.
130
131     >>> d = {'a': 1, 'b': 2, 'c': 3}
132     >>> item_with_min_value(d)
133     ('a', 1)
134
135     """
136     return min(d.items(), key=lambda _: _[1])
137
138
139 def key_with_max_value(d: Dict[Any, Any]) -> Any:
140     """Returns the key with the max value in the dict.
141
142     >>> d = {'a': 1, 'b': 2, 'c': 3}
143     >>> key_with_max_value(d)
144     'c'
145
146     """
147     return item_with_max_value(d)[0]
148
149
150 def key_with_min_value(d: Dict[Any, Any]) -> Any:
151     """Returns the key with the min value in the dict.
152
153     >>> d = {'a': 1, 'b': 2, 'c': 3}
154     >>> key_with_min_value(d)
155     'a'
156
157     """
158     return item_with_min_value(d)[0]
159
160
161 def max_value(d: Dict[Any, Any]) -> Any:
162     """Returns the maximum value in the dict.
163
164     >>> d = {'a': 1, 'b': 2, 'c': 3}
165     >>> max_value(d)
166     3
167
168     """
169     return item_with_max_value(d)[1]
170
171
172 def min_value(d: Dict[Any, Any]) -> Any:
173     """Returns the minimum value in the dict.
174
175     >>> d = {'a': 1, 'b': 2, 'c': 3}
176     >>> min_value(d)
177     1
178
179     """
180     return item_with_min_value(d)[1]
181
182
183 def max_key(d: Dict[Any, Any]) -> Any:
184     """Returns the maximum key in dict (ignoring values totally)
185
186     >>> d = {'a': 3, 'b': 2, 'c': 1}
187     >>> max_key(d)
188     'c'
189
190     """
191     return max(d.keys())
192
193
194 def min_key(d: Dict[Any, Any]) -> Any:
195     """Returns the minimum key in dict (ignoring values totally)
196
197     >>> d = {'a': 3, 'b': 2, 'c': 1}
198     >>> min_key(d)
199     'a'
200
201     """
202     return min(d.keys())
203
204
205 def parallel_lists_to_dict(keys: List[Any], values: List[Any]) -> Dict[Any, Any]:
206     """Given two parallel lists (keys and values), create and return
207     a dict.
208
209     >>> k = ['name', 'phone', 'address', 'zip']
210     >>> v = ['scott', '555-1212', '123 main st.', '12345']
211     >>> parallel_lists_to_dict(k, v)
212     {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
213
214     """
215     if len(keys) != len(values):
216         raise Exception("Parallel keys and values lists must have the same length")
217     return dict(zip(keys, values))
218
219
220 def dict_to_key_value_lists(d: Dict[Any, Any]) -> Tuple[List[Any], List[Any]]:
221     """
222     >>> d = {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
223     >>> (k, v) = dict_to_key_value_lists(d)
224     >>> k
225     ['name', 'phone', 'address', 'zip']
226     >>> v
227     ['scott', '555-1212', '123 main st.', '12345']
228
229     """
230     r: Tuple[List[Any], List[Any]] = ([], [])
231     for (k, v) in d.items():
232         r[0].append(k)
233         r[1].append(v)
234     return r
235
236
237 if __name__ == '__main__':
238     import doctest
239
240     doctest.testmod()