Most/least common item in a list.
[python_utils.git] / dict_utils.py
index 29a5cd0c5b10fb84a61d078698109b8ebd31ffb6..92fd1e06a21d06a3205a138639f9cf013cb055ec 100644 (file)
@@ -1,7 +1,7 @@
 #!/usr/bin/env python3
 
 from itertools import islice
-from typing import Any, Callable, Dict, Iterator
+from typing import Any, Callable, Dict, Iterator, Tuple
 
 
 def init_or_inc(
@@ -11,6 +11,22 @@ def init_or_inc(
     init_value: Any = 1,
     inc_function: Callable[..., Any] = lambda x: x + 1
 ) -> bool:
+    """
+    Initialize a dict value (if it doesn't exist) or increments it (using the
+    inc_function, which is customizable) if it already does exist.  Returns
+    True if the key already existed or False otherwise.
+
+    >>> d = {}
+    >>> init_or_inc(d, "test")
+    False
+    >>> init_or_inc(d, "test")
+    True
+    >>> init_or_inc(d, 'ing')
+    False
+    >>> d
+    {'test': 2, 'ing': 1}
+
+    """
     if key in d.keys():
         d[key] = inc_function(d[key])
         return True
@@ -19,54 +35,148 @@ def init_or_inc(
 
 
 def shard(d: Dict[Any, Any], size: int) -> Iterator[Dict[Any, Any]]:
+    """
+    Shards a dict into N subdicts which, together, contain all keys/values
+    from the original unsharded dict.
+    """
     items = d.items()
     for x in range(0, len(d), size):
         yield {key: value for (key, value) in islice(items, x, x + size)}
 
 
-def item_with_max_value(d: Dict[Any, Any]) -> Any:
+def coalesce_by_creating_list(key, v1, v2):
+    from list_utils import flatten
+    return flatten([v1, v2])
+
+
+def coalesce_by_creating_set(key, v1, v2):
+    return set(coalesce_by_creating_list(key, v1, v2))
+
+
+def raise_on_duplicated_keys(key, v1, v2):
+    raise Exception(f'Key {key} is duplicated in more than one input dict.')
+
+
+def coalesce(
+        inputs: Iterator[Dict[Any, Any]],
+        *,
+        aggregation_function: Callable[[Any, Any], Any] = coalesce_by_creating_list
+) -> Dict[Any, Any]:
+    """Merge N dicts into one dict containing the union of all keys/values in
+    the input dicts.  When keys collide, apply the aggregation_function which,
+    by default, creates a list of values.  See also coalesce_by_creating_set or
+    provide a user defined aggregation_function.
+
+    >>> a = {'a': 1, 'b': 2}
+    >>> b = {'b': 1, 'c': 2, 'd': 3}
+    >>> c = {'c': 1, 'd': 2}
+    >>> coalesce([a, b, c])
+    {'a': 1, 'b': [1, 2], 'c': [1, 2], 'd': [2, 3]}
+
+    """
+    out: Dict[Any, Any] = {}
+    for d in inputs:
+        for key in d:
+            if key in out:
+                value = aggregation_function(key, d[key], out[key])
+            else:
+                value = d[key]
+            out[key] = value
+    return out
+
+
+def item_with_max_value(d: Dict[Any, Any]) -> Tuple[Any, Any]:
+    """Returns the key and value with the max value in a dict.
+
+    >>> d = {'a': 1, 'b': 2, 'c': 3}
+    >>> item_with_max_value(d)
+    ('c', 3)
+    >>> item_with_max_value({})
+    Traceback (most recent call last):
+    ...
+    ValueError: max() arg is an empty sequence
+
+    """
     return max(d.items(), key=lambda _: _[1])
 
 
-def item_with_min_value(d: Dict[Any, Any]) -> Any:
+def item_with_min_value(d: Dict[Any, Any]) -> Tuple[Any, Any]:
+    """Returns the key and value with the min value in a dict.
+
+    >>> d = {'a': 1, 'b': 2, 'c': 3}
+    >>> item_with_min_value(d)
+    ('a', 1)
+
+    """
     return min(d.items(), key=lambda _: _[1])
 
 
 def key_with_max_value(d: Dict[Any, Any]) -> Any:
+    """Returns the key with the max value in the dict.
+
+    >>> d = {'a': 1, 'b': 2, 'c': 3}
+    >>> key_with_max_value(d)
+    'c'
+
+    """
     return item_with_max_value(d)[0]
 
 
 def key_with_min_value(d: Dict[Any, Any]) -> Any:
+    """Returns the key with the min value in the dict.
+
+    >>> d = {'a': 1, 'b': 2, 'c': 3}
+    >>> key_with_min_value(d)
+    'a'
+
+    """
     return item_with_min_value(d)[0]
 
 
 def max_value(d: Dict[Any, Any]) -> Any:
+    """Returns the maximum value in the dict.
+
+    >>> d = {'a': 1, 'b': 2, 'c': 3}
+    >>> max_value(d)
+    3
+
+    """
     return item_with_max_value(d)[1]
 
 
 def min_value(d: Dict[Any, Any]) -> Any:
+    """Returns the minimum value in the dict.
+
+    >>> d = {'a': 1, 'b': 2, 'c': 3}
+    >>> min_value(d)
+    1
+
+    """
     return item_with_min_value(d)[1]
 
 
 def max_key(d: Dict[Any, Any]) -> Any:
+    """Returns the maximum key in dict (ignoring values totally)
+
+    >>> d = {'a': 3, 'b': 2, 'c': 1}
+    >>> max_key(d)
+    'c'
+
+    """
     return max(d.keys())
 
 
 def min_key(d: Dict[Any, Any]) -> Any:
+    """Returns the minimum key in dict (ignoring values totally)
+
+    >>> d = {'a': 3, 'b': 2, 'c': 1}
+    >>> min_key(d)
+    'a'
+
+    """
     return min(d.keys())
 
 
-def merge(a: Dict[Any, Any], b: Dict[Any, Any], path=None) -> Dict[Any, Any]:
-    if path is None:
-        path = []
-    for key in b:
-        if key in a:
-            if isinstance(a[key], dict) and isinstance(b[key], dict):
-                merge(a[key], b[key], path + [str(key)])
-            elif a[key] == b[key]:
-                pass
-            else:
-                raise Exception("Conflict at %s" % ".".join(path + [str(key)]))
-        else:
-            a[key] = b[key]
-    return a
+if __name__ == '__main__':
+    import doctest
+    doctest.testmod()