#!/usr/bin/env python3

"""Helper functions for working with dicts."""

from itertools import islice
from operator import itemgetter
from typing import Any, Callable, Dict, Iterable, Iterator, List, Tuple


def init_or_inc(
    d: Dict[Any, Any],
    key: Any,
    *,
    init_value: Any = 1,
    inc_function: Callable[..., Any] = lambda x: x + 1,
) -> bool:
    """
    Initialize a dict value (if it doesn't exist) or increment it (using the
    inc_function, which is customizable) if it already does exist.  The dict
    is modified in place.

    Returns True if the key already existed or False otherwise.

    >>> d = {}
    >>> init_or_inc(d, "test")
    False
    >>> init_or_inc(d, "test")
    True
    >>> init_or_inc(d, 'ing')
    False
    >>> d
    {'test': 2, 'ing': 1}

    """
    if key in d:
        d[key] = inc_function(d[key])
        return True
    d[key] = init_value
    return False


def shard(d: Dict[Any, Any], size: int) -> Iterator[Dict[Any, Any]]:
    """
    Shards a dict into subdicts of at most `size` entries each which,
    together, contain all keys/values from the original unsharded dict.
    Only the last shard may hold fewer than `size` entries.

    The items are snapshotted when iteration starts, so mutating the
    original dict while consuming the shards does not affect them.

    Raises ValueError (from range) if size is zero; a negative size
    yields no shards at all.

    >>> list(shard({'a': 1, 'b': 2, 'c': 3}, 2))
    [{'a': 1, 'b': 2}, {'c': 3}]

    """
    # Snapshot once and walk a single iterator: re-slicing the live items
    # view from the start for every shard is quadratic in len(d).
    snapshot = tuple(d.items())
    remaining = iter(snapshot)
    for _ in range(0, len(snapshot), size):
        yield dict(islice(remaining, size))


def coalesce_by_creating_list(key, new_value, old_value):
    """Aggregation helper for coalesce: returns whatever list_utils.flatten
    produces for [new_value, old_value]."""
    # Imported lazily, as in the original, so that importing this module
    # does not itself require list_utils.
    from list_utils import flatten

    return flatten([new_value, old_value])


def coalesce_by_creating_set(key, new_value, old_value):
    """Aggregation helper for coalesce: like coalesce_by_creating_list but
    the result is converted to a set."""
    return set(coalesce_by_creating_list(key, new_value, old_value))


def coalesce_last_write_wins(key, new_value, old_value):
    """Aggregation helper for coalesce: keeps the most recently seen value."""
    return new_value


def coalesce_first_write_wins(key, new_value, old_value):
    """Aggregation helper for coalesce: keeps the first value seen."""
    return old_value


def raise_on_duplicated_keys(key, new_value, old_value):
    """Aggregation helper for coalesce: refuses to merge colliding keys by
    raising an Exception naming the offending key."""
    raise Exception(f'Key {key} is duplicated in more than one input dict.')


def coalesce(
    inputs: Iterable[Dict[Any, Any]],
    *,
    aggregation_function: Callable[
        [Any, Any, Any], Any
    ] = coalesce_by_creating_list,
) -> Dict[Any, Any]:
    """Merge N dicts into one dict containing the union of all keys /
    values in the input dicts.  When keys collide, apply the
    aggregation_function which, by default, creates a list of values.
    See also several other alternative functions for coalescing values
    (coalesce_by_creating_set, coalesce_first_write_wins,
    coalesce_last_write_wins, raise_on_duplicated_keys) or provide a
    custom helper function.

    The aggregation_function is called as f(key, new_value, old_value)
    where old_value is what has been accumulated for the key so far, and
    its return value becomes the new accumulated value.

    >>> a = {'a': 1, 'b': 2}
    >>> b = {'b': 1, 'c': 2, 'd': 3}
    >>> c = {'c': 1, 'd': 2}
    >>> coalesce([a, b, c])
    {'a': 1, 'b': [1, 2], 'c': [1, 2], 'd': [2, 3]}

    >>> coalesce([a, b, c], aggregation_function=coalesce_last_write_wins)
    {'a': 1, 'b': 1, 'c': 1, 'd': 2}

    >>> coalesce([a, b, c], aggregation_function=raise_on_duplicated_keys)
    Traceback (most recent call last):
    ...
    Exception: Key b is duplicated in more than one input dict.

    """
    out: Dict[Any, Any] = {}
    for d in inputs:
        for key, value in d.items():
            if key in out:
                value = aggregation_function(key, value, out[key])
            out[key] = value
    return out


