Adds Graph.
authorScott Gasch <[email protected]>
Sat, 17 Dec 2022 05:44:24 +0000 (21:44 -0800)
committerScott Gasch <[email protected]>
Sat, 17 Dec 2022 05:44:24 +0000 (21:44 -0800)
src/pyutils/collectionz/interval_tree.py
src/pyutils/graph.py [new file with mode: 0644]
src/pyutils/math_utils.py
src/pyutils/types/rate.py
src/pyutils/types/simple.py [new file with mode: 0644]

index a8278a2dc8ea835a501951e3abddb9727d405930..878f6289f709c4838a268f24be4a320fb1e0d2bf 100644 (file)
@@ -1,5 +1,7 @@
 #!/usr/bin/env python3
 
+# © Copyright 2021-2022, Scott Gasch
+
 """This is an augmented interval tree for storing ranges and identifying overlaps as
 described by: https://en.wikipedia.org/wiki/Interval_tree.
 """
@@ -7,13 +9,12 @@ described by: https://en.wikipedia.org/wiki/Interval_tree.
 from __future__ import annotations
 
 from functools import total_ordering
-from typing import Any, Generator, Optional, Union
+from typing import Any, Generator, Optional
 
 from overrides import overrides
 
 from pyutils.collectionz import bst
-
-Numeric = Union[int, float]
+from pyutils.types.simple import Numeric
 
 
 @total_ordering
