More type annotations.
[python_utils.git] / dict_utils.py
1 #!/usr/bin/env python3
2
3 from itertools import islice
4 from typing import Any, Callable, Dict, Iterator, List, Tuple
5
6
7 def init_or_inc(
8     d: Dict[Any, Any],
9     key: Any,
10     *,
11     init_value: Any = 1,
12     inc_function: Callable[..., Any] = lambda x: x + 1,
13 ) -> bool:
14     """
15     Initialize a dict value (if it doesn't exist) or increments it (using the
16     inc_function, which is customizable) if it already does exist.  Returns
17     True if the key already existed or False otherwise.
18
19     >>> d = {}
20     >>> init_or_inc(d, "test")
21     False
22     >>> init_or_inc(d, "test")
23     True
24     >>> init_or_inc(d, 'ing')
25     False
26     >>> d
27     {'test': 2, 'ing': 1}
28
29     """
30     if key in d.keys():
31         d[key] = inc_function(d[key])
32         return True
33     d[key] = init_value
34     return False
35
36
37 def shard(d: Dict[Any, Any], size: int) -> Iterator[Dict[Any, Any]]:
38     """
39     Shards a dict into N subdicts which, together, contain all keys/values
40     from the original unsharded dict.
41
42     """
43     items = d.items()
44     for x in range(0, len(d), size):
45         yield {key: value for (key, value) in islice(items, x, x + size)}
46
47
48 def coalesce_by_creating_list(key, new_value, old_value):
49     from list_utils import flatten
50
51     return flatten([new_value, old_value])
52
53
54 def coalesce_by_creating_set(key, new_value, old_value):
55     return set(coalesce_by_creating_list(key, new_value, old_value))
56
57
58 def coalesce_last_write_wins(key, new_value, old_value):
59     return new_value
60
61
62 def coalesce_first_write_wins(key, new_value, old_value):
63     return old_value
64
65
66 def raise_on_duplicated_keys(key, new_value, old_value):
67     raise Exception(f'Key {key} is duplicated in more than one input dict.')
68
69
70 def coalesce(
71     inputs: Iterator[Dict[Any, Any]],
72     *,
73     aggregation_function: Callable[[Any, Any, Any], Any] = coalesce_by_creating_list,
74 ) -> Dict[Any, Any]:
75     """Merge N dicts into one dict containing the union of all keys /
76     values in the input dicts.  When keys collide, apply the
77     aggregation_function which, by default, creates a list of values.
78     See also several other alternative functions for coalescing values
79     (coalesce_by_creating_set, coalesce_first_write_wins,
80     coalesce_last_write_wins, raise_on_duplicated_keys) or provide a
81     custom helper function.
82
83     >>> a = {'a': 1, 'b': 2}
84     >>> b = {'b': 1, 'c': 2, 'd': 3}
85     >>> c = {'c': 1, 'd': 2}
86     >>> coalesce([a, b, c])
87     {'a': 1, 'b': [1, 2], 'c': [1, 2], 'd': [2, 3]}
88
89     >>> coalesce([a, b, c], aggregation_function=coalesce_last_write_wins)
90     {'a': 1, 'b': 1, 'c': 1, 'd': 2}
91
92     >>> coalesce([a, b, c], aggregation_function=raise_on_duplicated_keys)
93     Traceback (most recent call last):
94     ...
95     Exception: Key b is duplicated in more than one input dict.
96
97     """
98     out: Dict[Any, Any] = {}
99     for d in inputs:
100         for key in d:
101             if key in out:
102                 value = aggregation_function(key, d[key], out[key])
103             else:
104                 value = d[key]
105             out[key] = value
106     return out
107
108
109 def item_with_max_value(d: Dict[Any, Any]) -> Tuple[Any, Any]:
110     """Returns the key and value with the max value in a dict.
111
112     >>> d = {'a': 1, 'b': 2, 'c': 3}
113     >>> item_with_max_value(d)
114     ('c', 3)
115     >>> item_with_max_value({})
116     Traceback (most recent call last):
117     ...
118     ValueError: max() arg is an empty sequence
119
120     """
121     return max(d.items(), key=lambda _: _[1])
122
123
124 def item_with_min_value(d: Dict[Any, Any]) -> Tuple[Any, Any]:
125     """Returns the key and value with the min value in a dict.
126
127     >>> d = {'a': 1, 'b': 2, 'c': 3}
128     >>> item_with_min_value(d)
129     ('a', 1)
130
131     """
132     return min(d.items(), key=lambda _: _[1])
133
134
135 def key_with_max_value(d: Dict[Any, Any]) -> Any:
136     """Returns the key with the max value in the dict.
137
138     >>> d = {'a': 1, 'b': 2, 'c': 3}
139     >>> key_with_max_value(d)
140     'c'
141
142     """
143     return item_with_max_value(d)[0]
144
145
146 def key_with_min_value(d: Dict[Any, Any]) -> Any:
147     """Returns the key with the min value in the dict.
148
149     >>> d = {'a': 1, 'b': 2, 'c': 3}
150     >>> key_with_min_value(d)
151     'a'
152
153     """
154     return item_with_min_value(d)[0]
155
156
157 def max_value(d: Dict[Any, Any]) -> Any:
158     """Returns the maximum value in the dict.
159
160     >>> d = {'a': 1, 'b': 2, 'c': 3}
161     >>> max_value(d)
162     3
163
164     """
165     return item_with_max_value(d)[1]
166
167
168 def min_value(d: Dict[Any, Any]) -> Any:
169     """Returns the minimum value in the dict.
170
171     >>> d = {'a': 1, 'b': 2, 'c': 3}
172     >>> min_value(d)
173     1
174
175     """
176     return item_with_min_value(d)[1]
177
178
179 def max_key(d: Dict[Any, Any]) -> Any:
180     """Returns the maximum key in dict (ignoring values totally)
181
182     >>> d = {'a': 3, 'b': 2, 'c': 1}
183     >>> max_key(d)
184     'c'
185
186     """
187     return max(d.keys())
188
189
190 def min_key(d: Dict[Any, Any]) -> Any:
191     """Returns the minimum key in dict (ignoring values totally)
192
193     >>> d = {'a': 3, 'b': 2, 'c': 1}
194     >>> min_key(d)
195     'a'
196
197     """
198     return min(d.keys())
199
200
201 def parallel_lists_to_dict(keys: List[Any], values: List[Any]) -> Dict[Any, Any]:
202     """Given two parallel lists (keys and values), create and return
203     a dict.
204
205     >>> k = ['name', 'phone', 'address', 'zip']
206     >>> v = ['scott', '555-1212', '123 main st.', '12345']
207     >>> parallel_lists_to_dict(k, v)
208     {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
209
210     """
211     if len(keys) != len(values):
212         raise Exception("Parallel keys and values lists must have the same length")
213     return dict(zip(keys, values))
214
215
216 def dict_to_key_value_lists(d: Dict[Any, Any]) -> Tuple[List[Any], List[Any]]:
217     """
218     >>> d = {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
219     >>> (k, v) = dict_to_key_value_lists(d)
220     >>> k
221     ['name', 'phone', 'address', 'zip']
222     >>> v
223     ['scott', '555-1212', '123 main st.', '12345']
224
225     """
226     r: Tuple[List[Any], List[Any]] = ([], [])
227     for (k, v) in d.items():
228         r[0].append(k)
229         r[1].append(v)
230     return r
231
232
233 if __name__ == '__main__':
234     import doctest
235
236     doctest.testmod()