Source code for pyutils.graph

#!/usr/bin/env python3

# © Copyright 2021-2023, Scott Gasch

"""A simple graph class that can be optionally directed and weighted and
some operations on it."""


import math
from typing import Dict, Generator, List, Optional, Set, Tuple

from pyutils import list_utils
from pyutils.typez.type_hints import Numeric


[docs]class Graph(object): def __init__(self, directed: bool = False): """Constructs a new Graph object. Args: directed: are we modeling a directed graph? See :meth:`add_edge`. """ self.directed = directed self.graph: Dict[str, Dict[str, Numeric]] = {} self.dijkstra: Optional[Tuple[str, Dict[str, str], Dict[str, Numeric]]] = None
[docs] def add_vertex(self, vertex_id: str) -> bool: """Adds a new vertex to the graph. Args: vertex_id: the unique identifier of the new vertex. Returns: True unless vertex_id is already in the graph. >>> g = Graph() >>> g.add_vertex('a') True >>> g.add_vertex('b') True >>> g.add_vertex('a') False >>> len(g.get_vertices()) 2 """ if vertex_id not in self.graph: self.graph[vertex_id] = {} self.dijkstra = None return True return False
[docs] def add_edge(self, src: str, dest: str, weight: Numeric = 1) -> None: """Adds a new (optionally weighted) edge between src and dest vertexes. If the graph is not directed (see c'tor) this also adds a reciprocal edge with the same weight back from dest to src too. .. note:: If either or both of src and dest are not already added to the graph, they are implicitly added by adding this edge. Args: src: the source vertex id dest: the destination vertex id weight: optionally, the weight of the edge(s) added >>> g = Graph() >>> g.add_edge('a', 'b') >>> g.add_edge('b', 'c', weight=2) >>> len(g.get_vertices()) 3 >>> g.get_edges() {'a': {'b': 1}, 'b': {'a': 1, 'c': 2}, 'c': {'b': 2}} """ self.add_vertex(src) self.add_vertex(dest) self.graph[src][dest] = weight if not self.directed: self.graph[dest][src] = weight self.dijkstra = None
[docs] def remove_edge(self, source: str, dest: str): """Remove a previously added edge in the graph. If the graph is not directed (see :meth:`__init__`), also removes the reciprocal edge from dest back to source. .. note:: This method does not remove vertexes (unlinked or otherwise). Args: source: the source vertex of the edge to remove dest: the destination vertex of the edge to remove >>> g = Graph() >>> g.add_edge('A', 'B') >>> g.add_edge('B', 'C') >>> g.get_edges() {'A': {'B': 1}, 'B': {'A': 1, 'C': 1}, 'C': {'B': 1}} >>> g.remove_edge('A', 'B') >>> g.get_edges() {'B': {'C': 1}, 'C': {'B': 1}} """ del self.graph[source][dest] if len(self.graph[source]) == 0: del self.graph[source] if not self.directed: del self.graph[dest][source] if len(self.graph[dest]) == 0: del self.graph[dest] self.dijkstra = None
[docs] def get_vertices(self) -> List[str]: """ Returns: a list of the vertex ids in the graph. >>> g = Graph() >>> g.add_vertex('a') True >>> g.add_edge('b', 'c') >>> g.get_vertices() ['a', 'b', 'c'] """ return list(self.graph.keys())
[docs] def get_edges(self) -> Dict[str, Dict[str, Numeric]]: """ Returns: A dict whose keys are source vertexes and values are dicts of destination vertexes with values describing the weight of the edge from source to destination. >>> g = Graph(directed=True) >>> g.add_edge('a', 'b') >>> g.add_edge('b', 'c', weight=2) >>> len(g.get_vertices()) 3 >>> g.get_edges() {'a': {'b': 1}, 'b': {'c': 2}, 'c': {}} """ return self.graph
def __repr__(self) -> str: """ Returns: A string representation of the graph in GraphViz format. >>> g = Graph(directed=True) >>> g.add_edge('a', 'b', weight=2) >>> g.add_edge('b', 'a', weight=4) >>> g.add_edge('a', 'c', weight=10) >>> print(g) digraph G { node [shape=record]; a -> b [weight=2] a -> c [weight=10] b -> a [weight=4] } >>> h = Graph(directed=False) >>> h.add_edge('A', 'B') >>> h.add_edge('B', 'C') >>> h.add_edge('B', 'D') >>> h.add_edge('D', 'A') >>> print(h) graph G { node [shape=record]; A -- B [weight=1] A -- D [weight=1] B -- A [weight=1] B -- C [weight=1] B -- D [weight=1] C -- B [weight=1] D -- B [weight=1] D -- A [weight=1] } """ if self.directed: edge = '->' out = 'digraph G {\n' else: edge = '--' out = 'graph G {\n' out += ' node [shape=record];\n' edges = self.get_edges() for src, dests in edges.items(): for dest, weight in dests.items(): out += f' {src} {edge} {dest} [weight={weight}]\n' out += '}\n' return out.strip() def _dfs(self, vertex: str, visited: Set[str]): yield vertex visited.add(vertex) for neighbor in self.graph[vertex]: if neighbor not in visited: yield from self._dfs(neighbor, visited)
[docs] def dfs( self, starting_vertex: str, target: Optional[str] = None ) -> Generator[str, None, None]: """Performs a depth first traversal of the graph. Args: starting_vertex: The DFS starting point. target: The vertex that, if found, indicates to halt. Returns: An ordered sequence of vertex ids visited by the traversal. .. graphviz:: graph g { node [shape=record]; A -- B -- D; A -- C -- D -- E -- F; F -- F; E -- G; } >>> g = Graph() >>> g.add_edge('A', 'B') >>> g.add_edge('A', 'C') >>> g.add_edge('B', 'D') >>> g.add_edge('C', 'D') >>> g.add_edge('D', 'E') >>> g.add_edge('E', 'F') >>> g.add_edge('E', 'G') >>> g.add_edge('F', 'F') >>> for node in g.dfs('A'): ... print(node) A B D C E F G >>> for node in g.dfs('F', 'B'): ... print(node) F E D B """ visited: Set[str] = set() for node in self._dfs(starting_vertex, visited): yield node if node == target: return
[docs] def bfs( self, starting_vertex: str, target: Optional[str] = None ) -> Generator[str, None, None]: """Performs a breadth first traversal of the graph. Args: starting_vertex: The BFS starting point. target: The vertex that, if found, we should halt the search. Returns: An ordered sequence of vertex ids visited by the traversal. .. graphviz:: graph g { node [shape=record]; A -- B -- D; A -- C -- D -- E -- F; F -- F; E -- G; } >>> g = Graph() >>> g.add_edge('A', 'B') >>> g.add_edge('A', 'C') >>> g.add_edge('B', 'D') >>> g.add_edge('C', 'D') >>> g.add_edge('D', 'E') >>> g.add_edge('E', 'F') >>> g.add_edge('E', 'G') >>> g.add_edge('F', 'F') >>> for node in g.bfs('A'): ... print(node) A B C D E F G >>> for node in g.bfs('F', 'G'): ... print(node) F E D G """ todo = [] visited = set() todo.append(starting_vertex) visited.add(starting_vertex) while todo: vertex = todo.pop(0) yield vertex if vertex == target: return neighbors = self.graph[vertex] for neighbor in neighbors: if neighbor not in visited: todo.append(neighbor) visited.add(neighbor)
def _generate_dijkstra(self, source: str) -> None: """Internal helper that runs Dijkstra's from source""" unvisited_nodes = self.get_vertices() shortest_path: Dict[str, Numeric] = {} for node in unvisited_nodes: shortest_path[node] = math.inf shortest_path[source] = 0 previous_nodes: Dict[str, str] = {} while unvisited_nodes: current_min_node = None for node in unvisited_nodes: if current_min_node is None: current_min_node = node elif shortest_path[node] < shortest_path[current_min_node]: current_min_node = node assert current_min_node neighbors = self.graph[current_min_node] for neighbor in neighbors: tentative_value = ( shortest_path[current_min_node] + self.graph[current_min_node][neighbor] ) if tentative_value < shortest_path[neighbor]: shortest_path[neighbor] = tentative_value previous_nodes[neighbor] = current_min_node unvisited_nodes.remove(current_min_node) self.dijkstra = (source, previous_nodes, shortest_path)
[docs] def minimum_path_between( self, source: str, dest: str ) -> Tuple[Optional[Numeric], List[str]]: """Compute the minimum path (lowest cost path) between source and dest. .. note:: This method runs Dijkstra's algorithm (https://en.wikipedia.org/wiki/Dijkstra%27s_algorithm) internally and caches the results. Subsequent calls made with the same source node before modifying the graph are less expensive due to these cached intermediate results. Returns: A tuple containing the minimum distance of the path and the path itself. If there is no path between the requested nodes, returns (None, []). .. graphviz:: graph g { node [shape=record]; A -- B [weight=3]; B -- D; A -- C [weight=2]; C -- D -- E -- F; F -- F; E -- G; H; } >>> g = Graph() >>> g.add_edge('A', 'B', 3) >>> g.add_edge('A', 'C', 2) >>> g.add_edge('B', 'D') >>> g.add_edge('C', 'D') >>> g.add_edge('D', 'E') >>> g.add_edge('E', 'F') >>> g.add_edge('E', 'G') >>> g.add_edge('F', 'F') >>> g.add_vertex('H') True >>> g.minimum_path_between('A', 'D') (3, ['A', 'C', 'D']) >>> g.minimum_path_between('A', 'H') (None, []) """ if self.dijkstra is None or self.dijkstra[0] != source: self._generate_dijkstra(source) assert self.dijkstra path = [] node: Optional[str] = dest while node != source: assert node path.append(node) node = self.dijkstra[1].get(node, None) if node is None: return (None, []) path.append(source) path = path[::-1] cost: Numeric = 0 for (a, b) in list_utils.ngrams(path, 2): cost += self.graph[a][b] return (cost, path)
if __name__ == "__main__": import doctest doctest.testmod()