diff --git a/src/pyutils/graph.py b/src/pyutils/graph.py
new file mode 100644 (file)
index 0000000..428dd16
--- /dev/null
@@ -0,0 +1,240 @@
+#!/usr/bin/env python3
+
+# © Copyright 2021-2022, Scott Gasch
+
+"""A simple graph class that can be optionally directed and weighted and
+some operations on it."""
+
+
+from typing import Dict, Generator, List, Optional, Set
+
+from pyutils.types.simple import Numeric
+
+
+class Graph(object):
+    def __init__(self, directed: bool = False):
+        """Constructs a new Graph object.
+
+        Args:
+            directed: are we modeling a directed graph?  See ::meth
+                add_edge.
+
+        """
+        self.directed = directed
+        self.graph: Dict[str, Dict[str, Numeric]] = {}
+
+    def add_vertex(self, vertex_id: str) -> bool:
+        """Adds a new vertex to the graph.
+
+        Args:
+            vertex_id: the unique identifier of the new vertex.
+
+        Returns:
+            True unless vertex_id is already in the graph.
+
+        >>> g = Graph()
+        >>> g.add_vertex('a')
+        True
+        >>> g.add_vertex('b')
+        True
+        >>> g.add_vertex('a')
+        False
+        >>> len(g.get_vertices())
+        2
+
+        """
+        if vertex_id not in self.graph:
+            self.graph[vertex_id] = {}
+            return True
+        return False
+
+    def add_edge(self, src: str, dest: str, weight: Numeric = 1) -> None:
+        """Adds a new (optionally weighted) edge between src and dest
+        vertexes.  If the graph is not directed (see c'tor) this also
+        adds a reciprocal edge with the same weight back from dest to
+        src too.
+
+        .. note::
+
+        If either or both of src and dest are not already added to the
+        graph, they are implicitly added by adding this edge.
+
+        Args:
+            src: the source vertex id
+            dest: the destination vertex id
+            weight: optionally, the weight of the edge(s) added
+
+        >>> g = Graph()
+        >>> g.add_edge('a', 'b')
+        >>> g.add_edge('b', 'c', weight=2)
+        >>> len(g.get_vertices())
+        3
+        >>> g.get_edges()
+        {'a': {'b': 1}, 'b': {'a': 1, 'c': 2}, 'c': {'b': 2}}
+
+        """
+        self.add_vertex(src)
+        self.add_vertex(dest)
+        self.graph[src][dest] = weight
+        if not self.directed:
+            self.graph[dest][src] = weight
+
+    def get_vertices(self) -> List[str]:
+        """
+        Returns:
+            a list of the vertex ids in the graph.
+
+        >>> g = Graph()
+        >>> g.add_vertex('a')
+        True
+        >>> g.add_edge('b', 'c')
+        >>> g.get_vertices()
+        ['a', 'b', 'c']
+        """
+        return list(self.graph.keys())
+
+    def get_edges(self) -> Dict[str, Dict[str, Numeric]]:
+        """
+        Returns:
+            A dict whose keys are source vertexes and values
+            are dicts of destination vertexes with values describing the
+            weight of the edge from source to destination.
+
+        >>> g = Graph(directed=True)
+        >>> g.add_edge('a', 'b')
+        >>> g.add_edge('b', 'c', weight=2)
+        >>> len(g.get_vertices())
+        3
+        >>> g.get_edges()
+        {'a': {'b': 1}, 'b': {'c': 2}, 'c': {}}
+        """
+        return self.graph
+
+    def _dfs(self, vertex: str, visited: Set[str]):
+        yield vertex
+        visited.add(vertex)
+        for neighbor in self.graph[vertex]:
+            if neighbor not in visited:
+                yield from self._dfs(neighbor, visited)
+
+    def dfs(
+        self, starting_vertex: str, target: Optional[str] = None
+    ) -> Generator[str, None, None]:
+        """Performs a depth first traversal of the graph.
+
+        Args:
+            starting_vertex: The DFS starting point.
+            target: The vertex that, if found, indicates to halt.
+
+        Returns:
+            An ordered sequence of vertex ids visited by the traversal.
+
+                A ------ B
+                |        |
+                |        |
+                C ------ D ------ E ------ F -O
+                                  |
+                                  |
+                                  G
+
+        >>> g = Graph()
+        >>> g.add_edge('A', 'B')
+        >>> g.add_edge('A', 'C')
+        >>> g.add_edge('B', 'D')
+        >>> g.add_edge('C', 'D')
+        >>> g.add_edge('D', 'E')
+        >>> g.add_edge('E', 'F')
+        >>> g.add_edge('E', 'G')
+        >>> g.add_edge('F', 'F')
+        >>> for node in g.dfs('A'):
+        ...     print(node)
+        A
+        B
+        D
+        C
+        E
+        F
+        G
+
+        >>> for node in g.dfs('F', 'B'):
+        ...     print(node)
+        F
+        E
+        D
+        B
+        """
+        visited: Set[str] = set()
+        for node in self._dfs(starting_vertex, visited):
+            yield node
+            if node == target:
+                return
+
+    def bfs(
+        self, starting_vertex: str, target: Optional[str] = None
+    ) -> Generator[str, None, None]:
+        """Performs a breadth first traversal of the graph.
+
+        Args:
+            starting_vertex: The BFS starting point.
+            target: The vertex that, if found, we should halt the search.
+
+        Returns:
+            An ordered sequence of vertex ids visited by the traversal.
+
+                A ------ B
+                |        |
+                |        |
+                C ------ D ------ E ------ F -O
+                                  |
+                                  |
+                                  G
+
+        >>> g = Graph()
+        >>> g.add_edge('A', 'B')
+        >>> g.add_edge('A', 'C')
+        >>> g.add_edge('B', 'D')
+        >>> g.add_edge('C', 'D')
+        >>> g.add_edge('D', 'E')
+        >>> g.add_edge('E', 'F')
+        >>> g.add_edge('E', 'G')
+        >>> g.add_edge('F', 'F')
+        >>> for node in g.bfs('A'):
+        ...     print(node)
+        A
+        B
+        C
+        D
+        E
+        F
+        G
+
+        >>> for node in g.bfs('F', 'G'):
+        ...     print(node)
+        F
+        E
+        D
+        G
+        """
+        todo = []
+        visited = set()
+
+        todo.append(starting_vertex)
+        visited.add(starting_vertex)
+
+        while todo:
+            vertex = todo.pop(0)
+            yield vertex
+            if vertex == target:
+                return
+
+            neighbors = self.graph[vertex]
+            for neighbor in neighbors:
+                if neighbor not in visited:
+                    todo.append(neighbor)
+                    visited.add(neighbor)
+
+
+if __name__ == "__main__":
+    import doctest
+
+    doctest.testmod()
index dd8f33a2e4bf781b658d4ae7266e4a3035412dbb..2270364ed6521e341b713418b71be313a0732a57 100644 (file)
@@ -11,10 +11,11 @@ from heapq import heappop, heappush
 from typing import Dict, List, Optional, Tuple
 
 from pyutils import dict_utils
