Adds remove_edge and shortest path.
authorScott Gasch <[email protected]>
Sat, 17 Dec 2022 20:01:01 +0000 (12:01 -0800)
committerScott Gasch <[email protected]>
Sat, 17 Dec 2022 20:01:01 +0000 (12:01 -0800)
src/pyutils/graph.py

index e5966975139296365d284398fd0a97bdd3da051c..24a921372bf5d28cc0a08d386236b9f72bac45e0 100644 (file)
@@ -6,8 +6,10 @@
 some operations on it."""
 
 
-from typing import Dict, Generator, List, Optional, Set
+import math
+from typing import Dict, Generator, List, Optional, Set, Tuple
 
+from pyutils import list_utils
 from pyutils.types.simple import Numeric
 
 
@@ -21,6 +23,7 @@ class Graph(object):
         """
         self.directed = directed
         self.graph: Dict[str, Dict[str, Numeric]] = {}
+        self.dijkstra: Optional[Tuple[str, Dict[str, str], Dict[str, Numeric]]] = None
 
     def add_vertex(self, vertex_id: str) -> bool:
         """Adds a new vertex to the graph.
@@ -44,6 +47,7 @@ class Graph(object):
         """
         if vertex_id not in self.graph:
             self.graph[vertex_id] = {}
+            self.dijkstra = None
             return True
         return False
 
@@ -77,6 +81,38 @@ class Graph(object):
         self.graph[src][dest] = weight
         if not self.directed:
             self.graph[dest][src] = weight
+        self.dijkstra = None
+
+    def remove_edge(self, source: str, dest: str):
+        """Remove a previously added edge in the graph.  If the graph is
+        not directed (see :meth:`__ini__`), also removes the reciprocal
+        edge from dest back to source.
+
+        .. note::
+
+            This method does not remove vertexes (unlinked or otherwise).
+
+        Args:
+            source: the source vertex of the edge to remove
+            dest: the destination vertex of the edge to remove
+
+        >>> g = Graph()
+        >>> g.add_edge('A', 'B')
+        >>> g.add_edge('B', 'C')
+        >>> g.get_edges()
+        {'A': {'B': 1}, 'B': {'A': 1, 'C': 1}, 'C': {'B': 1}}
+        >>> g.remove_edge('A', 'B')
+        >>> g.get_edges()
+        {'B': {'C': 1}, 'C': {'B': 1}}
+        """
+        del self.graph[source][dest]
+        if len(self.graph[source]) == 0:
+            del self.graph[source]
+        if not self.directed:
+            del self.graph[dest][source]
+            if len(self.graph[dest]) == 0:
+                del self.graph[dest]
+        self.dijkstra = None
 
     def get_vertices(self) -> List[str]:
         """
@@ -236,6 +272,102 @@ class Graph(object):
                     todo.append(neighbor)
                     visited.add(neighbor)
 
+    def _generate_dijkstra(self, source: str) -> None:
+        """Internal helper that runs Dijkstra's from source"""
+        unvisited_nodes = self.get_vertices()
+
+        shortest_path: Dict[str, Numeric] = {}
+        for node in unvisited_nodes:
+            shortest_path[node] = math.inf
+        shortest_path[source] = 0
+
+        previous_nodes: Dict[str, str] = {}
+        while unvisited_nodes:
+            current_min_node = None
+            for node in unvisited_nodes:
+                if current_min_node is None:
+                    current_min_node = node
+                elif shortest_path[node] < shortest_path[current_min_node]:
+                    current_min_node = node
+
+            assert current_min_node
+            neighbors = self.graph[current_min_node]
+            for neighbor in neighbors:
+                tentative_value = (
+                    shortest_path[current_min_node]
+                    + self.graph[current_min_node][neighbor]
+                )
+                if tentative_value < shortest_path[neighbor]:
+                    shortest_path[neighbor] = tentative_value
+                    previous_nodes[neighbor] = current_min_node
+            unvisited_nodes.remove(current_min_node)
+        self.dijkstra = (source, previous_nodes, shortest_path)
+
+    def minimum_path_between(self, source: str, dest: str) -> Tuple[Numeric, List[str]]:
+        """Compute the minimum path (lowest cost path) between source
+        and dest.
+
+        .. note::
+
+        This method runs Dijkstra's algorithm
+        (https://en.wikipedia.org/wiki/Dijkstra%27s_algorithm)
+        internally and caches the results.  Subsequent calls made with
+        the same source node before modifying the graph are less
+        expensive due to these cached intermediate results.
+
+        Returns:
+            A tuple containing the minimum distance of the path and the path itself.
+            If there is no path between the requested nodes, returns (None, []).
+
+        .. graphviz::
+
+            graph g {
+                node [shape=record];
+                A -- B [w=3];
+                B -- D;
+                A -- C [w=2];
+                C -- D -- E -- F;
+                F -- F;
+                E -- G;
+                H;
+            }
+
+        >>> g = Graph()
+        >>> g.add_edge('A', 'B', 3)
+        >>> g.add_edge('A', 'C', 2)
+        >>> g.add_edge('B', 'D')
+        >>> g.add_edge('C', 'D')
+        >>> g.add_edge('D', 'E')
+        >>> g.add_edge('E', 'F')
+        >>> g.add_edge('E', 'G')
+        >>> g.add_edge('F', 'F')
+        >>> g.add_vertex('H')
+        True
+        >>> g.minimum_path_between('A', 'D')
+        (3, ['A', 'C', 'D'])
+        >>> g.minimum_path_between('A', 'H')
+        (None, [])
+
+        """
+        if self.dijkstra is None or self.dijkstra[0] != source:
+            self._generate_dijkstra(source)
+
+        assert self.dijkstra
+        path = []
+        node = dest
+        while node != source:
+            path.append(node)
+            node = self.dijkstra[1].get(node, None)
+            if node is None:
+                return (None, [])
+        path.append(source)
+        path = path[::-1]
+
+        cost: Numeric = 0
+        for (a, b) in list_utils.ngrams(path, 2):
+            cost += self.graph[a][b]
+        return (cost, path)
+
 
 if __name__ == "__main__":
     import doctest