Ran black code formatter on everything.
[python_utils.git] / dict_utils.py
index 92fd1e06a21d06a3205a138639f9cf013cb055ec..79c86edf286f2c9ea9983906385365199be74892 100644 (file)
@@ -1,7 +1,7 @@
 #!/usr/bin/env python3
 
 from itertools import islice
-from typing import Any, Callable, Dict, Iterator, Tuple
+from typing import Any, Callable, Dict, Iterator, List, Tuple
 
 
 def init_or_inc(
@@ -9,7 +9,7 @@ def init_or_inc(
     key: Any,
     *,
     init_value: Any = 1,
-    inc_function: Callable[..., Any] = lambda x: x + 1
+    inc_function: Callable[..., Any] = lambda x: x + 1,
 ) -> bool:
     """
     Initialize a dict value (if it doesn't exist) or increments it (using the
@@ -38,34 +38,47 @@ def shard(d: Dict[Any, Any], size: int) -> Iterator[Dict[Any, Any]]:
     """
     Shards a dict into N subdicts which, together, contain all keys/values
     from the original unsharded dict.
+
     """
     items = d.items()
     for x in range(0, len(d), size):
         yield {key: value for (key, value) in islice(items, x, x + size)}
 
 
-def coalesce_by_creating_list(key, v1, v2):
+def coalesce_by_creating_list(key, new_value, old_value):
     from list_utils import flatten
-    return flatten([v1, v2])
+
+    return flatten([new_value, old_value])
+
+
+def coalesce_by_creating_set(key, new_value, old_value):
+    return set(coalesce_by_creating_list(key, new_value, old_value))
+
+
+def coalesce_last_write_wins(key, new_value, old_value):
+    return new_value
 
 
-def coalesce_by_creating_set(key, v1, v2):
-    return set(coalesce_by_creating_list(key, v1, v2))
+def coalesce_first_write_wins(key, new_value, old_value):
+    return old_value
 
 
-def raise_on_duplicated_keys(key, v1, v2):
+def raise_on_duplicated_keys(key, new_value, old_value):
     raise Exception(f'Key {key} is duplicated in more than one input dict.')
 
 
 def coalesce(
-        inputs: Iterator[Dict[Any, Any]],
-        *,
-        aggregation_function: Callable[[Any, Any], Any] = coalesce_by_creating_list
+    inputs: Iterator[Dict[Any, Any]],
+    *,
+    aggregation_function: Callable[[Any, Any], Any] = coalesce_by_creating_list,
 ) -> Dict[Any, Any]:
-    """Merge N dicts into one dict containing the union of all keys/values in
-    the input dicts.  When keys collide, apply the aggregation_function which,
-    by default, creates a list of values.  See also coalesce_by_creating_set or
-    provide a user defined aggregation_function.
+    """Merge N dicts into one dict containing the union of all keys /
+    values in the input dicts.  When keys collide, apply the
+    aggregation_function which, by default, creates a list of values.
+    See also several other alternative functions for coalescing values
+    (coalesce_by_creating_set, coalesce_first_write_wins,
+    coalesce_last_write_wins, raise_on_duplicated_keys) or provide a
+    custom helper function.
 
     >>> a = {'a': 1, 'b': 2}
     >>> b = {'b': 1, 'c': 2, 'd': 3}
@@ -73,6 +86,14 @@ def coalesce(
     >>> coalesce([a, b, c])
     {'a': 1, 'b': [1, 2], 'c': [1, 2], 'd': [2, 3]}
 
+    >>> coalesce([a, b, c], aggregation_function=coalesce_last_write_wins)
+    {'a': 1, 'b': 1, 'c': 1, 'd': 2}
+
+    >>> coalesce([a, b, c], aggregation_function=raise_on_duplicated_keys)
+    Traceback (most recent call last):
+    ...
+    Exception: Key b is duplicated in more than one input dict.
+
     """
     out: Dict[Any, Any] = {}
     for d in inputs:
@@ -177,6 +198,43 @@ def min_key(d: Dict[Any, Any]) -> Any:
     return min(d.keys())
 
 
+def parallel_lists_to_dict(
+    keys: List[Any], values: List[Any]
+) -> Dict[Any, Any]:
+    """Given two parallel lists (keys and values), create and return
+    a dict.
+
+    >>> k = ['name', 'phone', 'address', 'zip']
+    >>> v = ['scott', '555-1212', '123 main st.', '12345']
+    >>> parallel_lists_to_dict(k, v)
+    {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
+
+    """
+    if len(keys) != len(values):
+        raise Exception(
+            "Parallel keys and values lists must have the same length"
+        )
+    return dict(zip(keys, values))
+
+
+def dict_to_key_value_lists(d: Dict[Any, Any]) -> Tuple[List[Any], List[Any]]:
+    """
+    >>> d = {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
+    >>> (k, v) = dict_to_key_value_lists(d)
+    >>> k
+    ['name', 'phone', 'address', 'zip']
+    >>> v
+    ['scott', '555-1212', '123 main st.', '12345']
+
+    """
+    r = ([], [])
+    for (k, v) in d.items():
+        r[0].append(k)
+        r[1].append(v)
+    return r
+
+
 if __name__ == '__main__':
     import doctest
+
     doctest.testmod()