+from pyutils.types.simple import Numeric
 
 
 class NumericPopulation(object):
-    """This object *store* a numerical population in a way that enables relatively
+    """This object *store* a numeric population in a way that enables relatively
     fast addition of new numbers (:math:`O(2log_2 n)`) and instant access to the
     median value in the population (:math:`O(1)`).  It also provides other population
     summary statistics such as the :meth:`get_mode`, :meth:`get_percentile` and
@@ -50,11 +51,11 @@ class NumericPopulation(object):
     def __init__(self):
         self.lowers, self.highers = [], []
         self.aggregate = 0.0
-        self.sorted_copy: Optional[List[float]] = None
+        self.sorted_copy: Optional[List[Numeric]] = None
         self.maximum = None
         self.minimum = None
 
-    def add_number(self, number: float):
+    def add_number(self, number: Numeric):
         """Adds a number to the population.  Runtime complexity of this
         operation is :math:`O(2 log_2 n)`
 
@@ -92,7 +93,7 @@ class NumericPopulation(object):
         elif len(self.highers) - len(self.lowers) > 1:
             heappush(self.lowers, -heappop(self.highers))
 
-    def get_median(self) -> float:
+    def get_median(self) -> Numeric:
         """
         Returns:
             The median (p50) of the current population in :math:`O(1)` time.
@@ -112,13 +113,13 @@ class NumericPopulation(object):
         count = len(self)
         return self.aggregate / count
 
-    def get_mode(self) -> Tuple[float, int]:
+    def get_mode(self) -> Tuple[Numeric, int]:
         """
         Returns:
             The population mode (most common member in the population)
             in :math:`O(n)` time.
         """
-        count: Dict[float, int] = collections.defaultdict(int)
+        count: Dict[Numeric, int] = collections.defaultdict(int)
         for n in self.lowers:
             count[-n] += 1
         for n in self.highers:
@@ -150,7 +151,7 @@ class NumericPopulation(object):
                 self.sorted_copy.append(x)
             self.sorted_copy = sorted(self.sorted_copy)
 
-    def get_percentile(self, n: float) -> float:
+    def get_percentile(self, n: float) -> Numeric:
         """
         Returns: the number at approximately pn% in the population
         (i.e. the nth percentile) in :math:`O(n log_2 n)` time (it
index a4947e6b2cc15a6fc24a3ce99c461b0a0741c780..4e007e67f699d44bba090d6b4b403fecb181455d 100644 (file)
@@ -39,7 +39,7 @@ class Rate(object):
         count = 0
         if multiplier is not None:
             if isinstance(multiplier, str):
-                multiplier = multiplier.replace('%', '')
+                multiplier = multiplier.replace("%", "")
                 m = float(multiplier)
                 m /= 100
                 self.multiplier: float = m
@@ -54,7 +54,7 @@ class Rate(object):
             count += 1
         if count != 1:
             raise Exception(
-                'Exactly one of percentage, percent_change or multiplier is required.'
+                "Exactly one of percentage, percent_change or multiplier is required."
             )
 
     def apply_to(self, other):
@@ -124,4 +124,4 @@ class Rate(object):
             percentage = (self.multiplier - 1.0) * 100.0
         else:
             percentage = self.multiplier * 100.0
-        return f'{percentage:+.{places}f}%'
+        return f"{percentage:+.{places}f}%"
diff --git a/src/pyutils/types/simple.py b/src/pyutils/types/simple.py
new file mode 100644 (file)
index 0000000..7b80c17
--- /dev/null
@@ -0,0 +1,9 @@
+#!/usr/bin/env python3
+
+# © Copyright 2021-2022, Scott Gasch
+
+"""Simple type helpers."""
+
+from typing import Union
+
+Numeric = Union[int, float]