More cleanup.
[python_utils.git] / dict_utils.py
1 #!/usr/bin/env python3
2
3 """Helper functions for dealing with dictionaries."""
4
5 from itertools import islice
6 from typing import Any, Callable, Dict, Iterator, List, Tuple
7
8
9 def init_or_inc(
10     d: Dict[Any, Any],
11     key: Any,
12     *,
13     init_value: Any = 1,
14     inc_function: Callable[..., Any] = lambda x: x + 1,
15 ) -> bool:
16     """
17     Initialize a dict value (if it doesn't exist) or increments it (using the
18     inc_function, which is customizable) if it already does exist.  Returns
19     True if the key already existed or False otherwise.
20
21     >>> d = {}
22     >>> init_or_inc(d, "test")
23     False
24     >>> init_or_inc(d, "test")
25     True
26     >>> init_or_inc(d, 'ing')
27     False
28     >>> d
29     {'test': 2, 'ing': 1}
30
31     """
32     if key in d.keys():
33         d[key] = inc_function(d[key])
34         return True
35     d[key] = init_value
36     return False
37
38
39 def shard(d: Dict[Any, Any], size: int) -> Iterator[Dict[Any, Any]]:
40     """
41     Shards a dict into N subdicts which, together, contain all keys/values
42     from the original unsharded dict.
43
44     """
45     items = d.items()
46     for x in range(0, len(d), size):
47         yield dict(islice(items, x, x + size))
48
49
50 def coalesce_by_creating_list(_, new_value, old_value):
51     from list_utils import flatten
52
53     return flatten([new_value, old_value])
54
55
56 def coalesce_by_creating_set(key, new_value, old_value):
57     return set(coalesce_by_creating_list(key, new_value, old_value))
58
59
60 def coalesce_last_write_wins(_, new_value, discarded_old_value):
61     return new_value
62
63
64 def coalesce_first_write_wins(_, discarded_new_value, old_value):
65     return old_value
66
67
68 def raise_on_duplicated_keys(key, new_value, old_value):
69     raise Exception(f'Key {key} is duplicated in more than one input dict.')
70
71
72 def coalesce(
73     inputs: Iterator[Dict[Any, Any]],
74     *,
75     aggregation_function: Callable[[Any, Any, Any], Any] = coalesce_by_creating_list,
76 ) -> Dict[Any, Any]:
77     """Merge N dicts into one dict containing the union of all keys /
78     values in the input dicts.  When keys collide, apply the
79     aggregation_function which, by default, creates a list of values.
80     See also several other alternative functions for coalescing values
81     (coalesce_by_creating_set, coalesce_first_write_wins,
82     coalesce_last_write_wins, raise_on_duplicated_keys) or provide a
83     custom helper function.
84
85     >>> a = {'a': 1, 'b': 2}
86     >>> b = {'b': 1, 'c': 2, 'd': 3}
87     >>> c = {'c': 1, 'd': 2}
88     >>> coalesce([a, b, c])
89     {'a': 1, 'b': [1, 2], 'c': [1, 2], 'd': [2, 3]}
90
91     >>> coalesce([a, b, c], aggregation_function=coalesce_last_write_wins)
92     {'a': 1, 'b': 1, 'c': 1, 'd': 2}
93
94     >>> coalesce([a, b, c], aggregation_function=raise_on_duplicated_keys)
95     Traceback (most recent call last):
96     ...
97     Exception: Key b is duplicated in more than one input dict.
98
99     """
100     out: Dict[Any, Any] = {}
101     for d in inputs:
102         for key in d:
103             if key in out:
104                 value = aggregation_function(key, d[key], out[key])
105             else:
106                 value = d[key]
107             out[key] = value
108     return out
109
110
111 def item_with_max_value(d: Dict[Any, Any]) -> Tuple[Any, Any]:
112     """Returns the key and value with the max value in a dict.
113
114     >>> d = {'a': 1, 'b': 2, 'c': 3}
115     >>> item_with_max_value(d)
116     ('c', 3)
117     >>> item_with_max_value({})
118     Traceback (most recent call last):
119     ...
120     ValueError: max() arg is an empty sequence
121
122     """
123     return max(d.items(), key=lambda _: _[1])
124
125
126 def item_with_min_value(d: Dict[Any, Any]) -> Tuple[Any, Any]:
127     """Returns the key and value with the min value in a dict.
128
129     >>> d = {'a': 1, 'b': 2, 'c': 3}
130     >>> item_with_min_value(d)
131     ('a', 1)
132
133     """
134     return min(d.items(), key=lambda _: _[1])
135
136
137 def key_with_max_value(d: Dict[Any, Any]) -> Any:
138     """Returns the key with the max value in the dict.
139
140     >>> d = {'a': 1, 'b': 2, 'c': 3}
141     >>> key_with_max_value(d)
142     'c'
143
144     """
145     return item_with_max_value(d)[0]
146
147
148 def key_with_min_value(d: Dict[Any, Any]) -> Any:
149     """Returns the key with the min value in the dict.
150
151     >>> d = {'a': 1, 'b': 2, 'c': 3}
152     >>> key_with_min_value(d)
153     'a'
154
155     """
156     return item_with_min_value(d)[0]
157
158
159 def max_value(d: Dict[Any, Any]) -> Any:
160     """Returns the maximum value in the dict.
161
162     >>> d = {'a': 1, 'b': 2, 'c': 3}
163     >>> max_value(d)
164     3
165
166     """
167     return item_with_max_value(d)[1]
168
169
170 def min_value(d: Dict[Any, Any]) -> Any:
171     """Returns the minimum value in the dict.
172
173     >>> d = {'a': 1, 'b': 2, 'c': 3}
174     >>> min_value(d)
175     1
176
177     """
178     return item_with_min_value(d)[1]
179
180
181 def max_key(d: Dict[Any, Any]) -> Any:
182     """Returns the maximum key in dict (ignoring values totally)
183
184     >>> d = {'a': 3, 'b': 2, 'c': 1}
185     >>> max_key(d)
186     'c'
187
188     """
189     return max(d.keys())
190
191
192 def min_key(d: Dict[Any, Any]) -> Any:
193     """Returns the minimum key in dict (ignoring values totally)
194
195     >>> d = {'a': 3, 'b': 2, 'c': 1}
196     >>> min_key(d)
197     'a'
198
199     """
200     return min(d.keys())
201
202
203 def parallel_lists_to_dict(keys: List[Any], values: List[Any]) -> Dict[Any, Any]:
204     """Given two parallel lists (keys and values), create and return
205     a dict.
206
207     >>> k = ['name', 'phone', 'address', 'zip']
208     >>> v = ['scott', '555-1212', '123 main st.', '12345']
209     >>> parallel_lists_to_dict(k, v)
210     {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
211
212     """
213     if len(keys) != len(values):
214         raise Exception("Parallel keys and values lists must have the same length")
215     return dict(zip(keys, values))
216
217
218 def dict_to_key_value_lists(d: Dict[Any, Any]) -> Tuple[List[Any], List[Any]]:
219     """
220     >>> d = {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
221     >>> (k, v) = dict_to_key_value_lists(d)
222     >>> k
223     ['name', 'phone', 'address', 'zip']
224     >>> v
225     ['scott', '555-1212', '123 main st.', '12345']
226
227     """
228     r: Tuple[List[Any], List[Any]] = ([], [])
229     for (k, v) in d.items():
230         r[0].append(k)
231         r[1].append(v)
232     return r
233
234
235 if __name__ == '__main__':
236     import doctest
237
238     doctest.testmod()