Ran black code formatter on everything.
[python_utils.git] / dict_utils.py
index 6dd79f3e5b7618b93f12eb75dbae41483041b44b..79c86edf286f2c9ea9983906385365199be74892 100644 (file)
@@ -1,7 +1,7 @@
 #!/usr/bin/env python3
 
 from itertools import islice
-from typing import Any, Callable, Dict, Iterator, Tuple
+from typing import Any, Callable, Dict, Iterator, List, Tuple
 
 
 def init_or_inc(
@@ -9,7 +9,7 @@ def init_or_inc(
     key: Any,
     *,
     init_value: Any = 1,
-    inc_function: Callable[..., Any] = lambda x: x + 1
+    inc_function: Callable[..., Any] = lambda x: x + 1,
 ) -> bool:
     """
     Initialize a dict value (if it doesn't exist) or increments it (using the
@@ -25,6 +25,7 @@ def init_or_inc(
     False
     >>> d
     {'test': 2, 'ing': 1}
+
     """
     if key in d.keys():
         d[key] = inc_function(d[key])
@@ -37,40 +38,62 @@ def shard(d: Dict[Any, Any], size: int) -> Iterator[Dict[Any, Any]]:
     """
     Shards a dict into N subdicts which, together, contain all keys/values
     from the original unsharded dict.
+
     """
     items = d.items()
     for x in range(0, len(d), size):
         yield {key: value for (key, value) in islice(items, x, x + size)}
 
 
-def coalesce_by_creating_list(key, v1, v2):
+def coalesce_by_creating_list(key, new_value, old_value):
     from list_utils import flatten
-    return flatten([v1, v2])
+
+    return flatten([new_value, old_value])
+
+
+def coalesce_by_creating_set(key, new_value, old_value):
+    return set(coalesce_by_creating_list(key, new_value, old_value))
 
 
-def coalesce_by_creating_set(key, v1, v2):
-    return set(coalesce_by_creating_list(key, v1, v2))
+def coalesce_last_write_wins(key, new_value, old_value):
+    return new_value
 
 
-def raise_on_duplicated_keys(key, v1, v2):
+def coalesce_first_write_wins(key, new_value, old_value):
+    return old_value
+
+
+def raise_on_duplicated_keys(key, new_value, old_value):
     raise Exception(f'Key {key} is duplicated in more than one input dict.')
 
 
 def coalesce(
-        inputs: Iterator[Dict[Any, Any]],
-        *,
-        aggregation_function: Callable[[Any, Any], Any] = coalesce_by_creating_list
+    inputs: Iterator[Dict[Any, Any]],
+    *,
+    aggregation_function: Callable[[Any, Any], Any] = coalesce_by_creating_list,
 ) -> Dict[Any, Any]:
-    """Merge N dicts into one dict containing the union of all keys/values in
-    the input dicts.  When keys collide, apply the aggregation_function which,
-    by default, creates a list of values.  See also coalesce_by_creating_set or
-    provide a user defined aggregation_function.
+    """Merge N dicts into one dict containing the union of all keys /
+    values in the input dicts.  When keys collide, apply the
+    aggregation_function which, by default, creates a list of values.
+    See also several other alternative functions for coalescing values
+    (coalesce_by_creating_set, coalesce_first_write_wins,
+    coalesce_last_write_wins, raise_on_duplicated_keys) or provide a
+    custom helper function.
 
     >>> a = {'a': 1, 'b': 2}
     >>> b = {'b': 1, 'c': 2, 'd': 3}
     >>> c = {'c': 1, 'd': 2}
     >>> coalesce([a, b, c])
     {'a': 1, 'b': [1, 2], 'c': [1, 2], 'd': [2, 3]}
+
+    >>> coalesce([a, b, c], aggregation_function=coalesce_last_write_wins)
+    {'a': 1, 'b': 1, 'c': 1, 'd': 2}
+
+    >>> coalesce([a, b, c], aggregation_function=raise_on_duplicated_keys)
+    Traceback (most recent call last):
+    ...
+    Exception: Key b is duplicated in more than one input dict.
+
     """
     out: Dict[Any, Any] = {}
     for d in inputs:
@@ -93,6 +116,7 @@ def item_with_max_value(d: Dict[Any, Any]) -> Tuple[Any, Any]:
     Traceback (most recent call last):
     ...
     ValueError: max() arg is an empty sequence
+
     """
     return max(d.items(), key=lambda _: _[1])
 
@@ -103,6 +127,7 @@ def item_with_min_value(d: Dict[Any, Any]) -> Tuple[Any, Any]:
     >>> d = {'a': 1, 'b': 2, 'c': 3}
     >>> item_with_min_value(d)
     ('a', 1)
+
     """
     return min(d.items(), key=lambda _: _[1])
 
@@ -113,6 +138,7 @@ def key_with_max_value(d: Dict[Any, Any]) -> Any:
     >>> d = {'a': 1, 'b': 2, 'c': 3}
     >>> key_with_max_value(d)
     'c'
+
     """
     return item_with_max_value(d)[0]
 
@@ -123,6 +149,7 @@ def key_with_min_value(d: Dict[Any, Any]) -> Any:
     >>> d = {'a': 1, 'b': 2, 'c': 3}
     >>> key_with_min_value(d)
     'a'
+
     """
     return item_with_min_value(d)[0]
 
@@ -133,6 +160,7 @@ def max_value(d: Dict[Any, Any]) -> Any:
     >>> d = {'a': 1, 'b': 2, 'c': 3}
     >>> max_value(d)
     3
+
     """
     return item_with_max_value(d)[1]
 
@@ -143,6 +171,7 @@ def min_value(d: Dict[Any, Any]) -> Any:
     >>> d = {'a': 1, 'b': 2, 'c': 3}
     >>> min_value(d)
     1
+
     """
     return item_with_min_value(d)[1]
 
@@ -153,6 +182,7 @@ def max_key(d: Dict[Any, Any]) -> Any:
     >>> d = {'a': 3, 'b': 2, 'c': 1}
     >>> max_key(d)
     'c'
+
     """
     return max(d.keys())
 
@@ -163,10 +193,48 @@ def min_key(d: Dict[Any, Any]) -> Any:
     >>> d = {'a': 3, 'b': 2, 'c': 1}
     >>> min_key(d)
     'a'
+
     """
     return min(d.keys())
 
 
+def parallel_lists_to_dict(
+    keys: List[Any], values: List[Any]
+) -> Dict[Any, Any]:
+    """Given two parallel lists (keys and values), create and return
+    a dict.
+
+    >>> k = ['name', 'phone', 'address', 'zip']
+    >>> v = ['scott', '555-1212', '123 main st.', '12345']
+    >>> parallel_lists_to_dict(k, v)
+    {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
+
+    """
+    if len(keys) != len(values):
+        raise Exception(
+            "Parallel keys and values lists must have the same length"
+        )
+    return dict(zip(keys, values))
+
+
+def dict_to_key_value_lists(d: Dict[Any, Any]) -> Tuple[List[Any], List[Any]]:
+    """
+    >>> d = {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
+    >>> (k, v) = dict_to_key_value_lists(d)
+    >>> k
+    ['name', 'phone', 'address', 'zip']
+    >>> v
+    ['scott', '555-1212', '123 main st.', '12345']
+
+    """
+    r = ([], [])
+    for (k, v) in d.items():
+        r[0].append(k)
+        r[1].append(v)
+    return r
+
+
 if __name__ == '__main__':
     import doctest
+
     doctest.testmod()