def item_with_max_value(d: Dict[Any, Any]) -> Tuple[Any, Any]:
    """Returns the key and value with the max value in a dict.

    >>> d = {'a': 1, 'b': 2, 'c': 3}
    >>> item_with_max_value(d)
    ('c', 3)
    >>> item_with_max_value({})
    Traceback (most recent call last):
    ...
    ValueError: max() arg is an empty sequence

    """
    return max(d.items(), key=itemgetter(1))


def item_with_min_value(d: Dict[Any, Any]) -> Tuple[Any, Any]:
    """Returns the key and value with the min value in a dict.

    >>> d = {'a': 1, 'b': 2, 'c': 3}
    >>> item_with_min_value(d)
    ('a', 1)

    """
    return min(d.items(), key=itemgetter(1))


def key_with_max_value(d: Dict[Any, Any]) -> Any:
    """Returns the key with the max value in the dict.

    >>> d = {'a': 1, 'b': 2, 'c': 3}
    >>> key_with_max_value(d)
    'c'

    """
    return item_with_max_value(d)[0]


def key_with_min_value(d: Dict[Any, Any]) -> Any:
    """Returns the key with the min value in the dict.

    >>> d = {'a': 1, 'b': 2, 'c': 3}
    >>> key_with_min_value(d)
    'a'

    """
    return item_with_min_value(d)[0]


def max_value(d: Dict[Any, Any]) -> Any:
    """Returns the maximum value in the dict.

    >>> d = {'a': 1, 'b': 2, 'c': 3}
    >>> max_value(d)
    3

    """
    return item_with_max_value(d)[1]


def min_value(d: Dict[Any, Any]) -> Any:
    """Returns the minimum value in the dict.

    >>> d = {'a': 1, 'b': 2, 'c': 3}
    >>> min_value(d)
    1

    """
    return item_with_min_value(d)[1]


def max_key(d: Dict[Any, Any]) -> Any:
    """Returns the maximum key in dict (ignoring values totally)

    >>> d = {'a': 3, 'b': 2, 'c': 1}
    >>> max_key(d)
    'c'

    """
    return max(d)


def min_key(d: Dict[Any, Any]) -> Any:
    """Returns the minimum key in dict (ignoring values totally)

    >>> d = {'a': 3, 'b': 2, 'c': 1}
    >>> min_key(d)
    'a'

    """
    return min(d)


def parallel_lists_to_dict(
    keys: List[Any], values: List[Any]
) -> Dict[Any, Any]:
    """Given two parallel lists (keys and values), create and return
    a dict.  Raises an Exception if the lists differ in length.

    >>> k = ['name', 'phone', 'address', 'zip']
    >>> v = ['scott', '555-1212', '123 main st.', '12345']
    >>> parallel_lists_to_dict(k, v)
    {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}

    """
    if len(keys) != len(values):
        raise Exception(
            "Parallel keys and values lists must have the same length"
        )
    return dict(zip(keys, values))


def dict_to_key_value_lists(d: Dict[Any, Any]) -> Tuple[List[Any], List[Any]]:
    """Given a dict, return a (keys, values) pair of parallel lists in
    the dict's iteration order; the inverse of parallel_lists_to_dict.

    >>> d = {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
    >>> (k, v) = dict_to_key_value_lists(d)
    >>> k
    ['name', 'phone', 'address', 'zip']
    >>> v
    ['scott', '555-1212', '123 main st.', '12345']

    """
    return list(d.keys()), list(d.values())


if __name__ == '__main__':
    import doctest

    doctest.testmod()