scgraph.graph
1from heapq import heappop, heappush 2from typing import Any 3from scgraph.graph_utils import GraphUtils, GraphModifiers 4from scgraph.contraction_hierarchies import CHGraph 5from bmsspy import Bmssp 6 7 8class GraphTrees: 9 def get_shortest_path_tree(self, origin_id: int | set[int]) -> dict: 10 """ 11 Function: 12 13 - Calculate the shortest path tree of a graph using Dijkstra's algorithm 14 15 Required Arguments: 16 17 - `origin_id` 18 - Type: int | set[int] 19 - What: The id(s) of the node(s) from which to calculate the shortest path tree 20 21 Returns: 22 23 - A dictionary with the following keys: 24 - `origin_id`: The id of the node from which the shortest path tree was calculated 25 - `predecessors`: A list of node ids referring to the predecessor of each node given the shortest path tree 26 - Note: For disconnected graphs, nodes that are not connected to the origin node will have a predecessor of None 27 - `distance_matrix`: A list of distances from the origin node to each node in the graph 28 - Note: For disconnected graphs, nodes that are not connected to the origin node will have a distance of float("inf") 29 30 """ 31 # Input Validation 32 self.__input_check__(origin_id=origin_id, destination_id=0) 33 origin_ids = {origin_id} if isinstance(origin_id, int) else origin_id 34 35 # Variable Initialization 36 distance_matrix = [float("inf")] * len(self.graph) 37 open_leaves = [] 38 predecessor = [-1] * len(self.graph) 39 40 for oid in origin_ids: 41 distance_matrix[oid] = 0 42 heappush(open_leaves, (0, oid)) 43 44 while open_leaves: 45 current_distance, current_id = heappop(open_leaves) 46 for connected_id, connected_distance in self.graph[ 47 current_id 48 ].items(): 49 possible_distance = current_distance + connected_distance 50 if possible_distance < distance_matrix[connected_id]: 51 distance_matrix[connected_id] = possible_distance 52 predecessor[connected_id] = current_id 53 heappush(open_leaves, (possible_distance, connected_id)) 54 55 return { 56 "origin_id": origin_id, 57 "predecessors": predecessor, 58 "distance_matrix": distance_matrix, 59 } 60 61 def get_tree_path( 62 self, 63 origin_id: int, 64 destination_id: int, 65 tree_data: dict, 66 length_only: bool = False, 67 ) -> dict: 68 """ 69 Function: 70 71 - Get the path from the origin node to the destination node using the provided tree_data 72 - Return a list of node ids in the order they are visited as well as the length of the path 73 - If the destination node is not reachable from the origin node, return None 74 75 Required Arguments: 76 77 - `origin_id` 78 - Type: int 79 - What: The id of the origin node from the graph dictionary to start the shortest path from 80 - Note: Since multiple origins are possible, if the origin_id is not a predecessor node of the destination node, the closest origin will be used. 81 - `destination_id` 82 - Type: int 83 - What: The id of the destination node from the graph dictionary to end the shortest path at 84 - `tree_data` 85 - Type: dict 86 - What: The output from a tree function 87 88 Optional Arguments: 89 90 - `length_only` 91 - Type: bool 92 - What: If True, only returns the length of the path 93 - Default: False 94 95 Returns: 96 97 - A dictionary with the following keys: 98 - `path`: A list of node ids in the order they are visited from the origin node to the destination node 99 - `length`: The length of the path from the origin node to the destination node 100 """ 101 if tree_data["origin_id"] != origin_id: 102 raise Exception( 103 "The origin node must be the same as the spanning node for this function to work." 104 ) 105 destination_distance = tree_data["distance_matrix"][destination_id] 106 if destination_distance == float("inf"): 107 raise Exception( 108 "Something went wrong, the origin and destination nodes are not connected." 109 ) 110 if length_only: 111 return {"length": destination_distance} 112 current_id = destination_id 113 current_path = [destination_id] 114 while current_id != origin_id and current_id != -1: 115 current_id = tree_data["predecessors"][current_id] 116 current_path.append(current_id) 117 current_path.reverse() 118 return { 119 "path": current_path, 120 "length": destination_distance, 121 } 122 123 124class GraphAlgorithms: 125 def dijkstra( 126 self, 127 origin_id: int | set[int], 128 destination_id: int, 129 ) -> dict: 130 """ 131 Function: 132 133 - Identify the shortest path between two nodes in a sparse network graph using a modified Dijkstra algorithm 134 135 - Return a dictionary of various path information including: 136 - `path`: A list of node ids in the order they are visited 137 - `length`: The length of the path from the origin node to the destination node 138 139 Required Arguments: 140 141 - `origin_id` 142 - Type: int | set[int] 143 - What: The id(s) of the origin node(s) from the graph dictionary to start the shortest path from 144 - `destination_id` 145 - Type: int 146 - What: The id of the destination node from the graph dictionary to end the shortest path at 147 148 Optional Arguments: 149 150 - None 151 """ 152 # Input Validation 153 self.__input_check__(origin_id=origin_id, destination_id=destination_id) 154 origin_ids = {origin_id} if isinstance(origin_id, int) else origin_id 155 # Variable Initialization 156 distance_matrix = [float("inf")] * len(self.graph) 157 predecessor = [-1] * len(self.graph) 158 open_leaves = [] 159 160 for oid in origin_ids: 161 distance_matrix[oid] = 0 162 heappush(open_leaves, (0, oid)) 163 164 while open_leaves: 165 current_distance, current_id = heappop(open_leaves) 166 if current_id == destination_id: 167 break 168 # Technically, the next line is not necessary but can help with performance 169 if current_distance == distance_matrix[current_id]: 170 for connected_id, connected_distance in self.graph[ 171 current_id 172 ].items(): 173 possible_distance = current_distance + connected_distance 174 if possible_distance < distance_matrix[connected_id]: 175 distance_matrix[connected_id] = possible_distance 176 predecessor[connected_id] = current_id 177 heappush(open_leaves, (possible_distance, connected_id)) 178 if current_id != destination_id: 179 raise Exception( 180 "Something went wrong, the origin and destination nodes are not connected." 181 ) 182 183 return { 184 "path": self.__reconstruct_path__(destination_id, predecessor), 185 "length": distance_matrix[destination_id], 186 } 187 188 def dijkstra_negative( 189 self, 190 origin_id: int | set[int], 191 destination_id: int, 192 cycle_check_iterations: int | None = None, 193 ) -> dict: 194 """ 195 Function: 196 197 - Identify the shortest path between two nodes in a sparse network graph using a modified Dijkstra algorithm that catches negative cycles 198 - Negative cycles raise an exception if they are detected 199 - Note: This algorithm is guaranteed to find the shortest path or raise an exception if a negative cycle is detected 200 - Note: This algorithm requires computing the entire shortest path tree of the graph and is therefore not able to be terminated early 201 - For non negative weighted graphs, it is recommended to use the `dijkstra` algorithm instead 202 - Note: This should be fairly performant in general, however it does have a higher worst-case time complexity than Bellman-Ford 203 204 - Return a dictionary of various path information including: 205 - `id_path`: A list of node ids in the order they are visited 206 - `path`: A list of node dictionaries (lat + long) in the order they are visited 207 208 Required Arguments: 209 210 - `origin_id` 211 - Type: int | set[int] 212 - What: The id(s) of the origin node(s) from the graph dictionary to start the shortest path from 213 - `destination_id` 214 - Type: int 215 - What: The id of the destination node from the graph dictionary to end the shortest path at 216 217 Optional Arguments: 218 219 - `cycle_check_iterations` 220 - Type: int | None 221 - What: The number of iterations to loop over before checking for negative cycles 222 - Default: None (Then set to the number of nodes in the graph) 223 """ 224 # Input Validation 225 self.__input_check__(origin_id=origin_id, destination_id=destination_id) 226 origin_ids = {origin_id} if isinstance(origin_id, int) else origin_id 227 228 # Variable Initialization 229 distance_matrix = [float("inf")] * len(self.graph) 230 predecessor = [-1] * len(self.graph) 231 open_leaves = [] 232 233 for oid in origin_ids: 234 distance_matrix[oid] = 0 235 heappush(open_leaves, (0, oid)) 236 237 # Cycle iteration Variables 238 cycle_iteration = 0 239 if cycle_check_iterations is None: 240 cycle_check_iterations = len(self.graph) 241 242 while open_leaves: 243 current_distance, current_id = heappop(open_leaves) 244 if current_distance == distance_matrix[current_id]: 245 # Increment the cycle iteration counter and check for negative cycles if the iteration limit is reached 246 cycle_iteration += 1 247 if cycle_iteration >= cycle_check_iterations: 248 cycle_iteration = 0 # Reset the cycle iteration counter 249 self.__cycle_check__( 250 predecessor_matrix=predecessor, node_id=current_id 251 ) 252 for connected_id, connected_distance in self.graph[ 253 current_id 254 ].items(): 255 possible_distance = current_distance + connected_distance 256 if possible_distance < distance_matrix[connected_id]: 257 distance_matrix[connected_id] = possible_distance 258 predecessor[connected_id] = current_id 259 heappush(open_leaves, (possible_distance, connected_id)) 260 261 if distance_matrix[destination_id] == float("inf"): 262 raise Exception( 263 "Something went wrong, the origin and destination nodes are not connected." 264 ) 265 266 return { 267 "path": self.__reconstruct_path__(destination_id, predecessor), 268 "length": distance_matrix[destination_id], 269 } 270 271 def a_star( 272 self, 273 origin_id: int | set[int], 274 destination_id: int, 275 heuristic_fn: callable = None, 276 ) -> dict: 277 """ 278 Function: 279 280 - Identify the shortest path between two nodes in a sparse network graph using an A* extension of Makowski's modified Dijkstra algorithm 281 - Return a dictionary of various path information including: 282 - `id_path`: A list of node ids in the order they are visited 283 - `path`: A list of node dictionaries (lat + long) in the order they are visited 284 285 Required Arguments: 286 287 - `origin_id` 288 - Type: int | set[int] 289 - What: The id of the origin node from the graph dictionary to start the shortest path from 290 - `destination_id` 291 - Type: int 292 - What: The id of the destination node from the graph dictionary to end the shortest path at 293 - `heuristic_fn` 294 - Type: function 295 - What: A heuristic function that takes two node ids and returns an estimated distance between them 296 - Note: If None, returns the shortest path using Dijkstra's algorithm 297 - Default: None 298 299 Optional Arguments: 300 301 - None 302 """ 303 if heuristic_fn is None: 304 return self.dijkstra( 305 origin_id=origin_id, 306 destination_id=destination_id, 307 ) 308 # Input Validation 309 self.__input_check__(origin_id=origin_id, destination_id=destination_id) 310 origin_ids = {origin_id} if isinstance(origin_id, int) else origin_id 311 312 # Variable Initialization 313 distance_matrix = [float("inf")] * len(self.graph) 314 # Using a visited matrix does add a tad bit of overhead but avoids revisiting nodes 315 # and does not require anything extra to be stored in the heap 316 visited = [0] * len(self.graph) 317 open_leaves = [] 318 predecessor = [-1] * len(self.graph) 319 320 for oid in origin_ids: 321 distance_matrix[oid] = 0 322 heappush(open_leaves, (0, oid)) 323 324 while open_leaves: 325 current_id = heappop(open_leaves)[1] 326 if current_id == destination_id: 327 break 328 if visited[current_id] == 1: 329 continue 330 visited[current_id] = 1 331 current_distance = distance_matrix[current_id] 332 for connected_id, connected_distance in self.graph[ 333 current_id 334 ].items(): 335 possible_distance = current_distance + connected_distance 336 if possible_distance < distance_matrix[connected_id]: 337 distance_matrix[connected_id] = possible_distance 338 predecessor[connected_id] = current_id 339 heappush( 340 open_leaves, 341 ( 342 possible_distance 343 + heuristic_fn(connected_id, destination_id), 344 connected_id, 345 ), 346 ) 347 if current_id != destination_id: 348 raise Exception( 349 "Something went wrong, the origin and destination nodes are not connected." 350 ) 351 352 return { 353 "path": self.__reconstruct_path__(destination_id, predecessor), 354 "length": distance_matrix[destination_id], 355 } 356 357 def bellman_ford( 358 self, 359 origin_id: int | set[int], 360 destination_id: int, 361 ) -> dict: 362 """ 363 Function: 364 365 - Identify the shortest path between two nodes in a sparse network graph using the Bellman-Ford algorithm 366 - Return a dictionary of various path information including: 367 - `id_path`: A list of node ids in the order they are visited 368 - `path`: A list of node dictionaries (lat + long) in the order they are visited 369 370 Required Arguments: 371 372 - `origin_id` 373 - Type: int | set[int] 374 - What: The id(s) of the origin node(s) from the graph dictionary to start the shortest path from 375 - `destination_id` 376 - Type: int 377 - What: The id of the destination node from the graph dictionary to end the shortest path at 378 379 Optional Arguments: 380 381 - None 382 """ 383 # Input Validation 384 self.__input_check__(origin_id=origin_id, destination_id=destination_id) 385 origin_ids = {origin_id} if isinstance(origin_id, int) else origin_id 386 # Variable Initialization 387 distance_matrix = [float("inf")] * len(self.graph) 388 predecessor = [-1] * len(self.graph) 389 390 for oid in origin_ids: 391 distance_matrix[oid] = 0 392 393 len_graph = len(self.graph) 394 for i in range(len_graph): 395 for current_id in range(len(self.graph)): 396 current_distance = distance_matrix[current_id] 397 for connected_id, connected_distance in self.graph[ 398 current_id 399 ].items(): 400 possible_distance = current_distance + connected_distance 401 if possible_distance < distance_matrix[connected_id]: 402 distance_matrix[connected_id] = possible_distance 403 predecessor[connected_id] = current_id 404 if i == len_graph - 1: 405 raise Exception( 406 "Graph contains a negative weight cycle" 407 ) 408 # Check if destination is reachable 409 if distance_matrix[destination_id] == float("inf"): 410 raise Exception( 411 "Something went wrong, the origin and destination nodes are not connected." 412 ) 413 414 return { 415 "path": self.__reconstruct_path__(destination_id, predecessor), 416 "length": distance_matrix[destination_id], 417 } 418 419 def bmssp( 420 self, 421 origin_id: int | set[int], 422 destination_id: int, 423 use_constant_degree_graph: bool = True, 424 ): 425 """ 426 Function: 427 428 - Identify the shortest path between two nodes in a sparse network graph using the BMSSPy package 429 430 - This is now a wrapper for the BMSSPy package 431 - https://github.com/connor-makowski/bmsspy 432 - For backwards compatibility, this function wraps around the dijkstra function if BMSSPy is not installed 433 434 - Return a dictionary of various path information including: 435 - `id_path`: A list of node ids in the order they are visited 436 - `path`: A list of node dictionaries (lat + long) in the order they are visited 437 438 Required Arguments: 439 440 - `origin_id` 441 - Type: int | set[int] 442 - What: The id(s) of the origin node(s) from the graph dictionary to start the shortest path from 443 - `destination_id` 444 - Type: int 445 - What: The id of the destination node from the graph dictionary to end the shortest path at 446 - `use_constant_degree_graph` 447 - Type: bool 448 - What: Whether to convert the graph to a constant degree 2 graph prior to running the BMSSPy algorithm 449 - Default: True 450 451 452 Optional Arguments: 453 454 - None 455 """ 456 bmssp_graph = Bmssp( 457 graph=self.graph, 458 use_constant_degree_graph=use_constant_degree_graph, 459 ) 460 output = bmssp_graph.solve( 461 origin_id=origin_id, destination_id=destination_id 462 ) 463 return { 464 "path": output["path"], 465 "length": output["length"], 466 } 467 468 def cached_shortest_path( 469 self, 470 origin_id: int, 471 destination_id: int, 472 length_only: bool = False, 473 ): 474 """ 475 Function: 476 477 - Get the shortest path between two nodes in the graph attempting to use a cached shortest path tree if available 478 - If a cached shortest path tree is not available, it will compute the shortest path tree and cache it for future use if specified by `cache` 479 - Uses the get_shortest_path_tree (Dijkstra) and get_tree_path functions internally 480 - Stores cached shortest path trees in a list (self.__cache__) where the index corresponds to the origin node id 481 - Note: If you modify this graph after caching shortest path trees, the cached trees may become invalid 482 - You can reset the cache by calling self.reset_cache() 483 - For efficiency, the cache is not automatically reset when the graph is modified 484 - This logic must be handled by the user 485 486 Requires: 487 488 - origin_id: The id of the origin node 489 - destination_id: The id of the destination node 490 491 Optional: 492 493 - length_only: If True, only returns the length of the path 494 """ 495 shortest_path_tree = self.__cache__[origin_id] 496 if shortest_path_tree == 0: 497 shortest_path_tree = self.get_shortest_path_tree( 498 origin_id=origin_id 499 ) 500 self.__cache__[origin_id] = shortest_path_tree 501 return self.get_tree_path( 502 origin_id=origin_id, 503 destination_id=destination_id, 504 tree_data=shortest_path_tree, 505 length_only=length_only, 506 ) 507 508 def create_contraction_hierarchy( 509 self, heuristic_fn=None, ch_graph_kwargs=None 510 ) -> Any: 511 """ 512 Function: 513 514 - Create a Contraction Hierarchies (CH) graph from the current Graph object 515 - The CH graph is stored as an instance variable `self.ch_graph` 516 517 Optional Arguments: 518 519 - `heuristic_fn`: 520 - Type: function or None 521 - What: A heuristic function for CH preprocessing 522 - Default: None (uses default heuristic) 523 """ 524 if not hasattr(self, "__ch_graph__"): 525 ch_graph_kwargs = ( 526 ch_graph_kwargs if ch_graph_kwargs is not None else dict() 527 ) 528 self.__ch_graph__ = CHGraph( 529 graph=self.graph, heuristic_fn=heuristic_fn, **ch_graph_kwargs 530 ) 531 532 def contraction_hierarchy( 533 self, origin_id: int, destination_id: int, length_only: bool = False 534 ) -> dict[str, Any]: 535 """ 536 Function: 537 538 - Get the shortest path between two nodes using the Contraction Hierarchies (CH) graph 539 - Creates the CH graph if it doesn't exist 540 541 Requires: 542 543 - origin_id: The id of the origin node 544 - destination_id: The id of the destination node 545 546 Optional: 547 548 - length_only: If True, only returns the length of the path (not implemented yet) 549 550 Returns: 551 552 - A dictionary with 'path' and 'length' keys containing the shortest path and its length 553 """ 554 # Ensure that the CH graph is created and warmed up 555 self.create_contraction_hierarchy() 556 return self.__ch_graph__.search(origin_id, destination_id) 557 558 559class Graph(GraphUtils, GraphModifiers, GraphTrees, GraphAlgorithms): 560 def __init__(self, graph: list[dict[int, int | float]], validate=False): 561 """ 562 Function: 563 564 - Initialize a Graph object 565 - Validate the input graph 566 567 Required Arguments: 568 569 - `graph`: 570 - Type: list of dictionaries with integer keys and integer or float values 571 - What: A list of dictionaries where the indicies are origin node ids and the values are dictionaries of destination node indices and graph weights 572 - Note: All nodes must be included as origins in the graph regardless of if they have any connected destinations 573 - EG: 574 ``` 575 [ 576 # From London (index 0) 577 { 578 # To Paris (index 1) 579 1: 311, 580 }, 581 # From Paris (index 1) 582 { 583 # To London (index 0) 584 0: 311, 585 # To Berlin (index 2) 586 2: 878, 587 # To Rome (index 3) 588 3: 1439, 589 # To Madrid (index 4) 590 4: 1053 591 }, 592 # From Berlin (index 2) 593 { 594 # To Paris (index 1) 595 1: 878, 596 # To Rome (index 3) 597 3: 1181, 598 }, 599 # From Rome (index 3) 600 { 601 # To Paris (index 1) 602 1: 1439, 603 # To Berlin (index 2) 604 2: 1181, 605 }, 606 # From Madrid (index 4) 607 { 608 # To Paris (index 1) 609 1: 1053, 610 # To Lisbon (index 5) 611 5: 623 612 }, 613 # From Lisbon (index 5) 614 { 615 # To Madrid (index 4) 616 4: 623 617 } 618 ] 619 ``` 620 621 Optional Arguments: 622 623 - `validate`: 624 - Type: bool 625 - What: Whether to validate the input graph upon initialization 626 - Default: False 627 """ 628 self.graph = graph 629 self.reset_cache() 630 if validate: 631 self.validate()
9class GraphTrees: 10 def get_shortest_path_tree(self, origin_id: int | set[int]) -> dict: 11 """ 12 Function: 13 14 - Calculate the shortest path tree of a graph using Dijkstra's algorithm 15 16 Required Arguments: 17 18 - `origin_id` 19 - Type: int | set[int] 20 - What: The id(s) of the node(s) from which to calculate the shortest path tree 21 22 Returns: 23 24 - A dictionary with the following keys: 25 - `origin_id`: The id of the node from which the shortest path tree was calculated 26 - `predecessors`: A list of node ids referring to the predecessor of each node given the shortest path tree 27 - Note: For disconnected graphs, nodes that are not connected to the origin node will have a predecessor of None 28 - `distance_matrix`: A list of distances from the origin node to each node in the graph 29 - Note: For disconnected graphs, nodes that are not connected to the origin node will have a distance of float("inf") 30 31 """ 32 # Input Validation 33 self.__input_check__(origin_id=origin_id, destination_id=0) 34 origin_ids = {origin_id} if isinstance(origin_id, int) else origin_id 35 36 # Variable Initialization 37 distance_matrix = [float("inf")] * len(self.graph) 38 open_leaves = [] 39 predecessor = [-1] * len(self.graph) 40 41 for oid in origin_ids: 42 distance_matrix[oid] = 0 43 heappush(open_leaves, (0, oid)) 44 45 while open_leaves: 46 current_distance, current_id = heappop(open_leaves) 47 for connected_id, connected_distance in self.graph[ 48 current_id 49 ].items(): 50 possible_distance = current_distance + connected_distance 51 if possible_distance < distance_matrix[connected_id]: 52 distance_matrix[connected_id] = possible_distance 53 predecessor[connected_id] = current_id 54 heappush(open_leaves, (possible_distance, connected_id)) 55 56 return { 57 "origin_id": origin_id, 58 "predecessors": predecessor, 59 "distance_matrix": distance_matrix, 60 } 61 62 def get_tree_path( 63 self, 64 origin_id: int, 65 destination_id: int, 66 tree_data: dict, 67 length_only: bool = False, 68 ) -> dict: 69 """ 70 Function: 71 72 - Get the path from the origin node to the destination node using the provided tree_data 73 - Return a list of node ids in the order they are visited as well as the length of the path 74 - If the destination node is not reachable from the origin node, return None 75 76 Required Arguments: 77 78 - `origin_id` 79 - Type: int 80 - What: The id of the origin node from the graph dictionary to start the shortest path from 81 - Note: Since multiple origins are possible, if the origin_id is not a predecessor node of the destination node, the closest origin will be used. 82 - `destination_id` 83 - Type: int 84 - What: The id of the destination node from the graph dictionary to end the shortest path at 85 - `tree_data` 86 - Type: dict 87 - What: The output from a tree function 88 89 Optional Arguments: 90 91 - `length_only` 92 - Type: bool 93 - What: If True, only returns the length of the path 94 - Default: False 95 96 Returns: 97 98 - A dictionary with the following keys: 99 - `path`: A list of node ids in the order they are visited from the origin node to the destination node 100 - `length`: The length of the path from the origin node to the destination node 101 """ 102 if tree_data["origin_id"] != origin_id: 103 raise Exception( 104 "The origin node must be the same as the spanning node for this function to work." 105 ) 106 destination_distance = tree_data["distance_matrix"][destination_id] 107 if destination_distance == float("inf"): 108 raise Exception( 109 "Something went wrong, the origin and destination nodes are not connected." 110 ) 111 if length_only: 112 return {"length": destination_distance} 113 current_id = destination_id 114 current_path = [destination_id] 115 while current_id != origin_id and current_id != -1: 116 current_id = tree_data["predecessors"][current_id] 117 current_path.append(current_id) 118 current_path.reverse() 119 return { 120 "path": current_path, 121 "length": destination_distance, 122 }
10 def get_shortest_path_tree(self, origin_id: int | set[int]) -> dict: 11 """ 12 Function: 13 14 - Calculate the shortest path tree of a graph using Dijkstra's algorithm 15 16 Required Arguments: 17 18 - `origin_id` 19 - Type: int | set[int] 20 - What: The id(s) of the node(s) from which to calculate the shortest path tree 21 22 Returns: 23 24 - A dictionary with the following keys: 25 - `origin_id`: The id of the node from which the shortest path tree was calculated 26 - `predecessors`: A list of node ids referring to the predecessor of each node given the shortest path tree 27 - Note: For disconnected graphs, nodes that are not connected to the origin node will have a predecessor of None 28 - `distance_matrix`: A list of distances from the origin node to each node in the graph 29 - Note: For disconnected graphs, nodes that are not connected to the origin node will have a distance of float("inf") 30 31 """ 32 # Input Validation 33 self.__input_check__(origin_id=origin_id, destination_id=0) 34 origin_ids = {origin_id} if isinstance(origin_id, int) else origin_id 35 36 # Variable Initialization 37 distance_matrix = [float("inf")] * len(self.graph) 38 open_leaves = [] 39 predecessor = [-1] * len(self.graph) 40 41 for oid in origin_ids: 42 distance_matrix[oid] = 0 43 heappush(open_leaves, (0, oid)) 44 45 while open_leaves: 46 current_distance, current_id = heappop(open_leaves) 47 for connected_id, connected_distance in self.graph[ 48 current_id 49 ].items(): 50 possible_distance = current_distance + connected_distance 51 if possible_distance < distance_matrix[connected_id]: 52 distance_matrix[connected_id] = possible_distance 53 predecessor[connected_id] = current_id 54 heappush(open_leaves, (possible_distance, connected_id)) 55 56 return { 57 "origin_id": origin_id, 58 "predecessors": predecessor, 59 "distance_matrix": distance_matrix, 60 }
Function:
- Calculate the shortest path tree of a graph using Dijkstra's algorithm
Required Arguments:
origin_id- Type: int | set[int]
- What: The id(s) of the node(s) from which to calculate the shortest path tree
Returns:
- A dictionary with the following keys:
origin_id: The id of the node from which the shortest path tree was calculatedpredecessors: A list of node ids referring to the predecessor of each node given the shortest path tree- Note: For disconnected graphs, nodes that are not connected to the origin node will have a predecessor of None
distance_matrix: A list of distances from the origin node to each node in the graph- Note: For disconnected graphs, nodes that are not connected to the origin node will have a distance of float("inf")
62 def get_tree_path( 63 self, 64 origin_id: int, 65 destination_id: int, 66 tree_data: dict, 67 length_only: bool = False, 68 ) -> dict: 69 """ 70 Function: 71 72 - Get the path from the origin node to the destination node using the provided tree_data 73 - Return a list of node ids in the order they are visited as well as the length of the path 74 - If the destination node is not reachable from the origin node, return None 75 76 Required Arguments: 77 78 - `origin_id` 79 - Type: int 80 - What: The id of the origin node from the graph dictionary to start the shortest path from 81 - Note: Since multiple origins are possible, if the origin_id is not a predecessor node of the destination node, the closest origin will be used. 82 - `destination_id` 83 - Type: int 84 - What: The id of the destination node from the graph dictionary to end the shortest path at 85 - `tree_data` 86 - Type: dict 87 - What: The output from a tree function 88 89 Optional Arguments: 90 91 - `length_only` 92 - Type: bool 93 - What: If True, only returns the length of the path 94 - Default: False 95 96 Returns: 97 98 - A dictionary with the following keys: 99 - `path`: A list of node ids in the order they are visited from the origin node to the destination node 100 - `length`: The length of the path from the origin node to the destination node 101 """ 102 if tree_data["origin_id"] != origin_id: 103 raise Exception( 104 "The origin node must be the same as the spanning node for this function to work." 105 ) 106 destination_distance = tree_data["distance_matrix"][destination_id] 107 if destination_distance == float("inf"): 108 raise Exception( 109 "Something went wrong, the origin and destination nodes are not connected." 110 ) 111 if length_only: 112 return {"length": destination_distance} 113 current_id = destination_id 114 current_path = [destination_id] 115 while current_id != origin_id and current_id != -1: 116 current_id = tree_data["predecessors"][current_id] 117 current_path.append(current_id) 118 current_path.reverse() 119 return { 120 "path": current_path, 121 "length": destination_distance, 122 }
Function:
- Get the path from the origin node to the destination node using the provided tree_data
- Return a list of node ids in the order they are visited as well as the length of the path
- If the destination node is not reachable from the origin node, return None
Required Arguments:
origin_id- Type: int
- What: The id of the origin node from the graph dictionary to start the shortest path from
- Note: Since multiple origins are possible, if the origin_id is not a predecessor node of the destination node, the closest origin will be used.
destination_id- Type: int
- What: The id of the destination node from the graph dictionary to end the shortest path at
tree_data- Type: dict
- What: The output from a tree function
Optional Arguments:
length_only- Type: bool
- What: If True, only returns the length of the path
- Default: False
Returns:
- A dictionary with the following keys:
path: A list of node ids in the order they are visited from the origin node to the destination nodelength: The length of the path from the origin node to the destination node
125class GraphAlgorithms: 126 def dijkstra( 127 self, 128 origin_id: int | set[int], 129 destination_id: int, 130 ) -> dict: 131 """ 132 Function: 133 134 - Identify the shortest path between two nodes in a sparse network graph using a modified Dijkstra algorithm 135 136 - Return a dictionary of various path information including: 137 - `path`: A list of node ids in the order they are visited 138 - `length`: The length of the path from the origin node to the destination node 139 140 Required Arguments: 141 142 - `origin_id` 143 - Type: int | set[int] 144 - What: The id(s) of the origin node(s) from the graph dictionary to start the shortest path from 145 - `destination_id` 146 - Type: int 147 - What: The id of the destination node from the graph dictionary to end the shortest path at 148 149 Optional Arguments: 150 151 - None 152 """ 153 # Input Validation 154 self.__input_check__(origin_id=origin_id, destination_id=destination_id) 155 origin_ids = {origin_id} if isinstance(origin_id, int) else origin_id 156 # Variable Initialization 157 distance_matrix = [float("inf")] * len(self.graph) 158 predecessor = [-1] * len(self.graph) 159 open_leaves = [] 160 161 for oid in origin_ids: 162 distance_matrix[oid] = 0 163 heappush(open_leaves, (0, oid)) 164 165 while open_leaves: 166 current_distance, current_id = heappop(open_leaves) 167 if current_id == destination_id: 168 break 169 # Technically, the next line is not necessary but can help with performance 170 if current_distance == distance_matrix[current_id]: 171 for connected_id, connected_distance in self.graph[ 172 current_id 173 ].items(): 174 possible_distance = current_distance + connected_distance 175 if possible_distance < distance_matrix[connected_id]: 176 distance_matrix[connected_id] = possible_distance 177 predecessor[connected_id] = current_id 178 heappush(open_leaves, (possible_distance, connected_id)) 179 if current_id != destination_id: 180 raise Exception( 181 "Something went wrong, the origin and destination nodes are not connected." 182 ) 183 184 return { 185 "path": self.__reconstruct_path__(destination_id, predecessor), 186 "length": distance_matrix[destination_id], 187 } 188 189 def dijkstra_negative( 190 self, 191 origin_id: int | set[int], 192 destination_id: int, 193 cycle_check_iterations: int | None = None, 194 ) -> dict: 195 """ 196 Function: 197 198 - Identify the shortest path between two nodes in a sparse network graph using a modified Dijkstra algorithm that catches negative cycles 199 - Negative cycles raise an exception if they are detected 200 - Note: This algorithm is guaranteed to find the shortest path or raise an exception if a negative cycle is detected 201 - Note: This algorithm requires computing the entire shortest path tree of the graph and is therefore not able to be terminated early 202 - For non negative weighted graphs, it is recommended to use the `dijkstra` algorithm instead 203 - Note: This should be fairly performant in general, however it does have a higher worst-case time complexity than Bellman-Ford 204 205 - Return a dictionary of various path information including: 206 - `id_path`: A list of node ids in the order they are visited 207 - `path`: A list of node dictionaries (lat + long) in the order they are visited 208 209 Required Arguments: 210 211 - `origin_id` 212 - Type: int | set[int] 213 - What: The id(s) of the origin node(s) from the graph dictionary to start the shortest path from 214 - `destination_id` 215 - Type: int 216 - What: The id of the destination node from the graph dictionary to end the shortest path at 217 218 Optional Arguments: 219 220 - `cycle_check_iterations` 221 - Type: int | None 222 - What: The number of iterations to loop over before checking for negative cycles 223 - Default: None (Then set to the number of nodes in the graph) 224 """ 225 # Input Validation 226 self.__input_check__(origin_id=origin_id, destination_id=destination_id) 227 origin_ids = {origin_id} if isinstance(origin_id, int) else origin_id 228 229 # Variable Initialization 230 distance_matrix = [float("inf")] * len(self.graph) 231 predecessor = [-1] * len(self.graph) 232 open_leaves = [] 233 234 for oid in origin_ids: 235 distance_matrix[oid] = 0 236 heappush(open_leaves, (0, oid)) 237 238 # Cycle iteration Variables 239 cycle_iteration = 0 240 if cycle_check_iterations is None: 241 cycle_check_iterations = len(self.graph) 242 243 while open_leaves: 244 current_distance, current_id = heappop(open_leaves) 245 if current_distance == distance_matrix[current_id]: 246 # Increment the cycle iteration counter and check for negative cycles if the iteration limit is reached 247 cycle_iteration += 1 248 if cycle_iteration >= cycle_check_iterations: 249 cycle_iteration = 0 # Reset the cycle iteration counter 250 self.__cycle_check__( 251 predecessor_matrix=predecessor, node_id=current_id 252 ) 253 for connected_id, connected_distance in self.graph[ 254 current_id 255 ].items(): 256 possible_distance = current_distance + connected_distance 257 if possible_distance < distance_matrix[connected_id]: 258 distance_matrix[connected_id] = possible_distance 259 predecessor[connected_id] = current_id 260 heappush(open_leaves, (possible_distance, connected_id)) 261 262 if distance_matrix[destination_id] == float("inf"): 263 raise Exception( 264 "Something went wrong, the origin and destination nodes are not connected." 265 ) 266 267 return { 268 "path": self.__reconstruct_path__(destination_id, predecessor), 269 "length": distance_matrix[destination_id], 270 } 271 272 def a_star( 273 self, 274 origin_id: int | set[int], 275 destination_id: int, 276 heuristic_fn: callable = None, 277 ) -> dict: 278 """ 279 Function: 280 281 - Identify the shortest path between two nodes in a sparse network graph using an A* extension of Makowski's modified Dijkstra algorithm 282 - Return a dictionary of various path information including: 283 - `id_path`: A list of node ids in the order they are visited 284 - `path`: A list of node dictionaries (lat + long) in the order they are visited 285 286 Required Arguments: 287 288 - `origin_id` 289 - Type: int | set[int] 290 - What: The id of the origin node from the graph dictionary to start the shortest path from 291 - `destination_id` 292 - Type: int 293 - What: The id of the destination node from the graph dictionary to end the shortest path at 294 - `heuristic_fn` 295 - Type: function 296 - What: A heuristic function that takes two node ids and returns an estimated distance between them 297 - Note: If None, returns the shortest path using Dijkstra's algorithm 298 - Default: None 299 300 Optional Arguments: 301 302 - None 303 """ 304 if heuristic_fn is None: 305 return self.dijkstra( 306 origin_id=origin_id, 307 destination_id=destination_id, 308 ) 309 # Input Validation 310 self.__input_check__(origin_id=origin_id, destination_id=destination_id) 311 origin_ids = {origin_id} if isinstance(origin_id, int) else origin_id 312 313 # Variable Initialization 314 distance_matrix = [float("inf")] * len(self.graph) 315 # Using a visited matrix does add a tad bit of overhead but avoids revisiting nodes 316 # and does not require anything extra to be stored in the heap 317 visited = [0] * len(self.graph) 318 open_leaves = [] 319 predecessor = [-1] * len(self.graph) 320 321 for oid in origin_ids: 322 distance_matrix[oid] = 0 323 heappush(open_leaves, (0, oid)) 324 325 while open_leaves: 326 current_id = heappop(open_leaves)[1] 327 if current_id == destination_id: 328 break 329 if visited[current_id] == 1: 330 continue 331 visited[current_id] = 1 332 current_distance = distance_matrix[current_id] 333 for connected_id, connected_distance in self.graph[ 334 current_id 335 ].items(): 336 possible_distance = current_distance + connected_distance 337 if possible_distance < distance_matrix[connected_id]: 338 distance_matrix[connected_id] = possible_distance 339 predecessor[connected_id] = current_id 340 heappush( 341 open_leaves, 342 ( 343 possible_distance 344 + heuristic_fn(connected_id, destination_id), 345 connected_id, 346 ), 347 ) 348 if current_id != destination_id: 349 raise Exception( 350 "Something went wrong, the origin and destination nodes are not connected." 351 ) 352 353 return { 354 "path": self.__reconstruct_path__(destination_id, predecessor), 355 "length": distance_matrix[destination_id], 356 } 357 358 def bellman_ford( 359 self, 360 origin_id: int | set[int], 361 destination_id: int, 362 ) -> dict: 363 """ 364 Function: 365 366 - Identify the shortest path between two nodes in a sparse network graph using the Bellman-Ford algorithm 367 - Return a dictionary of various path information including: 368 - `id_path`: A list of node ids in the order they are visited 369 - `path`: A list of node dictionaries (lat + long) in the order they are visited 370 371 Required Arguments: 372 373 - `origin_id` 374 - Type: int | set[int] 375 - What: The id(s) of the origin node(s) from the graph dictionary to start the shortest path from 376 - `destination_id` 377 - Type: int 378 - What: The id of the destination node from the graph dictionary to end the shortest path at 379 380 Optional Arguments: 381 382 - None 383 """ 384 # Input Validation 385 self.__input_check__(origin_id=origin_id, destination_id=destination_id) 386 origin_ids = {origin_id} if isinstance(origin_id, int) else origin_id 387 # Variable Initialization 388 distance_matrix = [float("inf")] * len(self.graph) 389 predecessor = [-1] * len(self.graph) 390 391 for oid in origin_ids: 392 distance_matrix[oid] = 0 393 394 len_graph = len(self.graph) 395 for i in range(len_graph): 396 for current_id in range(len(self.graph)): 397 current_distance = distance_matrix[current_id] 398 for connected_id, connected_distance in self.graph[ 399 current_id 400 ].items(): 401 possible_distance = current_distance + connected_distance 402 if possible_distance < distance_matrix[connected_id]: 403 distance_matrix[connected_id] = possible_distance 404 predecessor[connected_id] = current_id 405 if i == len_graph - 1: 406 raise Exception( 407 "Graph contains a negative weight cycle" 408 ) 409 # Check if destination is reachable 410 if distance_matrix[destination_id] == float("inf"): 411 raise Exception( 412 "Something went wrong, the origin and destination nodes are not connected." 413 ) 414 415 return { 416 "path": self.__reconstruct_path__(destination_id, predecessor), 417 "length": distance_matrix[destination_id], 418 } 419 420 def bmssp( 421 self, 422 origin_id: int | set[int], 423 destination_id: int, 424 use_constant_degree_graph: bool = True, 425 ): 426 """ 427 Function: 428 429 - Identify the shortest path between two nodes in a sparse network graph using the BMSSPy package 430 431 - This is now a wrapper for the BMSSPy package 432 - https://github.com/connor-makowski/bmsspy 433 - For backwards compatibility, this function wraps around the dijkstra function if BMSSPy is not installed 434 435 - Return a dictionary of various path information including: 436 - `id_path`: A list of node ids in the order they are visited 437 - `path`: A list of node dictionaries (lat + long) in the order they are visited 438 439 Required Arguments: 440 441 - `origin_id` 442 - Type: int | set[int] 443 - What: The id(s) of the origin node(s) from the graph dictionary to start the shortest path from 444 - `destination_id` 445 - Type: int 446 - What: The id of the destination node from the graph dictionary to end the shortest path at 447 - `use_constant_degree_graph` 448 - Type: bool 449 - What: Whether to convert the graph to a constant degree 2 graph prior to running the BMSSPy algorithm 450 - Default: True 451 452 453 Optional Arguments: 454 455 - None 456 """ 457 bmssp_graph = Bmssp( 458 graph=self.graph, 459 use_constant_degree_graph=use_constant_degree_graph, 460 ) 461 output = bmssp_graph.solve( 462 origin_id=origin_id, destination_id=destination_id 463 ) 464 return { 465 "path": output["path"], 466 "length": output["length"], 467 } 468 469 def cached_shortest_path( 470 self, 471 origin_id: int, 472 destination_id: int, 473 length_only: bool = False, 474 ): 475 """ 476 Function: 477 478 - Get the shortest path between two nodes in the graph attempting to use a cached shortest path tree if available 479 - If a cached shortest path tree is not available, it will compute the shortest path tree and cache it for future use if specified by `cache` 480 - Uses the get_shortest_path_tree (Dijkstra) and get_tree_path functions internally 481 - Stores cached shortest path trees in a list (self.__cache__) where the index corresponds to the origin node id 482 - Note: If you modify this graph after caching shortest path trees, the cached trees may become invalid 483 - You can reset the cache by calling self.reset_cache() 484 - For efficiency, the cache is not automatically reset when the graph is modified 485 - This logic must be handled by the user 486 487 Requires: 488 489 - origin_id: The id of the origin node 490 - destination_id: The id of the destination node 491 492 Optional: 493 494 - length_only: If True, only returns the length of the path 495 """ 496 shortest_path_tree = self.__cache__[origin_id] 497 if shortest_path_tree == 0: 498 shortest_path_tree = self.get_shortest_path_tree( 499 origin_id=origin_id 500 ) 501 self.__cache__[origin_id] = shortest_path_tree 502 return self.get_tree_path( 503 origin_id=origin_id, 504 destination_id=destination_id, 505 tree_data=shortest_path_tree, 506 length_only=length_only, 507 ) 508 509 def create_contraction_hierarchy( 510 self, heuristic_fn=None, ch_graph_kwargs=None 511 ) -> Any: 512 """ 513 Function: 514 515 - Create a Contraction Hierarchies (CH) graph from the current Graph object 516 - The CH graph is stored as an instance variable `self.ch_graph` 517 518 Optional Arguments: 519 520 - `heuristic_fn`: 521 - Type: function or None 522 - What: A heuristic function for CH preprocessing 523 - Default: None (uses default heuristic) 524 """ 525 if not hasattr(self, "__ch_graph__"): 526 ch_graph_kwargs = ( 527 ch_graph_kwargs if ch_graph_kwargs is not None else dict() 528 ) 529 self.__ch_graph__ = CHGraph( 530 graph=self.graph, heuristic_fn=heuristic_fn, **ch_graph_kwargs 531 ) 532 533 def contraction_hierarchy( 534 self, origin_id: int, destination_id: int, length_only: bool = False 535 ) -> dict[str, Any]: 536 """ 537 Function: 538 539 - Get the shortest path between two nodes using the Contraction Hierarchies (CH) graph 540 - Creates the CH graph if it doesn't exist 541 542 Requires: 543 544 - origin_id: The id of the origin node 545 - destination_id: The id of the destination node 546 547 Optional: 548 549 - length_only: If True, only returns the length of the path (not implemented yet) 550 551 Returns: 552 553 - A dictionary with 'path' and 'length' keys containing the shortest path and its length 554 """ 555 # Ensure that the CH graph is created and warmed up 556 self.create_contraction_hierarchy() 557 return self.__ch_graph__.search(origin_id, destination_id)
126 def dijkstra( 127 self, 128 origin_id: int | set[int], 129 destination_id: int, 130 ) -> dict: 131 """ 132 Function: 133 134 - Identify the shortest path between two nodes in a sparse network graph using a modified Dijkstra algorithm 135 136 - Return a dictionary of various path information including: 137 - `path`: A list of node ids in the order they are visited 138 - `length`: The length of the path from the origin node to the destination node 139 140 Required Arguments: 141 142 - `origin_id` 143 - Type: int | set[int] 144 - What: The id(s) of the origin node(s) from the graph dictionary to start the shortest path from 145 - `destination_id` 146 - Type: int 147 - What: The id of the destination node from the graph dictionary to end the shortest path at 148 149 Optional Arguments: 150 151 - None 152 """ 153 # Input Validation 154 self.__input_check__(origin_id=origin_id, destination_id=destination_id) 155 origin_ids = {origin_id} if isinstance(origin_id, int) else origin_id 156 # Variable Initialization 157 distance_matrix = [float("inf")] * len(self.graph) 158 predecessor = [-1] * len(self.graph) 159 open_leaves = [] 160 161 for oid in origin_ids: 162 distance_matrix[oid] = 0 163 heappush(open_leaves, (0, oid)) 164 165 while open_leaves: 166 current_distance, current_id = heappop(open_leaves) 167 if current_id == destination_id: 168 break 169 # Technically, the next line is not necessary but can help with performance 170 if current_distance == distance_matrix[current_id]: 171 for connected_id, connected_distance in self.graph[ 172 current_id 173 ].items(): 174 possible_distance = current_distance + connected_distance 175 if possible_distance < distance_matrix[connected_id]: 176 distance_matrix[connected_id] = possible_distance 177 predecessor[connected_id] = current_id 178 heappush(open_leaves, (possible_distance, connected_id)) 179 if current_id != destination_id: 180 raise Exception( 181 "Something went wrong, the origin and destination nodes are not connected." 182 ) 183 184 return { 185 "path": self.__reconstruct_path__(destination_id, predecessor), 186 "length": distance_matrix[destination_id], 187 }
Function:
Identify the shortest path between two nodes in a sparse network graph using a modified Dijkstra algorithm
Return a dictionary of various path information including:
path: A list of node ids in the order they are visitedlength: The length of the path from the origin node to the destination node
Required Arguments:
origin_id- Type: int | set[int]
- What: The id(s) of the origin node(s) from the graph dictionary to start the shortest path from
destination_id- Type: int
- What: The id of the destination node from the graph dictionary to end the shortest path at
Optional Arguments:
- None
189 def dijkstra_negative( 190 self, 191 origin_id: int | set[int], 192 destination_id: int, 193 cycle_check_iterations: int | None = None, 194 ) -> dict: 195 """ 196 Function: 197 198 - Identify the shortest path between two nodes in a sparse network graph using a modified Dijkstra algorithm that catches negative cycles 199 - Negative cycles raise an exception if they are detected 200 - Note: This algorithm is guaranteed to find the shortest path or raise an exception if a negative cycle is detected 201 - Note: This algorithm requires computing the entire shortest path tree of the graph and is therefore not able to be terminated early 202 - For non negative weighted graphs, it is recommended to use the `dijkstra` algorithm instead 203 - Note: This should be fairly performant in general, however it does have a higher worst-case time complexity than Bellman-Ford 204 205 - Return a dictionary of various path information including: 206 - `id_path`: A list of node ids in the order they are visited 207 - `path`: A list of node dictionaries (lat + long) in the order they are visited 208 209 Required Arguments: 210 211 - `origin_id` 212 - Type: int | set[int] 213 - What: The id(s) of the origin node(s) from the graph dictionary to start the shortest path from 214 - `destination_id` 215 - Type: int 216 - What: The id of the destination node from the graph dictionary to end the shortest path at 217 218 Optional Arguments: 219 220 - `cycle_check_iterations` 221 - Type: int | None 222 - What: The number of iterations to loop over before checking for negative cycles 223 - Default: None (Then set to the number of nodes in the graph) 224 """ 225 # Input Validation 226 self.__input_check__(origin_id=origin_id, destination_id=destination_id) 227 origin_ids = {origin_id} if isinstance(origin_id, int) else origin_id 228 229 # Variable Initialization 230 distance_matrix = [float("inf")] * len(self.graph) 231 predecessor = [-1] * len(self.graph) 232 open_leaves = [] 233 234 for oid in origin_ids: 235 distance_matrix[oid] = 0 236 heappush(open_leaves, (0, oid)) 237 238 # Cycle iteration Variables 239 cycle_iteration = 0 240 if cycle_check_iterations is None: 241 cycle_check_iterations = len(self.graph) 242 243 while open_leaves: 244 current_distance, current_id = heappop(open_leaves) 245 if current_distance == distance_matrix[current_id]: 246 # Increment the cycle iteration counter and check for negative cycles if the iteration limit is reached 247 cycle_iteration += 1 248 if cycle_iteration >= cycle_check_iterations: 249 cycle_iteration = 0 # Reset the cycle iteration counter 250 self.__cycle_check__( 251 predecessor_matrix=predecessor, node_id=current_id 252 ) 253 for connected_id, connected_distance in self.graph[ 254 current_id 255 ].items(): 256 possible_distance = current_distance + connected_distance 257 if possible_distance < distance_matrix[connected_id]: 258 distance_matrix[connected_id] = possible_distance 259 predecessor[connected_id] = current_id 260 heappush(open_leaves, (possible_distance, connected_id)) 261 262 if distance_matrix[destination_id] == float("inf"): 263 raise Exception( 264 "Something went wrong, the origin and destination nodes are not connected." 265 ) 266 267 return { 268 "path": self.__reconstruct_path__(destination_id, predecessor), 269 "length": distance_matrix[destination_id], 270 }
Function:
- Identify the shortest path between two nodes in a sparse network graph using a modified Dijkstra algorithm that catches negative cycles
- Negative cycles raise an exception if they are detected
- Note: This algorithm is guaranteed to find the shortest path or raise an exception if a negative cycle is detected
- Note: This algorithm requires computing the entire shortest path tree of the graph and is therefore not able to be terminated early
- For non negative weighted graphs, it is recommended to use the
dijkstraalgorithm instead
- For non negative weighted graphs, it is recommended to use the
Note: This should be fairly performant in general, however it does have a higher worst-case time complexity than Bellman-Ford
Return a dictionary of various path information including:
id_path: A list of node ids in the order they are visitedpath: A list of node dictionaries (lat + long) in the order they are visited
Required Arguments:
origin_id- Type: int | set[int]
- What: The id(s) of the origin node(s) from the graph dictionary to start the shortest path from
destination_id- Type: int
- What: The id of the destination node from the graph dictionary to end the shortest path at
Optional Arguments:
cycle_check_iterations- Type: int | None
- What: The number of iterations to loop over before checking for negative cycles
- Default: None (Then set to the number of nodes in the graph)
272 def a_star( 273 self, 274 origin_id: int | set[int], 275 destination_id: int, 276 heuristic_fn: callable = None, 277 ) -> dict: 278 """ 279 Function: 280 281 - Identify the shortest path between two nodes in a sparse network graph using an A* extension of Makowski's modified Dijkstra algorithm 282 - Return a dictionary of various path information including: 283 - `id_path`: A list of node ids in the order they are visited 284 - `path`: A list of node dictionaries (lat + long) in the order they are visited 285 286 Required Arguments: 287 288 - `origin_id` 289 - Type: int | set[int] 290 - What: The id of the origin node from the graph dictionary to start the shortest path from 291 - `destination_id` 292 - Type: int 293 - What: The id of the destination node from the graph dictionary to end the shortest path at 294 - `heuristic_fn` 295 - Type: function 296 - What: A heuristic function that takes two node ids and returns an estimated distance between them 297 - Note: If None, returns the shortest path using Dijkstra's algorithm 298 - Default: None 299 300 Optional Arguments: 301 302 - None 303 """ 304 if heuristic_fn is None: 305 return self.dijkstra( 306 origin_id=origin_id, 307 destination_id=destination_id, 308 ) 309 # Input Validation 310 self.__input_check__(origin_id=origin_id, destination_id=destination_id) 311 origin_ids = {origin_id} if isinstance(origin_id, int) else origin_id 312 313 # Variable Initialization 314 distance_matrix = [float("inf")] * len(self.graph) 315 # Using a visited matrix does add a tad bit of overhead but avoids revisiting nodes 316 # and does not require anything extra to be stored in the heap 317 visited = [0] * len(self.graph) 318 open_leaves = [] 319 predecessor = [-1] * len(self.graph) 320 321 for oid in origin_ids: 322 distance_matrix[oid] = 0 323 heappush(open_leaves, (0, oid)) 324 325 while open_leaves: 326 current_id = heappop(open_leaves)[1] 327 if current_id == destination_id: 328 break 329 if visited[current_id] == 1: 330 continue 331 visited[current_id] = 1 332 current_distance = distance_matrix[current_id] 333 for connected_id, connected_distance in self.graph[ 334 current_id 335 ].items(): 336 possible_distance = current_distance + connected_distance 337 if possible_distance < distance_matrix[connected_id]: 338 distance_matrix[connected_id] = possible_distance 339 predecessor[connected_id] = current_id 340 heappush( 341 open_leaves, 342 ( 343 possible_distance 344 + heuristic_fn(connected_id, destination_id), 345 connected_id, 346 ), 347 ) 348 if current_id != destination_id: 349 raise Exception( 350 "Something went wrong, the origin and destination nodes are not connected." 351 ) 352 353 return { 354 "path": self.__reconstruct_path__(destination_id, predecessor), 355 "length": distance_matrix[destination_id], 356 }
Function:
- Identify the shortest path between two nodes in a sparse network graph using an A* extension of Makowski's modified Dijkstra algorithm
- Return a dictionary of various path information including:
id_path: A list of node ids in the order they are visitedpath: A list of node dictionaries (lat + long) in the order they are visited
Required Arguments:
origin_id- Type: int | set[int]
- What: The id of the origin node from the graph dictionary to start the shortest path from
destination_id- Type: int
- What: The id of the destination node from the graph dictionary to end the shortest path at
heuristic_fn- Type: function
- What: A heuristic function that takes two node ids and returns an estimated distance between them
- Note: If None, returns the shortest path using Dijkstra's algorithm
- Default: None
Optional Arguments:
- None
358 def bellman_ford( 359 self, 360 origin_id: int | set[int], 361 destination_id: int, 362 ) -> dict: 363 """ 364 Function: 365 366 - Identify the shortest path between two nodes in a sparse network graph using the Bellman-Ford algorithm 367 - Return a dictionary of various path information including: 368 - `id_path`: A list of node ids in the order they are visited 369 - `path`: A list of node dictionaries (lat + long) in the order they are visited 370 371 Required Arguments: 372 373 - `origin_id` 374 - Type: int | set[int] 375 - What: The id(s) of the origin node(s) from the graph dictionary to start the shortest path from 376 - `destination_id` 377 - Type: int 378 - What: The id of the destination node from the graph dictionary to end the shortest path at 379 380 Optional Arguments: 381 382 - None 383 """ 384 # Input Validation 385 self.__input_check__(origin_id=origin_id, destination_id=destination_id) 386 origin_ids = {origin_id} if isinstance(origin_id, int) else origin_id 387 # Variable Initialization 388 distance_matrix = [float("inf")] * len(self.graph) 389 predecessor = [-1] * len(self.graph) 390 391 for oid in origin_ids: 392 distance_matrix[oid] = 0 393 394 len_graph = len(self.graph) 395 for i in range(len_graph): 396 for current_id in range(len(self.graph)): 397 current_distance = distance_matrix[current_id] 398 for connected_id, connected_distance in self.graph[ 399 current_id 400 ].items(): 401 possible_distance = current_distance + connected_distance 402 if possible_distance < distance_matrix[connected_id]: 403 distance_matrix[connected_id] = possible_distance 404 predecessor[connected_id] = current_id 405 if i == len_graph - 1: 406 raise Exception( 407 "Graph contains a negative weight cycle" 408 ) 409 # Check if destination is reachable 410 if distance_matrix[destination_id] == float("inf"): 411 raise Exception( 412 "Something went wrong, the origin and destination nodes are not connected." 413 ) 414 415 return { 416 "path": self.__reconstruct_path__(destination_id, predecessor), 417 "length": distance_matrix[destination_id], 418 }
Function:
- Identify the shortest path between two nodes in a sparse network graph using the Bellman-Ford algorithm
- Return a dictionary of various path information including:
id_path: A list of node ids in the order they are visitedpath: A list of node dictionaries (lat + long) in the order they are visited
Required Arguments:
origin_id- Type: int | set[int]
- What: The id(s) of the origin node(s) from the graph dictionary to start the shortest path from
destination_id- Type: int
- What: The id of the destination node from the graph dictionary to end the shortest path at
Optional Arguments:
- None
420 def bmssp( 421 self, 422 origin_id: int | set[int], 423 destination_id: int, 424 use_constant_degree_graph: bool = True, 425 ): 426 """ 427 Function: 428 429 - Identify the shortest path between two nodes in a sparse network graph using the BMSSPy package 430 431 - This is now a wrapper for the BMSSPy package 432 - https://github.com/connor-makowski/bmsspy 433 - For backwards compatibility, this function wraps around the dijkstra function if BMSSPy is not installed 434 435 - Return a dictionary of various path information including: 436 - `id_path`: A list of node ids in the order they are visited 437 - `path`: A list of node dictionaries (lat + long) in the order they are visited 438 439 Required Arguments: 440 441 - `origin_id` 442 - Type: int | set[int] 443 - What: The id(s) of the origin node(s) from the graph dictionary to start the shortest path from 444 - `destination_id` 445 - Type: int 446 - What: The id of the destination node from the graph dictionary to end the shortest path at 447 - `use_constant_degree_graph` 448 - Type: bool 449 - What: Whether to convert the graph to a constant degree 2 graph prior to running the BMSSPy algorithm 450 - Default: True 451 452 453 Optional Arguments: 454 455 - None 456 """ 457 bmssp_graph = Bmssp( 458 graph=self.graph, 459 use_constant_degree_graph=use_constant_degree_graph, 460 ) 461 output = bmssp_graph.solve( 462 origin_id=origin_id, destination_id=destination_id 463 ) 464 return { 465 "path": output["path"], 466 "length": output["length"], 467 }
Function:
Identify the shortest path between two nodes in a sparse network graph using the BMSSPy package
This is now a wrapper for the BMSSPy package
- https://github.com/connor-makowski/bmsspy
- For backwards compatibility, this function wraps around the dijkstra function if BMSSPy is not installed
Return a dictionary of various path information including:
id_path: A list of node ids in the order they are visitedpath: A list of node dictionaries (lat + long) in the order they are visited
Required Arguments:
origin_id- Type: int | set[int]
- What: The id(s) of the origin node(s) from the graph dictionary to start the shortest path from
destination_id- Type: int
- What: The id of the destination node from the graph dictionary to end the shortest path at
use_constant_degree_graph- Type: bool
- What: Whether to convert the graph to a constant degree 2 graph prior to running the BMSSPy algorithm
- Default: True
Optional Arguments:
- None
469 def cached_shortest_path( 470 self, 471 origin_id: int, 472 destination_id: int, 473 length_only: bool = False, 474 ): 475 """ 476 Function: 477 478 - Get the shortest path between two nodes in the graph attempting to use a cached shortest path tree if available 479 - If a cached shortest path tree is not available, it will compute the shortest path tree and cache it for future use if specified by `cache` 480 - Uses the get_shortest_path_tree (Dijkstra) and get_tree_path functions internally 481 - Stores cached shortest path trees in a list (self.__cache__) where the index corresponds to the origin node id 482 - Note: If you modify this graph after caching shortest path trees, the cached trees may become invalid 483 - You can reset the cache by calling self.reset_cache() 484 - For efficiency, the cache is not automatically reset when the graph is modified 485 - This logic must be handled by the user 486 487 Requires: 488 489 - origin_id: The id of the origin node 490 - destination_id: The id of the destination node 491 492 Optional: 493 494 - length_only: If True, only returns the length of the path 495 """ 496 shortest_path_tree = self.__cache__[origin_id] 497 if shortest_path_tree == 0: 498 shortest_path_tree = self.get_shortest_path_tree( 499 origin_id=origin_id 500 ) 501 self.__cache__[origin_id] = shortest_path_tree 502 return self.get_tree_path( 503 origin_id=origin_id, 504 destination_id=destination_id, 505 tree_data=shortest_path_tree, 506 length_only=length_only, 507 )
Function:
- Get the shortest path between two nodes in the graph attempting to use a cached shortest path tree if available
- If a cached shortest path tree is not available, it will compute the shortest path tree and cache it for future use if specified by
cache - Uses the get_shortest_path_tree (Dijkstra) and get_tree_path functions internally
- Stores cached shortest path trees in a list (self.__cache__) where the index corresponds to the origin node id
- Note: If you modify this graph after caching shortest path trees, the cached trees may become invalid
- You can reset the cache by calling self.reset_cache()
- For efficiency, the cache is not automatically reset when the graph is modified
- This logic must be handled by the user
Requires:
- origin_id: The id of the origin node
- destination_id: The id of the destination node
Optional:
- length_only: If True, only returns the length of the path
509 def create_contraction_hierarchy( 510 self, heuristic_fn=None, ch_graph_kwargs=None 511 ) -> Any: 512 """ 513 Function: 514 515 - Create a Contraction Hierarchies (CH) graph from the current Graph object 516 - The CH graph is stored as an instance variable `self.ch_graph` 517 518 Optional Arguments: 519 520 - `heuristic_fn`: 521 - Type: function or None 522 - What: A heuristic function for CH preprocessing 523 - Default: None (uses default heuristic) 524 """ 525 if not hasattr(self, "__ch_graph__"): 526 ch_graph_kwargs = ( 527 ch_graph_kwargs if ch_graph_kwargs is not None else dict() 528 ) 529 self.__ch_graph__ = CHGraph( 530 graph=self.graph, heuristic_fn=heuristic_fn, **ch_graph_kwargs 531 )
Function:
- Create a Contraction Hierarchies (CH) graph from the current Graph object
- The CH graph is stored as an instance variable
self.ch_graph
Optional Arguments:
heuristic_fn:- Type: function or None
- What: A heuristic function for CH preprocessing
- Default: None (uses default heuristic)
533 def contraction_hierarchy( 534 self, origin_id: int, destination_id: int, length_only: bool = False 535 ) -> dict[str, Any]: 536 """ 537 Function: 538 539 - Get the shortest path between two nodes using the Contraction Hierarchies (CH) graph 540 - Creates the CH graph if it doesn't exist 541 542 Requires: 543 544 - origin_id: The id of the origin node 545 - destination_id: The id of the destination node 546 547 Optional: 548 549 - length_only: If True, only returns the length of the path (not implemented yet) 550 551 Returns: 552 553 - A dictionary with 'path' and 'length' keys containing the shortest path and its length 554 """ 555 # Ensure that the CH graph is created and warmed up 556 self.create_contraction_hierarchy() 557 return self.__ch_graph__.search(origin_id, destination_id)
Function:
- Get the shortest path between two nodes using the Contraction Hierarchies (CH) graph
- Creates the CH graph if it doesn't exist
Requires:
- origin_id: The id of the origin node
- destination_id: The id of the destination node
Optional:
- length_only: If True, only returns the length of the path (not implemented yet)
Returns:
- A dictionary with 'path' and 'length' keys containing the shortest path and its length
560class Graph(GraphUtils, GraphModifiers, GraphTrees, GraphAlgorithms): 561 def __init__(self, graph: list[dict[int, int | float]], validate=False): 562 """ 563 Function: 564 565 - Initialize a Graph object 566 - Validate the input graph 567 568 Required Arguments: 569 570 - `graph`: 571 - Type: list of dictionaries with integer keys and integer or float values 572 - What: A list of dictionaries where the indicies are origin node ids and the values are dictionaries of destination node indices and graph weights 573 - Note: All nodes must be included as origins in the graph regardless of if they have any connected destinations 574 - EG: 575 ``` 576 [ 577 # From London (index 0) 578 { 579 # To Paris (index 1) 580 1: 311, 581 }, 582 # From Paris (index 1) 583 { 584 # To London (index 0) 585 0: 311, 586 # To Berlin (index 2) 587 2: 878, 588 # To Rome (index 3) 589 3: 1439, 590 # To Madrid (index 4) 591 4: 1053 592 }, 593 # From Berlin (index 2) 594 { 595 # To Paris (index 1) 596 1: 878, 597 # To Rome (index 3) 598 3: 1181, 599 }, 600 # From Rome (index 3) 601 { 602 # To Paris (index 1) 603 1: 1439, 604 # To Berlin (index 2) 605 2: 1181, 606 }, 607 # From Madrid (index 4) 608 { 609 # To Paris (index 1) 610 1: 1053, 611 # To Lisbon (index 5) 612 5: 623 613 }, 614 # From Lisbon (index 5) 615 { 616 # To Madrid (index 4) 617 4: 623 618 } 619 ] 620 ``` 621 622 Optional Arguments: 623 624 - `validate`: 625 - Type: bool 626 - What: Whether to validate the input graph upon initialization 627 - Default: False 628 """ 629 self.graph = graph 630 self.reset_cache() 631 if validate: 632 self.validate()
561 def __init__(self, graph: list[dict[int, int | float]], validate=False): 562 """ 563 Function: 564 565 - Initialize a Graph object 566 - Validate the input graph 567 568 Required Arguments: 569 570 - `graph`: 571 - Type: list of dictionaries with integer keys and integer or float values 572 - What: A list of dictionaries where the indicies are origin node ids and the values are dictionaries of destination node indices and graph weights 573 - Note: All nodes must be included as origins in the graph regardless of if they have any connected destinations 574 - EG: 575 ``` 576 [ 577 # From London (index 0) 578 { 579 # To Paris (index 1) 580 1: 311, 581 }, 582 # From Paris (index 1) 583 { 584 # To London (index 0) 585 0: 311, 586 # To Berlin (index 2) 587 2: 878, 588 # To Rome (index 3) 589 3: 1439, 590 # To Madrid (index 4) 591 4: 1053 592 }, 593 # From Berlin (index 2) 594 { 595 # To Paris (index 1) 596 1: 878, 597 # To Rome (index 3) 598 3: 1181, 599 }, 600 # From Rome (index 3) 601 { 602 # To Paris (index 1) 603 1: 1439, 604 # To Berlin (index 2) 605 2: 1181, 606 }, 607 # From Madrid (index 4) 608 { 609 # To Paris (index 1) 610 1: 1053, 611 # To Lisbon (index 5) 612 5: 623 613 }, 614 # From Lisbon (index 5) 615 { 616 # To Madrid (index 4) 617 4: 623 618 } 619 ] 620 ``` 621 622 Optional Arguments: 623 624 - `validate`: 625 - Type: bool 626 - What: Whether to validate the input graph upon initialization 627 - Default: False 628 """ 629 self.graph = graph 630 self.reset_cache() 631 if validate: 632 self.validate()
Function:
- Initialize a Graph object
- Validate the input graph
Required Arguments:
-
- Type: list of dictionaries with integer keys and integer or float values
- What: A list of dictionaries where the indicies are origin node ids and the values are dictionaries of destination node indices and graph weights
- Note: All nodes must be included as origins in the graph regardless of if they have any connected destinations
- EG:
[ # From London (index 0) { # To Paris (index 1) 1: 311, }, # From Paris (index 1) { # To London (index 0) 0: 311, # To Berlin (index 2) 2: 878, # To Rome (index 3) 3: 1439, # To Madrid (index 4) 4: 1053 }, # From Berlin (index 2) { # To Paris (index 1) 1: 878, # To Rome (index 3) 3: 1181, }, # From Rome (index 3) { # To Paris (index 1) 1: 1439, # To Berlin (index 2) 2: 1181, }, # From Madrid (index 4) { # To Paris (index 1) 1: 1053, # To Lisbon (index 5) 5: 623 }, # From Lisbon (index 5) { # To Madrid (index 4) 4: 623 } ]
Optional Arguments:
validate:- Type: bool
- What: Whether to validate the input graph upon initialization
- Default: False