Reformatting.
[python_utils.git] / dict_utils.py
1 #!/usr/bin/env python3
2
3 from itertools import islice
4 from typing import Any, Callable, Dict, Iterator, List, Tuple
5
6 def init_or_inc(
7     d: Dict[Any, Any],
8     key: Any,
9     *,
10     init_value: Any = 1,
11     inc_function: Callable[..., Any] = lambda x: x + 1
12 ) -> bool:
13     """
14     Initialize a dict value (if it doesn't exist) or increments it (using the
15     inc_function, which is customizable) if it already does exist.  Returns
16     True if the key already existed or False otherwise.
17
18     >>> d = {}
19     >>> init_or_inc(d, "test")
20     False
21     >>> init_or_inc(d, "test")
22     True
23     >>> init_or_inc(d, 'ing')
24     False
25     >>> d
26     {'test': 2, 'ing': 1}
27
28     """
29     if key in d.keys():
30         d[key] = inc_function(d[key])
31         return True
32     d[key] = init_value
33     return False
34
35
36 def shard(d: Dict[Any, Any], size: int) -> Iterator[Dict[Any, Any]]:
37     """
38     Shards a dict into N subdicts which, together, contain all keys/values
39     from the original unsharded dict.
40     """
41     items = d.items()
42     for x in range(0, len(d), size):
43         yield {key: value for (key, value) in islice(items, x, x + size)}
44
45
46 def coalesce_by_creating_list(key, new_value, old_value):
47     from list_utils import flatten
48     return flatten([new_value, old_value])
49
50
51 def coalesce_by_creating_set(key, new_value, old_value):
52     return set(coalesce_by_creating_list(key, new_value, old_value))
53
54
55 def coalesce_last_write_wins(key, new_value, old_value):
56     return new_value
57
58
59 def coalesce_first_write_wins(key, new_value, old_value):
60     return old_value
61
62
63 def raise_on_duplicated_keys(key, new_value, old_value):
64     raise Exception(f'Key {key} is duplicated in more than one input dict.')
65
66
67 def coalesce(
68         inputs: Iterator[Dict[Any, Any]],
69         *,
70         aggregation_function: Callable[[Any, Any], Any] = coalesce_by_creating_list
71 ) -> Dict[Any, Any]:
72     """Merge N dicts into one dict containing the union of all keys /
73     values in the input dicts.  When keys collide, apply the
74     aggregation_function which, by default, creates a list of values.
75     See also several other alternative functions for coalescing values
76     (coalesce_by_creating_set, coalesce_first_write_wins,
77     coalesce_last_write_wins, raise_on_duplicated_keys) or provide a
78     custom helper function.
79
80     >>> a = {'a': 1, 'b': 2}
81     >>> b = {'b': 1, 'c': 2, 'd': 3}
82     >>> c = {'c': 1, 'd': 2}
83     >>> coalesce([a, b, c])
84     {'a': 1, 'b': [1, 2], 'c': [1, 2], 'd': [2, 3]}
85
86     >>> coalesce([a, b, c], aggregation_function=coalesce_last_write_wins)
87     {'a': 1, 'b': 1, 'c': 1, 'd': 2}
88
89     >>> coalesce([a, b, c], aggregation_function=raise_on_duplicated_keys)
90     Traceback (most recent call last):
91     ...
92     Exception: Key b is duplicated in more than one input dict.
93
94     """
95     out: Dict[Any, Any] = {}
96     for d in inputs:
97         for key in d:
98             if key in out:
99                 value = aggregation_function(key, d[key], out[key])
100             else:
101                 value = d[key]
102             out[key] = value
103     return out
104
105
106 def item_with_max_value(d: Dict[Any, Any]) -> Tuple[Any, Any]:
107     """Returns the key and value with the max value in a dict.
108
109     >>> d = {'a': 1, 'b': 2, 'c': 3}
110     >>> item_with_max_value(d)
111     ('c', 3)
112     >>> item_with_max_value({})
113     Traceback (most recent call last):
114     ...
115     ValueError: max() arg is an empty sequence
116
117     """
118     return max(d.items(), key=lambda _: _[1])
119
120
121 def item_with_min_value(d: Dict[Any, Any]) -> Tuple[Any, Any]:
122     """Returns the key and value with the min value in a dict.
123
124     >>> d = {'a': 1, 'b': 2, 'c': 3}
125     >>> item_with_min_value(d)
126     ('a', 1)
127
128     """
129     return min(d.items(), key=lambda _: _[1])
130
131
132 def key_with_max_value(d: Dict[Any, Any]) -> Any:
133     """Returns the key with the max value in the dict.
134
135     >>> d = {'a': 1, 'b': 2, 'c': 3}
136     >>> key_with_max_value(d)
137     'c'
138
139     """
140     return item_with_max_value(d)[0]
141
142
143 def key_with_min_value(d: Dict[Any, Any]) -> Any:
144     """Returns the key with the min value in the dict.
145
146     >>> d = {'a': 1, 'b': 2, 'c': 3}
147     >>> key_with_min_value(d)
148     'a'
149
150     """
151     return item_with_min_value(d)[0]
152
153
154 def max_value(d: Dict[Any, Any]) -> Any:
155     """Returns the maximum value in the dict.
156
157     >>> d = {'a': 1, 'b': 2, 'c': 3}
158     >>> max_value(d)
159     3
160
161     """
162     return item_with_max_value(d)[1]
163
164
165 def min_value(d: Dict[Any, Any]) -> Any:
166     """Returns the minimum value in the dict.
167
168     >>> d = {'a': 1, 'b': 2, 'c': 3}
169     >>> min_value(d)
170     1
171
172     """
173     return item_with_min_value(d)[1]
174
175
176 def max_key(d: Dict[Any, Any]) -> Any:
177     """Returns the maximum key in dict (ignoring values totally)
178
179     >>> d = {'a': 3, 'b': 2, 'c': 1}
180     >>> max_key(d)
181     'c'
182
183     """
184     return max(d.keys())
185
186
187 def min_key(d: Dict[Any, Any]) -> Any:
188     """Returns the minimum key in dict (ignoring values totally)
189
190     >>> d = {'a': 3, 'b': 2, 'c': 1}
191     >>> min_key(d)
192     'a'
193
194     """
195     return min(d.keys())
196
197
198 def parallel_lists_to_dict(keys: List[Any], values: List[Any]) -> Dict[Any, Any]:
199     """Given two parallel lists (keys and values), create and return
200     a dict.
201
202     >>> k = ['name', 'phone', 'address', 'zip']
203     >>> v = ['scott', '555-1212', '123 main st.', '12345']
204     >>> parallel_lists_to_dict(k, v)
205     {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
206
207     """
208     if len(keys) != len(values):
209         raise Exception("Parallel keys and values lists must have the same length")
210     return dict(zip(keys, values))
211
212
213 def dict_to_key_value_lists(d: Dict[Any, Any]) -> Tuple[List[Any], List[Any]]:
214     """
215     >>> d = {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
216     >>> (k, v) = dict_to_key_value_lists(d)
217     >>> k
218     ['name', 'phone', 'address', 'zip']
219     >>> v
220     ['scott', '555-1212', '123 main st.', '12345']
221
222     """
223     r = ([], [])
224     for (k, v) in d.items():
225         r[0].append(k)
226         r[1].append(v)
227     return r
228
229
230 if __name__ == '__main__':
231     import doctest
232     doctest.testmod()