3 # © Copyright 2021-2023, Scott Gasch
5 """A simple graph class that can be optionally directed and weighted and
6 some operations on it."""
10 from typing import Dict, Generator, List, Optional, Set, Tuple
12 from pyutils import list_utils
13 from pyutils.typez.typing import Numeric
17 def __init__(self, directed: bool = False):
18 """Constructs a new Graph object.
21 directed: are we modeling a directed graph? See :meth:`add_edge`.
24 self.directed = directed
25 self.graph: Dict[str, Dict[str, Numeric]] = {}
26 self.dijkstra: Optional[Tuple[str, Dict[str, str], Dict[str, Numeric]]] = None
28 def add_vertex(self, vertex_id: str) -> bool:
29 """Adds a new vertex to the graph.
32 vertex_id: the unique identifier of the new vertex.
35 True unless vertex_id is already in the graph.
44 >>> len(g.get_vertices())
48 if vertex_id not in self.graph:
49 self.graph[vertex_id] = {}
54 def add_edge(self, src: str, dest: str, weight: Numeric = 1) -> None:
55 """Adds a new (optionally weighted) edge between src and dest
56 vertexes. If the graph is not directed (see c'tor) this also
57 adds a reciprocal edge with the same weight back from dest to
62 If either or both of src and dest are not already added to
63 the graph, they are implicitly added by adding this edge.
66 src: the source vertex id
67 dest: the destination vertex id
68 weight: optionally, the weight of the edge(s) added
71 >>> g.add_edge('a', 'b')
72 >>> g.add_edge('b', 'c', weight=2)
73 >>> len(g.get_vertices())
76 {'a': {'b': 1}, 'b': {'a': 1, 'c': 2}, 'c': {'b': 2}}
81 self.graph[src][dest] = weight
83 self.graph[dest][src] = weight
86 def remove_edge(self, source: str, dest: str):
87 """Remove a previously added edge in the graph. If the graph is
88 not directed (see :meth:`__init__`), also removes the reciprocal
89 edge from dest back to source.
93 This method does not remove vertexes (unlinked or otherwise).
96 source: the source vertex of the edge to remove
97 dest: the destination vertex of the edge to remove
100 >>> g.add_edge('A', 'B')
101 >>> g.add_edge('B', 'C')
103 {'A': {'B': 1}, 'B': {'A': 1, 'C': 1}, 'C': {'B': 1}}
104 >>> g.remove_edge('A', 'B')
106 {'B': {'C': 1}, 'C': {'B': 1}}
108 del self.graph[source][dest]
109 if len(self.graph[source]) == 0:
110 del self.graph[source]
111 if not self.directed:
112 del self.graph[dest][source]
113 if len(self.graph[dest]) == 0:
117 def get_vertices(self) -> List[str]:
120 a list of the vertex ids in the graph.
123 >>> g.add_vertex('a')
125 >>> g.add_edge('b', 'c')
129 return list(self.graph.keys())
131 def get_edges(self) -> Dict[str, Dict[str, Numeric]]:
134 A dict whose keys are source vertexes and values
135 are dicts of destination vertexes with values describing the
136 weight of the edge from source to destination.
138 >>> g = Graph(directed=True)
139 >>> g.add_edge('a', 'b')
140 >>> g.add_edge('b', 'c', weight=2)
141 >>> len(g.get_vertices())
144 {'a': {'b': 1}, 'b': {'c': 2}, 'c': {}}
148 def __repr__(self) -> str:
151 A string representation of the graph in GraphViz format.
153 >>> g = Graph(directed=True)
154 >>> g.add_edge('a', 'b', weight=2)
155 >>> g.add_edge('b', 'a', weight=4)
156 >>> g.add_edge('a', 'c', weight=10)
165 >>> h = Graph(directed=False)
166 >>> h.add_edge('A', 'B')
167 >>> h.add_edge('B', 'C')
168 >>> h.add_edge('B', 'D')
169 >>> h.add_edge('D', 'A')
185 out = 'digraph G {\n'
189 out += ' node [shape=record];\n'
190 edges = self.get_edges()
191 for src, dests in edges.items():
192 for dest, weight in dests.items():
193 out += f' {src} {edge} {dest} [weight={weight}]\n'
197 def _dfs(self, vertex: str, visited: Set[str]):
200 for neighbor in self.graph[vertex]:
201 if neighbor not in visited:
202 yield from self._dfs(neighbor, visited)
205 self, starting_vertex: str, target: Optional[str] = None
206 ) -> Generator[str, None, None]:
207 """Performs a depth first traversal of the graph.
210 starting_vertex: The DFS starting point.
211 target: The vertex that, if found, indicates to halt.
214 An ordered sequence of vertex ids visited by the traversal.
221 A -- C -- D -- E -- F;
227 >>> g.add_edge('A', 'B')
228 >>> g.add_edge('A', 'C')
229 >>> g.add_edge('B', 'D')
230 >>> g.add_edge('C', 'D')
231 >>> g.add_edge('D', 'E')
232 >>> g.add_edge('E', 'F')
233 >>> g.add_edge('E', 'G')
234 >>> g.add_edge('F', 'F')
235 >>> for node in g.dfs('A'):
245 >>> for node in g.dfs('F', 'B'):
252 visited: Set[str] = set()
253 for node in self._dfs(starting_vertex, visited):
259 self, starting_vertex: str, target: Optional[str] = None
260 ) -> Generator[str, None, None]:
261 """Performs a breadth first traversal of the graph.
264 starting_vertex: The BFS starting point.
265 target: The vertex that, if found, we should halt the search.
268 An ordered sequence of vertex ids visited by the traversal.
275 A -- C -- D -- E -- F;
281 >>> g.add_edge('A', 'B')
282 >>> g.add_edge('A', 'C')
283 >>> g.add_edge('B', 'D')
284 >>> g.add_edge('C', 'D')
285 >>> g.add_edge('D', 'E')
286 >>> g.add_edge('E', 'F')
287 >>> g.add_edge('E', 'G')
288 >>> g.add_edge('F', 'F')
289 >>> for node in g.bfs('A'):
299 >>> for node in g.bfs('F', 'G'):
309 todo.append(starting_vertex)
310 visited.add(starting_vertex)
318 neighbors = self.graph[vertex]
319 for neighbor in neighbors:
320 if neighbor not in visited:
321 todo.append(neighbor)
322 visited.add(neighbor)
324 def _generate_dijkstra(self, source: str) -> None:
325 """Internal helper that runs Dijkstra's from source"""
326 unvisited_nodes = self.get_vertices()
328 shortest_path: Dict[str, Numeric] = {}
329 for node in unvisited_nodes:
330 shortest_path[node] = math.inf
331 shortest_path[source] = 0
333 previous_nodes: Dict[str, str] = {}
334 while unvisited_nodes:
335 current_min_node = None
336 for node in unvisited_nodes:
337 if current_min_node is None:
338 current_min_node = node
339 elif shortest_path[node] < shortest_path[current_min_node]:
340 current_min_node = node
342 assert current_min_node
343 neighbors = self.graph[current_min_node]
344 for neighbor in neighbors:
346 shortest_path[current_min_node]
347 + self.graph[current_min_node][neighbor]
349 if tentative_value < shortest_path[neighbor]:
350 shortest_path[neighbor] = tentative_value
351 previous_nodes[neighbor] = current_min_node
352 unvisited_nodes.remove(current_min_node)
353 self.dijkstra = (source, previous_nodes, shortest_path)
355 def minimum_path_between(
356 self, source: str, dest: str
357 ) -> Tuple[Optional[Numeric], List[str]]:
358 """Compute the minimum path (lowest cost path) between source
363 This method runs Dijkstra's algorithm
364 (https://en.wikipedia.org/wiki/Dijkstra%27s_algorithm)
365 internally and caches the results. Subsequent calls made with
366 the same source node before modifying the graph are less
367 expensive due to these cached intermediate results.
370 A tuple containing the minimum distance of the path and the path itself.
371 If there is no path between the requested nodes, returns (None, []).
387 >>> g.add_edge('A', 'B', 3)
388 >>> g.add_edge('A', 'C', 2)
389 >>> g.add_edge('B', 'D')
390 >>> g.add_edge('C', 'D')
391 >>> g.add_edge('D', 'E')
392 >>> g.add_edge('E', 'F')
393 >>> g.add_edge('E', 'G')
394 >>> g.add_edge('F', 'F')
395 >>> g.add_vertex('H')
397 >>> g.minimum_path_between('A', 'D')
399 >>> g.minimum_path_between('A', 'H')
403 if self.dijkstra is None or self.dijkstra[0] != source:
404 self._generate_dijkstra(source)
408 node: Optional[str] = dest
409 while node != source:
412 node = self.dijkstra[1].get(node, None)
419 for (a, b) in list_utils.ngrams(path, 2):
420 cost += self.graph[a][b]
424 if __name__ == "__main__":