scgraph.grid
from scgraph.graph import Graph
from scgraph.helpers.shape_mover_utils import ShapeMoverUtils
from scgraph.cache import CacheGraph
from typing import Literal


class GridGraph:
    def __init__(
        self,
        x_size: int,
        y_size: int,
        blocks: list[tuple[int, int]],
        shape: list[tuple[int | float, int | float]] | None = None,
        add_exterior_walls: bool = True,
        conn_data: list[tuple[int, int, float]] | None = None,
    ) -> None:
        """
        Initializes a GridGraph object.

        Required Arguments:

        - `x_size`
            - Type: int
            - What: The size of the grid in the x direction
        - `y_size`
            - Type: int
            - What: The size of the grid in the y direction
        - `blocks`
            - Type: list of tuples
            - What: A list of tuples representing the coordinates of the blocked cells
            - Note: The passed list is copied and is never modified by this class

        Optional Arguments:

        - `shape`
            - Type: list of tuples
            - What: A list of tuples representing the shape of the moving object relative to the object location in the grid
            - Default: [(0, 0), (0, 1), (1, 0), (1, 1)]
            - Example: [(0, 0), (0, 1), (1, 0), (1, 1)]
                - Note: This would form a square of size 1x1
                - Assuming the grid location is at (0,0) the square takes up the grid space between (0, 0) and (1, 1)
                - Assuming the grid location is at (1,1) the square takes up the grid space between (1, 1) and (2, 2) with the same shape
            - Note: This shape is used to determine which connections are allowed when passing through the grid.
                - For example
                    - The shape is a square of size 1x1
                    - The location of the square is (0, 0)
                    - There is a blocked cell at (0, 1)
                    - There is a potential connection to (1, 1), (0, 1), and (1, 0)
                    - The square can not move to (1, 1) as it would collide with the blocked cell at (0, 1)
                        - This is because the square would pass partly through the blocked cell on its way to (1, 1)
                    - To achieve the same result, it can move to (1, 0) and then up to (1, 1) taking extra time / distance
        - `add_exterior_walls`
            - Type: bool
            - What: Whether to add exterior walls to the grid
            - Default: True
        - `conn_data`
            - Type: list of tuples
            - What: A list of tuples representing movements allowed in the grid
                - Each tuple should contain three values: (x_offset, y_offset, distance)
                - x_offset: The x offset from the current cell
                - y_offset: The y offset from the current cell
                - distance: The distance to the connected cell
            - Default: [(0, 1, 1), (1, 0, 1), (0, -1, 1), (-1, 0, 1), (1, 1, sqrt(2)), (-1, -1, sqrt(2)), (1, -1, sqrt(2)), (-1, 1, sqrt(2))]
                - Note: If None, the default connection data will be used
                - Note: This includes 8 directions, 4 cardinal and 4 diagonal
                    - Cardinal directions are 1 unit away, diagonal directions are sqrt(2) units away
                    - sqrt(2) is approximated as 1.4142 in the default data
        """
        # Validate the inputs
        assert x_size > 0, "x_size must be greater than 0"
        assert y_size > 0, "y_size must be greater than 0"

        self.x_size = x_size
        self.y_size = y_size
        # Copy the blocks so that adding the exterior walls below never
        # mutates the list object owned by the caller
        self.blocks = list(blocks)
        self.add_exterior_walls = add_exterior_walls

        # Update the blocks if add_exterior_walls is True
        # This is done by adding all the cells in the first and last row and column
        if add_exterior_walls:
            self.blocks += [
                (x, y) for x in range(x_size) for y in [0, y_size - 1]
            ] + [(x, y) for x in [0, x_size - 1] for y in range(y_size)]

        # If shape is None, set the default shape
        if shape is None:
            self.shape = [(0, 0), (0, 1), (1, 0), (1, 1)]
        else:
            self.shape = shape

        # If conn_data is None, set the default connection data
        if conn_data is None:
            sqrt_2 = 1.4142
            self.conn_data = [
                (-1, -1, sqrt_2),
                (-1, 0, 1),
                (-1, 1, sqrt_2),
                (0, -1, 1),
                (0, 1, 1),
                (1, -1, sqrt_2),
                (1, 0, 1),
                (1, 1, sqrt_2),
            ]
        else:
            self.conn_data = conn_data

        self.graph = self.__create_graph__()
        self.nodes = [self.__get_x_y__(idx) for idx in range(len(self.graph))]
        self.cacheGraph = CacheGraph(self.graph)

    def __get_idx__(self, x: int, y: int):
        """
        Function:

        - Get the index of a cell in a 2D grid given its x and y coordinates
        - This does not have any error checking, so be careful with the inputs

        Required Arguments:

        - `x`
            - Type: int
            - What: The x coordinate of the cell
        - `y`
            - Type: int
            - What: The y coordinate of the cell

        Returns:

        - `idx`
            - Type: int
            - What: The index of the cell in the grid (row-major: x + y * x_size)
        """
        return x + y * self.x_size

    def __get_x_y__(self, idx: int):
        """
        Function:

        - Get the x and y coordinates of a cell in a 2D grid given its index
        - This does not have any error checking so be careful with the inputs

        Required Arguments:

        - `idx`
            - Type: int
            - What: The index of the cell

        Returns:

        - `x`
            - Type: int
            - What: The x coordinate of the cell
        - `y`
            - Type: int
            - What: The y coordinate of the cell
        """
        return idx % self.x_size, idx // self.x_size

    def get_idx(self, x: int, y: int):
        """
        Function:

        - Get the index of a cell in a 2D grid given its x and y coordinates

        Required Arguments:

        - `x`
            - Type: int
            - What: The x coordinate of the cell
        - `y`
            - Type: int
            - What: The y coordinate of the cell

        Returns:

        - `idx`
            - Type: int
            - What: The index of the cell in the grid

        Raises:

        - `AssertionError` if either coordinate is outside of the grid
            - Note: `__get_closest_node_with_connections__` relies on this exception type
        """
        assert x >= 0 and x < self.x_size, "x coordinate is out of bounds"
        assert y >= 0 and y < self.y_size, "y coordinate is out of bounds"
        return x + y * self.x_size

    def get_x_y(self, idx: int):
        """
        Function:

        - Get the x and y coordinates of a cell in a 2D grid given its index

        Required Arguments:

        - `idx`
            - Type: int
            - What: The index of the cell

        Returns:

        - `x`
            - Type: int
            - What: The x coordinate of the cell
        - `y`
            - Type: int
            - What: The y coordinate of the cell

        Raises:

        - `AssertionError` if the index is outside of the graph
        """
        assert idx >= 0 and idx < len(self.graph), "idx is out of bounds"
        return idx % self.x_size, idx // self.x_size

    def __get_relative_shape_offsets__(
        self,
        shape: list[tuple[int | float, int | float]],
        x_off: int,
        y_off: int,
    ):
        """
        Function:

        - Get the keys returned by `ShapeMoverUtils.moving_shape_overlap_intervals` for the
          shape starting at (0, 0) and shifting by (x_off, y_off) between t=0 and t=1
        - `__create_graph__` treats each key as an (x, y) cell offset relative to the
          starting cell that the shape overlaps during the move

        Required Arguments:

        - `shape`
            - Type: list of tuples
            - What: The shape of the moving object
        - `x_off`
            - Type: int
            - What: The x shift of the movement
        - `y_off`
            - Type: int
            - What: The y shift of the movement

        Returns:

        - `offsets`
            - Type: list
            - What: The list of keys from the overlap intervals
        """
        return list(
            ShapeMoverUtils.moving_shape_overlap_intervals(
                x_coord=0,
                y_coord=0,
                x_shift=x_off,
                y_shift=y_off,
                t_start=0,
                t_end=1,
                shape=shape,
            ).keys()
        )

    def __create_graph__(
        self,
    ):
        """
        Function:

        - Create a graph from the grid specifications

        Returns:

        - `graph`
            - Type: list
            - What: The adjacency list representation of the graph
        """
        ####################
        # Create a list of lists to hold all the possible connections in an easy to access/understand format
        ####################

        # Each cell in the list will be a dictionary of connections
        # The keys will be the coordinates of the cell that can be moved to
        # The values will be the distance to that cell
        # This essentially creates a sparse matrix
        graph_matrix = [
            [{} for _ in range(self.x_size)] for _ in range(self.y_size)
        ]
        for y in range(self.y_size):
            for x in range(self.x_size):
                for x_off, y_off, dist in self.conn_data:
                    if (
                        0 <= x + x_off < self.x_size
                        and 0 <= y + y_off < self.y_size
                    ):
                        graph_matrix[y][x][(x + x_off, y + y_off)] = dist

        ####################
        # Remove all connections that would not be possible based on the blocks and the shape
        ####################

        # Create a set of offsets that will be used to help remove connections relative to each blocked cell
        delta_offsets = {
            (x_delta, y_delta): self.__get_relative_shape_offsets__(
                self.shape, x_delta, y_delta
            )
            for x_delta, y_delta, _ in self.conn_data
        }

        # Loop over all the blocks and remove all connections that would be not possible based on the blocks
        for block in self.blocks:
            # Clear out all connections leaving from the blocked cell
            graph_matrix[block[1]][block[0]] = {}
            # Clear out all connections that would be blocked by each movement given the blocked cell and the delta offsets
            # Our delta offsets dictionary essentially tells us which relative cells would be blocked by the moving shape
            for delta, offsets in delta_offsets.items():
                for offset in offsets:
                    x_cell = block[0] - offset[0]
                    y_cell = block[1] - offset[1]
                    if 0 <= x_cell < self.x_size and 0 <= y_cell < self.y_size:
                        x_move_to = x_cell + delta[0]
                        y_move_to = y_cell + delta[1]
                        graph_matrix[y_cell][x_cell].pop(
                            (x_move_to, y_move_to), None
                        )

        ####################
        # Convert the graph matrix into the very efficient graph format as specified by the scgraph library
        ####################
        graph = []
        for x_data in graph_matrix:
            for connection_dict in x_data:
                graph.append(
                    {
                        self.__get_idx__(x, y): dist
                        for (x, y), dist in connection_dict.items()
                    }
                )

        return graph

    def euclidean_heuristic(self, origin_id: int, destination_id: int) -> float:
        """
        Function:

        - Calculate the Euclidean distance between two nodes in the grid graph

        Required Arguments:

        - `origin_id`
            - Type: int
            - What: The id of the origin node
        - `destination_id`
            - Type: int
            - What: The id of the destination node

        Returns:

        - `distance`
            - Type: float
            - What: The Euclidean distance between the two nodes
        """
        origin_location = self.nodes[origin_id]
        destination_location = self.nodes[destination_id]
        return (
            (origin_location[0] - destination_location[0]) ** 2
            + (origin_location[1] - destination_location[1]) ** 2
        ) ** 0.5

    def __get_closest_node_with_connections__(
        self,
        x: int | float,
        y: int | float,
    ):
        """
        Function:

        - Get the closest node in the graph that has connections
        - If both coordinates are whole numbers, the matching grid node is returned
          directly with a distance of 0.0 (without checking its connections)
        - Otherwise the four grid nodes surrounding the coordinate are considered

        Required Arguments:

        - `x`
            - Type: int | float
            - What: The x coordinate of the node
        - `y`
            - Type: int | float
            - What: The y coordinate of the node

        Returns:

        - `closest_node_id`
            - Type: int
            - What: The id of the closest node with connections
        - `closest_distance`
            - Type: float
            - What: The distance to the closest node with connections

        Raises:

        - `AssertionError` if whole number coordinates are outside of the grid
        - `ValueError` if none of the surrounding nodes is in the grid and has connections
        """
        # Normalize whole number floats (e.g. 2.0) to ints
        x = int(x) if int(x) == x else x
        y = int(y) if int(y) == y else y
        if isinstance(x, int) and isinstance(y, int):
            return self.get_idx(x, y), 0.0
        closest_node_id = None
        closest_distance = float("inf")
        for x_off, y_off in [(0, 0), (1, 0), (0, 1), (1, 1)]:
            node_x = int(x) + x_off
            node_y = int(y) + y_off
            try:
                node_id = self.get_idx(node_x, node_y)
            except AssertionError:
                # The candidate node is outside of the grid
                continue
            if self.graph[node_id] == {}:
                # The candidate node has no connections (blocked or isolated)
                continue
            distance = ((node_x - x) ** 2 + (node_y - y) ** 2) ** 0.5
            if distance < closest_distance:
                closest_distance = distance
                closest_node_id = node_id
        if closest_node_id is None:
            raise ValueError(
                "No valid adjacent node with connections found for the given coordinates"
            )
        return closest_node_id, closest_distance

    def format_coordinate(self, cooinate_dict, output_coorinate_path):
        """
        Function:

        - Format a single coordinate dictionary to match the requested coordinate path format

        Required Arguments:

        - `cooinate_dict`
            - Type: dict
            - What: A dictionary with the keys 'x' and 'y'
        - `output_coorinate_path`
            - Type: str
            - What: The format of the output coordinate
            - Options: 'list_of_tuples', 'list_of_lists', 'list_of_dicts'
            - Note: The parameter names keep their historic spelling so existing keyword callers keep working

        Returns:

        - `coordinate`
            - Type: tuple | list | dict
            - What: The coordinate as (x, y), [x, y] or {"x": x, "y": y}

        Raises:

        - `ValueError` if the format is not one of the supported options
        """
        x_value = cooinate_dict["x"]
        y_value = cooinate_dict["y"]
        if output_coorinate_path == "list_of_tuples":
            return (x_value, y_value)
        elif output_coorinate_path == "list_of_lists":
            return [x_value, y_value]
        # This branch previously compared an undefined name, which raised a
        # NameError for the default 'list_of_dicts' format
        elif output_coorinate_path == "list_of_dicts":
            return {"x": x_value, "y": y_value}
        else:
            raise ValueError("Invalid output_coordinate_path format")

    def get_shortest_path(
        self,
        origin_node: (
            dict[Literal["x", "y"], int | float]
            | tuple[int | float, int | float]
            | list[int | float]
        ),
        destination_node: (
            dict[Literal["x", "y"], int | float]
            | tuple[int | float, int | float]
            | list[int | float]
        ),
        output_coordinate_path: str = "list_of_dicts",
        cache: bool = False,
        cache_for: str = "origin",
        output_path: bool = False,
        heuristic_fn: callable | Literal["euclidean"] | None = "euclidean",
        algorithm_fn: callable = Graph.a_star,
        algorithm_kwargs: dict | None = None,
        **kwargs,
    ) -> dict:
        """
        Function:

        - Identify the shortest path between two nodes in a sparse network graph
        - If an off graph origin and/or destination node is provided, it will find the closest adjacent node with connections
          as the origin and/or destination node
            - This is done to increase the likelihood that the pathfinding algorithm can find a valid path
            - If no valid adjacent node is found, an error will be raised

        - Return a dictionary of various path information including:
            - `path`: A list of graph ids in the order they are visited (only if `output_path` is True)
                - Off graph origin and/or destination nodes are represented by -1
            - `coordinate_path`: A list of coordinates in the order they are visited
            - `length`: The length of the path

        Required Arguments:

        - `origin_node`
            - Type: dict of int | float
            - What: A dictionary with the keys 'x' and 'y'
            - Alternatively, a tuple or list of two values (x, y) can be used
        - `destination_node`
            - Type: dict of int | float
            - What: A dictionary with the keys 'x' and 'y'
            - Alternatively, a tuple or list of two values (x, y) can be used

        Optional Arguments:

        - `output_coordinate_path`
            - Type: str
            - What: The format of the output coordinate path
            - Default: 'list_of_dicts'
            - Options:
                - `list_of_tuples`: A list of tuples with the first value being x and the second being y
                - 'list_of_dicts': A list of dictionaries with keys 'x' and 'y'
                - 'list_of_lists': A list of lists with the first value being x and the second being y
        - `cache`
            - Type: bool
            - What: Whether to use the cache (save and reuse the spanning tree)
            - Default: False
        - `cache_for`
            - Type: str
            - What: Whether to cache the spanning tree for the origin or destination node if `cache` is True
            - Default: 'origin'
            - Options: 'origin', 'destination'
        - `output_path`
            - Type: bool
            - What: Whether to output the path as a list of graph idxs (mostly for debugging purposes)
            - Default: False
        - `heuristic_fn`
            - Type: callable | Literal['euclidean'] | None
            - What: A heuristic function to use for the A* algorithm if caching is False
            - Default: 'euclidean' (A predefined heuristic function that calculates the Euclidean distance for this grid graph)
                - If None, the A* algorithm will default to Dijkstra's algorithm
                - If a callable is provided, it should take two arguments: origin_id and destination_id and return a float representing the heuristic distance between the two nodes
                    - Note: This distance should never be greater than the actual distance between the two nodes or you may get suboptimal paths
            - Note: This is deprecated and will be removed in a future version
        - `algorithm_fn`
            - Type: callable | None
            - What: The algorithm to use for pathfinding
            - Default: 'a_star'
            - If None, the default algorithm will be used
        - `algorithm_kwargs`
            - Type: dict
            - What: Additional keyword arguments to pass to the algorithm function
            - Default: {}
            - Note: The passed dictionary is copied and is never modified
        - `**kwargs`
            - Additional keyword arguments. These are included for forwards and backwards compatibility reasons, but are not currently used.

        Raises:

        - `ValueError` if the origin or destination has no connections, no valid adjacent node
          exists for an off graph coordinate, or `cache_for` is invalid when `cache` is True
        """
        if isinstance(origin_node, (tuple, list)):
            origin_node = {"x": origin_node[0], "y": origin_node[1]}
        if isinstance(destination_node, (tuple, list)):
            destination_node = {
                "x": destination_node[0],
                "y": destination_node[1],
            }
        # Work on a copy so the heuristic injected below never leaks into the
        # dictionary owned by the caller
        algorithm_kwargs = (
            {} if algorithm_kwargs is None else dict(algorithm_kwargs)
        )

        origin_id, origin_distance = self.__get_closest_node_with_connections__(
            **origin_node
        )
        destination_id, destination_distance = (
            self.__get_closest_node_with_connections__(**destination_node)
        )

        # On graph (whole number) coordinates are not checked for connections
        # by the closest node lookup, so they are validated here
        if self.graph[origin_id] == {}:
            raise ValueError(
                "Origin node is not connected to any other nodes. This is likely caused by the origin node not being possible given a blocked cell or nearby blocked cell"
            )
        if self.graph[destination_id] == {}:
            raise ValueError(
                "Destination node is not connected to any other nodes. This is likely caused by the destination node not being possible given a blocked cell or nearby blocked cell"
            )
        if cache:
            if cache_for not in ["origin", "destination"]:
                raise ValueError(
                    "cache_for must be 'origin' or 'destination' when cache is True"
                )
            # Reverse for cache graphing if cache_for is destination since the graph is undirected
            if cache_for == "destination":
                origin_id, destination_id = destination_id, origin_id
            output = self.cacheGraph.get_shortest_path(
                origin_id=origin_id,
                destination_id=destination_id,
            )
            # Undo the reverse if cache_for is destination
            if cache_for == "destination":
                output["path"].reverse()
                origin_id, destination_id = destination_id, origin_id
        else:
            # TODO: Remove this backwards compatibility hack in future versions
            if algorithm_fn == Graph.a_star:
                if "heuristic_fn" not in algorithm_kwargs:
                    algorithm_kwargs["heuristic_fn"] = (
                        self.euclidean_heuristic
                        if heuristic_fn == "euclidean"
                        else heuristic_fn
                    )
            output = algorithm_fn(
                graph=self.graph,
                origin_id=origin_id,
                destination_id=destination_id,
                **algorithm_kwargs,
            )
        output["coordinate_path"] = self.get_coordinate_path(
            output["path"], output_coordinate_path
        )
        # Stitch the off graph origin / destination onto the ends of the path
        if origin_distance > 0:
            output["coordinate_path"] = [
                self.format_coordinate(origin_node, output_coordinate_path)
            ] + output["coordinate_path"]
            output["path"] = [-1] + output["path"]
            output["length"] += origin_distance
        if destination_distance > 0:
            output["coordinate_path"].append(
                self.format_coordinate(destination_node, output_coordinate_path)
            )
            output["path"].append(-1)
            output["length"] += destination_distance
        if not output_path:
            del output["path"]
        return output

    def get_coordinate_path(
        self, path: list[int], output_coordinate_path: str = "list_of_dicts"
    ) -> list[dict[str, int]]:
        """
        Function:

        - Return a list of node coordinates (x and y) in the order they are visited

        Required Arguments:

        - `path`
            - Type: list
            - What: A list of node ids in the order they are visited

        Optional Arguments:

        - `output_coordinate_path`
            - Type: str
            - What: The format of the output coordinate path
            - Default: 'list_of_dicts'
            - Options:
                - 'list_of_dicts': A list of dictionaries with keys 'x' and 'y'
                - 'list_of_lists': A list of lists with the first value being x and the second being y
                - `list_of_tuples`: A list of tuples with the first value being x and the second being y

        Returns:

        - `coordinate_path`
            - Type: list
            - What: A list of coordinates in the requested format in the order they are visited

        Raises:

        - `ValueError` if the format is not one of the supported options
        """
        output = [self.__get_x_y__(idx) for idx in path]
        if output_coordinate_path == "list_of_tuples":
            return output
        elif output_coordinate_path == "list_of_lists":
            return [[x, y] for x, y in output]
        elif output_coordinate_path == "list_of_dicts":
            return [{"x": x, "y": y} for x, y in output]
        else:
            raise ValueError(
                "output_coordinate_path must be 'list_of_dicts' or 'list_of_lists' or 'list_of_tuples'"
            )

    def export_object(
        self, filename: str = "", include_blocks: bool = False
    ) -> None:
        """
        Function:

        - Export the graph (and its cache) to a zlib compressed pickle file

        Arguments:

        - `filename`
            - Type: str
            - What: The name of the file to export the graph to.
            - An extension of .gridgraph will be added to the file name if not already present
            - Any missing parent directories are created

        Optional Arguments:

        - `include_blocks`
            - Type: bool
            - What: Whether to include the blocks in the export
            - Default: False
            - Note: This is not needed as the graph is already created
            - Note: You can include blocks in the export if you need them for some reason
                - This will be set as the blocks attribute in the imported object
                - This will increase the size of the export file

        Raises:

        - `ValueError` if no filename is specified
        """
        if filename == "":
            raise ValueError("filename must be specified to export the graph")
        filename = (
            filename
            if filename.endswith(".gridgraph")
            else filename + ".gridgraph"
        )
        try:
            import os
            import pickle
            import zlib
        except ImportError:
            raise ImportError(
                "os, pickle and zlib are required to export the graph"
            )
        export_data = {
            "graph_attributes": {
                "graph": self.graph,
                "nodes": self.nodes,
                "x_size": self.x_size,
                "y_size": self.y_size,
                "shape": self.shape,
                "add_exterior_walls": self.add_exterior_walls,
                "conn_data": self.conn_data,
            },
            "graph_cache": self.cacheGraph.cache,
            "export_version": 1,
        }
        if include_blocks:
            export_data["graph_attributes"]["blocks"] = self.blocks

        # A bare file name has an empty directory part and os.makedirs("")
        # raises, so only create directories when there is one to create
        directory = os.path.dirname(filename)
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(filename, "wb") as f:
            f.write(zlib.compress(pickle.dumps(export_data)))

    @staticmethod
    def import_object(filename: str = "") -> "GridGraph":
        """
        Function:

        - A staticmethod to import the graph from a file

        Arguments:

        - `filename`
            - Type: str
            - What: The name of the file to import the graph from.
            - An extension of .gridgraph will be added to the file name if not already present

        Returns:

        - `GridGraph`
            - Type: GridGraph
            - What: The imported graph object

        Raises:

        - `ValueError` if no filename is specified or the export version is not supported
        """
        if filename == "":
            raise ValueError("filename must be specified to import the graph")
        filename = (
            filename
            if filename.endswith(".gridgraph")
            else filename + ".gridgraph"
        )
        try:
            import pickle
            import zlib
        except ImportError:
            raise ImportError(
                "pickle and zlib are required to import the graph"
            )
        # SECURITY NOTE: pickle can execute arbitrary code while loading.
        # Only import .gridgraph files that come from a trusted source.
        with open(filename, "rb") as f:
            import_data = pickle.loads(zlib.decompress(f.read()))

        # Check the version of the export
        if import_data["export_version"] != 1:
            raise ValueError(
                f"Incompatible export version {import_data['export_version']}. The current version is 1"
            )
        # Create a new minimal GridGraph object and overwrite the attributes with the imported data
        # This avoids rebuilding the full graph in __init__ since the graph is already in the import
        GridGraph_object = GridGraph(
            x_size=1, y_size=1, blocks=[], add_exterior_walls=False
        )
        for key, value in import_data["graph_attributes"].items():
            setattr(GridGraph_object, key, value)

        GridGraph_object.cacheGraph = CacheGraph(GridGraph_object.graph)
        GridGraph_object.cacheGraph.cache = import_data["graph_cache"]
        return GridGraph_object
class
GridGraph:
8class GridGraph: 9 def __init__( 10 self, 11 x_size: int, 12 y_size: int, 13 blocks: list[tuple[int, int]], 14 shape: list[tuple[int | float, int | float]] | None = None, 15 add_exterior_walls: bool = True, 16 conn_data: list[tuple[int, int, float]] | None = None, 17 ) -> None: 18 """ 19 Initializes a GridGraph object. 20 21 Required Arguments: 22 23 - `x_size` 24 - Type: int 25 - What: The size of the grid in the x direction 26 - `y_size` 27 - Type: int 28 - What: The size of the grid in the y direction 29 - `blocks` 30 - Type: list of tuples 31 - What: A list of tuples representing the coordinates of the blocked cells 32 33 Optional Arguments: 34 35 - `shape` 36 - Type: list of tuples 37 - What: A list of tuples representing the shape of the moving object relative to the object location in the grid 38 - Default: [(0, 0), (0, 1), (1, 0), (1, 1)] 39 - Example: [(0, 0), (0, 1), (1, 0), (1, 1)] 40 - Note: This would form a square of size 1x1 41 - Assuming the grid location is at (0,0) the square takes up the grid space between (0, 0) and (1, 1) 42 - Assumint the grid location is at (1,1) the square takes up the grid space between (1, 1) and (2, 2) with the same shape 43 - Note: This shape is used to determine which connections are allowed when passing through the grid. 
44 - For example 45 - The shape is a square of size 1x1 46 - The location of the square is (0, 0) 47 - There is a blocked cell at (0, 1) 48 - There is a potential connection to (1, 1), (0, 1), and (1, 0) 49 - The square can not move to (1, 1) as it would collide with the blocked cell at (0, 1) 50 - This is because the square would pass partly through the blocked cell on its way to (1, 1) 51 - To achieve the same result, it can move to (1, 0) and then up to (1, 1) taking extra time / distance 52 - `add_exterior_walls` 53 - Type: bool 54 - What: Whether to add exterior walls to the grid 55 - Default: True 56 - `conn_data` 57 - Type: list of tuples 58 - What: A list of tuples representing movements allowed in the grid 59 - Each tuple should contain three values: (x_offset, y_offset, distance) 60 - x_offset: The x offset from the current cell 61 - y_offset: The y offset from the current cell 62 - distance: The distance to the connected cell 63 - Default: [(0, 1, 1), (1, 0, 1), (0, -1, 1), (-1, 0, 1), (1, 1, sqrt(2)), (-1, -1, sqrt(2)), (1, -1, sqrt(2)), (-1, 1, sqrt(2))] 64 - Note: If None, the default connection data will be used 65 - Note: This includes 8 directions, 4 cardinal and 4 diagonal 66 - Cardinal directions are 1 unit away, diagonal directions are sqrt(2) units away 67 """ 68 # Validate the inputs 69 assert x_size > 0, "x_size must be greater than 0" 70 assert y_size > 0, "y_size must be greater than 0" 71 72 self.x_size = x_size 73 self.y_size = y_size 74 self.blocks = blocks 75 self.add_exterior_walls = add_exterior_walls 76 77 # Update the blocks if add_exterior_walls is True 78 # This is done by adding all the cells in the first and last row and column 79 if add_exterior_walls: 80 self.blocks += [ 81 (x, y) for x in range(x_size) for y in [0, y_size - 1] 82 ] + [(x, y) for x in [0, x_size - 1] for y in range(y_size)] 83 84 # If shape is None, set the default shape 85 if shape is None: 86 self.shape = [(0, 0), (0, 1), (1, 0), (1, 1)] 87 else: 88 
self.shape = shape 89 90 # If conn_data is None, set the default connection data 91 if conn_data is None: 92 sqrt_2 = 1.4142 93 self.conn_data = [ 94 (-1, -1, sqrt_2), 95 (-1, 0, 1), 96 (-1, 1, sqrt_2), 97 (0, -1, 1), 98 (0, 1, 1), 99 (1, -1, sqrt_2), 100 (1, 0, 1), 101 (1, 1, sqrt_2), 102 ] 103 else: 104 self.conn_data = conn_data 105 106 self.graph = self.__create_graph__() 107 self.nodes = [self.__get_x_y__(idx) for idx in range(len(self.graph))] 108 self.cacheGraph = CacheGraph(self.graph) 109 110 def __get_idx__(self, x: int, y: int): 111 """ 112 Function: 113 - Get the index of a cell in a 2D grid given its x and y coordinates 114 - This does not have any error checking, so be careful with the inputs 115 116 Required Arguments: 117 - `x` 118 - Type: int 119 - What: The x coordinate of the cell 120 - `y` 121 - Type: int 122 - What: The y coordinate of the cell 123 124 Returns: 125 126 - `idx` 127 - Type: int 128 - What: The index of the cell in the grid 129 """ 130 return x + y * self.x_size 131 132 def __get_x_y__(self, idx: int): 133 """ 134 Function: 135 - Get the x and y coordinates of a cell in a 2D grid given its index 136 - This does not have any error checking so be careful with the inputs 137 138 Required Arguments: 139 140 - `idx` 141 - Type: int 142 - What: The index of the cell 143 144 Returns: 145 - `x` 146 - Type: int 147 - What: The x coordinate of the cell 148 - `y` 149 - Type: int 150 - What: The y coordinate of the cell 151 """ 152 return idx % self.x_size, idx // self.x_size 153 154 def get_idx(self, x: int, y: int): 155 """ 156 Function: 157 - Get the index of a cell in a 2D grid given its x and y coordinates 158 159 Required Arguments: 160 - `x` 161 - Type: int 162 - What: The x coordinate of the cell 163 - `y` 164 - Type: int 165 - What: The y coordinate of the cell 166 167 Returns: 168 169 - `idx` 170 - Type: int 171 - What: The index of the cell in the grid 172 """ 173 assert x >= 0 and x < self.x_size, "x coordinate is out of bounds" 
174 assert y >= 0 and y < self.y_size, "y coordinate is out of bounds" 175 return x + y * self.x_size 176 177 def get_x_y(self, idx: int): 178 """ 179 Function: 180 - Get the x and y coordinates of a cell in a 2D grid given its index 181 182 Required Arguments: 183 184 - `idx` 185 - Type: int 186 - What: The index of the cell 187 188 Returns: 189 - `x` 190 - Type: int 191 - What: The x coordinate of the cell 192 - `y` 193 - Type: int 194 - What: The y coordinate of the cell 195 """ 196 assert idx >= 0 and idx < len(self.graph), "idx is out of bounds" 197 return idx % self.x_size, idx // self.x_size 198 199 def __get_relative_shape_offsets__( 200 self, 201 shape: list[tuple[int | float, int | float]], 202 x_off: int, 203 y_off: int, 204 ): 205 return list( 206 ShapeMoverUtils.moving_shape_overlap_intervals( 207 x_coord=0, 208 y_coord=0, 209 x_shift=x_off, 210 y_shift=y_off, 211 t_start=0, 212 t_end=1, 213 shape=shape, 214 ).keys() 215 ) 216 217 def __create_graph__( 218 self, 219 ): 220 """ 221 Function: 222 223 - Create a graph from the grid specifications 224 225 Returns: 226 227 - `graph` 228 - Type: list 229 - What: The adjacency list representation of the graph 230 """ 231 #################### 232 # Create a list of lists to hold all the possible connections in an easy to access/understand format 233 #################### 234 235 # Each cell in the list will be a dictionary of connections 236 # The keys will be the coordinates of the cell that can be moved to 237 # The values will be the distance to that cell 238 # This essentially creates a sparse matrix 239 graph_matrix = [ 240 [{} for _ in range(self.x_size)] for _ in range(self.y_size) 241 ] 242 for y in range(self.y_size): 243 for x in range(self.x_size): 244 for x_off, y_off, dist in self.conn_data: 245 if ( 246 0 <= x + x_off < self.x_size 247 and 0 <= y + y_off < self.y_size 248 ): 249 graph_matrix[y][x][(x + x_off, y + y_off)] = dist 250 251 #################### 252 # Remove all connections that would 
not be possible based on the blocks and the shape 253 #################### 254 255 # Create a set of offsets that will be used to help remove connections relative to each blocked cell 256 delta_offsets = { 257 (x_delta, y_delta): self.__get_relative_shape_offsets__( 258 self.shape, x_delta, y_delta 259 ) 260 for x_delta, y_delta, _ in self.conn_data 261 } 262 263 # Loop over all the blocks and remove all connections that would be not possible based on the blocks 264 for block in self.blocks: 265 # Clear out all connections leaving from the blocked cell 266 graph_matrix[block[1]][block[0]] = {} 267 # Clear out all connections that would be blocked by each movement given the blocked cell and the delta offsets 268 # Our delta offsets dictionary essentially tells us which relative cells would be blocked by the moving shape 269 for delta, offsets in delta_offsets.items(): 270 for offset in offsets: 271 x_cell = block[0] - offset[0] 272 y_cell = block[1] - offset[1] 273 if 0 <= x_cell < self.x_size and 0 <= y_cell < self.y_size: 274 x_move_to = x_cell + delta[0] 275 y_move_to = y_cell + delta[1] 276 graph_matrix[y_cell][x_cell].pop( 277 (x_move_to, y_move_to), None 278 ) 279 280 #################### 281 # Convert the graph matrix into the very efficient graph format as specified by the scgraph library 282 #################### 283 graph = [] 284 for x_data in graph_matrix: 285 for connection_dict in x_data: 286 graph.append( 287 { 288 self.__get_idx__(x, y): dist 289 for (x, y), dist in connection_dict.items() 290 } 291 ) 292 293 return graph 294 295 def euclidean_heuristic(self, origin_id: int, destination_id: int) -> float: 296 """ 297 Function: 298 299 - Calculate the Euclidean distance between two nodes in the grid graph 300 301 Required Arguments: 302 303 - `origin_id` 304 - Type: int 305 - What: The id of the origin node 306 - `destination_id` 307 - Type: int 308 - What: The id of the destination node 309 310 Returns: 311 312 - `distance` 313 - Type: float 314 - 
What: The Euclidean distance between the two nodes 315 """ 316 origin_location = self.nodes[origin_id] 317 destination_location = self.nodes[destination_id] 318 return ( 319 (origin_location[0] - destination_location[0]) ** 2 320 + (origin_location[1] - destination_location[1]) ** 2 321 ) ** 0.5 322 323 def __get_closest_node_with_connections__( 324 self, 325 x: int | float, 326 y: int | float, 327 ): 328 """ 329 Function: 330 331 - Get the closest node in the graph that has connections 332 333 Required Arguments: 334 335 - `x` 336 - Type: int | float 337 - What: The x coordinate of the node 338 - `y` 339 - Type: int | float 340 - What: The y coordinate of the node 341 342 Returns: 343 344 - `closest_node_id` 345 - Type: int 346 - What: The id of the closest node with connections 347 - `closest_distance` 348 - Type: float 349 - What: The distance to the closest node with connections 350 """ 351 x = int(x) if int(x) == x else x 352 y = int(y) if int(y) == y else y 353 if isinstance(x, int) and isinstance(y, int): 354 return self.get_idx(x, y), 0.0 355 closest_node_id = None 356 closest_distance = float("inf") 357 for x_off, y_off in [(0, 0), (1, 0), (0, 1), (1, 1)]: 358 node_x = int(x) + x_off 359 node_y = int(y) + y_off 360 try: 361 node_id = self.get_idx(node_x, node_y) 362 except AssertionError: 363 continue 364 if self.graph[node_id] == {}: 365 continue 366 distance = ((node_x - x) ** 2 + (node_y - y) ** 2) ** 0.5 367 if distance < closest_distance: 368 closest_distance = distance 369 closest_node_id = node_id 370 if closest_node_id is None: 371 raise ValueError( 372 "No valid adjacent node with connections found for the given coordinates" 373 ) 374 return closest_node_id, closest_distance 375 376 def format_coordinate(self, cooinate_dict, output_coorinate_path): 377 if output_coorinate_path == "list_of_tuples": 378 return (cooinate_dict["x"], cooinate_dict["y"]) 379 elif output_coorinate_path == "list_of_lists": 380 return [cooinate_dict["x"], 
cooinate_dict["y"]] 381 elif output_coordinate_path == "list_of_dicts": 382 return {"x": cooinate_dict["x"], "y": cooinate_dict["y"]} 383 else: 384 raise ValueError("Invalid output_coordinate_path format") 385 386 def get_shortest_path( 387 self, 388 origin_node: ( 389 dict[Literal["x", "y"], int | float] 390 | tuple[int | float, int | float] 391 | list[int | float] 392 ), 393 destination_node: ( 394 dict[Literal["x", "y"], int | float] 395 | tuple[int | float, int | float] 396 | list[int | float] 397 ), 398 output_coordinate_path: str = "list_of_dicts", 399 cache: bool = False, 400 cache_for: str = "origin", 401 output_path: bool = False, 402 heuristic_fn: callable | Literal["euclidean"] | None = "euclidean", 403 algorithm_fn: callable = Graph.a_star, 404 algorithm_kwargs: dict | None = None, 405 **kwargs, 406 ) -> dict: 407 """ 408 Function: 409 410 - Identify the shortest path between two nodes in a sparse network graph 411 - If an off graph origin and/or destination node is provided, it will find the closest adjacent node with connections 412 as the origin and/or destination node 413 - This is done to increase the likelihood that the pathfinding algorithm can find a valid path 414 - If no valid adjacent node is found, an error will be raised 415 416 - Return a dictionary of various path information including: 417 - `path`: A list of graph ids in the order they are visited 418 - `coordinate_path`: A list of dicts (x, y) in the order they are visited 419 - `length`: The length of the path 420 421 Required Arguments: 422 423 - `origin_node` 424 - Type: dict of int | float 425 - What: A dictionary with the keys 'x' and 'y' 426 - Alternatively, a tuple or list of two values (x, y) can be used 427 - `destination_node` 428 - Type: dict of int | float 429 - What: A dictionary with the keys 'x' and 'y' 430 - Alternatively, a tuple or list of two values (x, y) can be used 431 432 Optional Arguments: 433 434 - `output_coordinate_path` 435 - Type: str 436 - What: The 
format of the output coordinate path 437 - Default: 'list_of_lists' 438 - Options: 439 - `list_of_tuples`: A list of tuples with the first value being x and the second being y 440 - 'list_of_dicts': A list of dictionaries with keys 'x' and 'y' 441 - 'list_of_lists': A list of lists with the first value being x and the second being y 442 - `cache` 443 - Type: bool 444 - What: Whether to use the cache (save and reuse the spanning tree) 445 - Default: False 446 - `cache_for` 447 - Type: str 448 - What: Whether to cache the spanning tree for the origin or destination node if `cache` is True 449 - Default: 'origin' 450 - Options: 'origin', 'destination' 451 - `output_path` 452 - Type: bool 453 - What: Whether to output the path as a list of graph idxs (mostly for debugging purposes) 454 - Default: False 455 - `heuristic_fn` 456 - Type: callable | Literal['euclidean'] | None 457 - What: A heuristic function to use for the A* algorithm if caching is False 458 - Default: 'euclidean' (A predefined heuristic function that calculates the Euclidean distance for this grid graph) 459 - If None, the A* algorithm will default to Dijkstra's algorithm 460 - If a callable is provided, it should take two arguments: origin_id and destination_id and return a float representing the heuristic distance between the two nodes 461 - Note: This distance should never be greater than the actual distance between the two nodes or you may get suboptimal paths 462 - Note: This is deprecated and will be removed in a future version 463 - `algorithm_fn` 464 - Type: callable | None 465 - What: The algorithm to use for pathfinding 466 - Default: 'a_star' 467 - If None, the default algorithm will be used 468 - `algorithm_kwargs` 469 - Type: dict 470 - What: Additional keyword arguments to pass to the algorithm function 471 - Default: {} 472 - `**kwargs` 473 - Additional keyword arguments. These are included for forwards and backwards compatibility reasons, but are not currently used. 
474 """ 475 if isinstance(origin_node, (tuple, list)): 476 origin_node = {"x": origin_node[0], "y": origin_node[1]} 477 if isinstance(destination_node, (tuple, list)): 478 destination_node = { 479 "x": destination_node[0], 480 "y": destination_node[1], 481 } 482 if algorithm_kwargs is None: 483 algorithm_kwargs = {} 484 485 origin_id, origin_distance = self.__get_closest_node_with_connections__( 486 **origin_node 487 ) 488 destination_id, destination_distance = ( 489 self.__get_closest_node_with_connections__(**destination_node) 490 ) 491 492 if self.graph[origin_id] == {}: 493 raise ValueError( 494 "Origin node is not connected to any other nodes. This is likely caused by the origin node not being possible given a blocked cell or nearby blocked cell" 495 ) 496 if self.graph[destination_id] == {}: 497 raise ValueError( 498 "Destination node is not connected to any other nodes. This is likely caused by the destination node not being possible given a blocked cell or nearby blocked cell" 499 ) 500 if cache: 501 if cache_for not in ["origin", "destination"]: 502 raise ValueError( 503 "cache_for must be 'origin' or 'destination' when cache is True" 504 ) 505 # Reverse for cache graphing if cache_for is destination since the graph is undirected 506 if cache_for == "destination": 507 origin_id, destination_id = destination_id, origin_id 508 output = self.cacheGraph.get_shortest_path( 509 origin_id=origin_id, 510 destination_id=destination_id, 511 ) 512 # Undo the reverse if cache_for is destination 513 if cache_for == "destination": 514 output["path"].reverse() 515 origin_id, destination_id = destination_id, origin_id 516 else: 517 # TODO: Remove this backwards compatibility hack in future versions 518 if algorithm_fn == Graph.a_star: 519 if "heuristic_fn" not in algorithm_kwargs: 520 algorithm_kwargs["heuristic_fn"] = ( 521 self.euclidean_heuristic 522 if heuristic_fn == "euclidean" 523 else heuristic_fn 524 ) 525 output = algorithm_fn( 526 graph=self.graph, 527 
origin_id=origin_id, 528 destination_id=destination_id, 529 **algorithm_kwargs, 530 ) 531 output["coordinate_path"] = self.get_coordinate_path( 532 output["path"], output_coordinate_path 533 ) 534 if origin_distance > 0: 535 output["coordinate_path"] = [ 536 self.format_coordinate(origin_node, output_coordinate_path) 537 ] + output["coordinate_path"] 538 output["path"] = [-1] + output["path"] 539 output["length"] += origin_distance 540 if destination_distance > 0: 541 output["coordinate_path"].append( 542 self.format_coordinate(destination_node, output_coordinate_path) 543 ) 544 output["path"].append(-1) 545 output["length"] += destination_distance 546 if not output_path: 547 del output["path"] 548 return output 549 550 def get_coordinate_path( 551 self, path: list[int], output_coordinate_path: str = "list_of_dicts" 552 ) -> list[dict[str, int]]: 553 """ 554 Function: 555 556 - Return a list of node dictionaries (lat + long) in the order they are visited 557 558 Required Arguments: 559 560 - `path` 561 - Type: list 562 - What: A list of node ids in the order they are visited 563 564 Optional Arguments: 565 566 - `output_coordinate_path` 567 - Type: str 568 - What: The format of the output coordinate path 569 - Default: 'list_of_dicts' 570 - Options: 571 - 'list_of_dicts': A list of dictionaries with keys 'x' and 'y' 572 - 'list_of_lists': A list of lists with the first value being x and the second being y 573 - `list_of_tuples`: A list of tuples with the first value being x and the second being y 574 575 Returns: 576 577 - `coordinate_path` 578 - Type: list 579 - What: A list of dictionaries with keys 'x' and 'y' in the order they are visited 580 """ 581 output = [self.__get_x_y__(idx) for idx in path] 582 if output_coordinate_path == "list_of_tuples": 583 return output 584 elif output_coordinate_path == "list_of_lists": 585 return [[x, y] for x, y in output] 586 elif output_coordinate_path == "list_of_dicts": 587 return [{"x": x, "y": y} for x, y in output] 588 
else: 589 raise ValueError( 590 "output_coordinate_path must be 'list_of_dicts' or 'list_of_lists' or 'list_of_tuples'" 591 ) 592 593 def export_object( 594 self, filename: str = "", include_blocks: bool = False 595 ) -> dict: 596 """ 597 Function: 598 599 - Export the graph as a list of dictionaries 600 601 Arguments: 602 603 - `filename` 604 - Type: str 605 - What: The name of the file to export the graph to. 606 - An extension of .gridgraph will be added to the file name if not already present 607 608 Optional Arguments: 609 610 - `include_blocks` 611 - Type: bool 612 - What: Whether to include the blocks in the export 613 - Default: False 614 - Note: This is not needed as the graph is already created 615 - Note: You can include blocks in the export if you need them for some reason 616 - This will be set as the blocks attribute in the imported object 617 - This will increase the size of the export file 618 """ 619 if filename == "": 620 raise ValueError("filename must be specified to export the graph") 621 filename = ( 622 filename 623 if filename.endswith(".gridgraph") 624 else filename + ".gridgraph" 625 ) 626 try: 627 import pickle, zlib, os 628 except ImportError: 629 raise ImportError( 630 "os, pickle and zlib are required to export the graph" 631 ) 632 export_data = { 633 "graph_attributes": { 634 "graph": self.graph, 635 "nodes": self.nodes, 636 "x_size": self.x_size, 637 "y_size": self.y_size, 638 "shape": self.shape, 639 "add_exterior_walls": self.add_exterior_walls, 640 "conn_data": self.conn_data, 641 }, 642 "graph_cache": self.cacheGraph.cache, 643 "export_version": 1, 644 } 645 if include_blocks: 646 export_data["graph_attributes"]["blocks"] = self.blocks 647 648 if not os.path.exists(os.path.dirname(filename)): 649 os.makedirs(os.path.dirname(filename)) 650 with open(filename, "wb") as f: 651 f.write(zlib.compress(pickle.dumps(export_data))) 652 653 @staticmethod 654 def import_object(filename: str = "") -> None: 655 """ 656 Function: 657 658 - A 
staticmethod to import the graph from a file 659 660 Arguments: 661 662 - `filename` 663 - Type: str 664 - What: The name of the file to import the graph from. 665 - An extension of .gridgraph will be added to the file name if not already present 666 667 Returns: 668 669 - `GridGraph` 670 - Type: GridGraph 671 - What: The imported graph object 672 """ 673 if filename == "": 674 raise ValueError("filename must be specified to import the graph") 675 filename = ( 676 filename 677 if filename.endswith(".gridgraph") 678 else filename + ".gridgraph" 679 ) 680 try: 681 import pickle, zlib 682 except ImportError: 683 raise ImportError( 684 "pickle and zlib are required to import the graph" 685 ) 686 with open(filename, "rb") as f: 687 import_data = pickle.loads(zlib.decompress(f.read())) 688 689 # Check the version of the export 690 if import_data["export_version"] != 1: 691 raise ValueError( 692 f"Incompatible export version {import_data['export_version']}. The current version is 1" 693 ) 694 # Create a new basic GridGraph object and overwrite the attributes with the imported data 695 # This is done as the init method calls the __create_graph__ method which is not needed here 696 GridGraph_object = GridGraph( 697 x_size=1, y_size=1, blocks=[], add_exterior_walls=False 698 ) 699 for key, value in import_data["graph_attributes"].items(): 700 GridGraph_object.__setattr__(key, value) 701 702 GridGraph_object.cacheGraph = CacheGraph(GridGraph_object.graph) 703 GridGraph_object.cacheGraph.cache = import_data["graph_cache"] 704 return GridGraph_object
GridGraph( x_size: int, y_size: int, blocks: list[tuple[int, int]], shape: list[tuple[int | float, int | float]] | None = None, add_exterior_walls: bool = True, conn_data: list[tuple[int, int, float]] | None = None)
def __init__(
    self,
    x_size: int,
    y_size: int,
    blocks: list[tuple[int, int]],
    shape: list[tuple[int | float, int | float]] | None = None,
    add_exterior_walls: bool = True,
    conn_data: list[tuple[int, int, float]] | None = None,
) -> None:
    """
    Initializes a GridGraph object.

    Required Arguments:

    - `x_size`
        - Type: int
        - What: The size of the grid in the x direction
    - `y_size`
        - Type: int
        - What: The size of the grid in the y direction
    - `blocks`
        - Type: list of tuples
        - What: A list of tuples representing the coordinates of the blocked cells
        - Note: The provided list is copied and is never modified by this object

    Optional Arguments:

    - `shape`
        - Type: list of tuples
        - What: A list of tuples representing the shape of the moving object relative to the object location in the grid
        - Default: [(0, 0), (0, 1), (1, 0), (1, 1)]
        - Example: [(0, 0), (0, 1), (1, 0), (1, 1)]
            - Note: This would form a square of size 1x1
            - Assuming the grid location is at (0,0) the square takes up the grid space between (0, 0) and (1, 1)
            - Assuming the grid location is at (1,1) the square takes up the grid space between (1, 1) and (2, 2) with the same shape
        - Note: This shape is used to determine which connections are allowed when passing through the grid.
            - For example
                - The shape is a square of size 1x1
                - The location of the square is (0, 0)
                - There is a blocked cell at (0, 1)
                - There is a potential connection to (1, 1), (0, 1), and (1, 0)
                - The square can not move to (1, 1) as it would collide with the blocked cell at (0, 1)
                    - This is because the square would pass partly through the blocked cell on its way to (1, 1)
                - To achieve the same result, it can move to (1, 0) and then up to (1, 1) taking extra time / distance
    - `add_exterior_walls`
        - Type: bool
        - What: Whether to add exterior walls to the grid
        - Default: True
    - `conn_data`
        - Type: list of tuples
        - What: A list of tuples representing movements allowed in the grid
            - Each tuple should contain three values: (x_offset, y_offset, distance)
            - x_offset: The x offset from the current cell
            - y_offset: The y offset from the current cell
            - distance: The distance to the connected cell
        - Default: [(0, 1, 1), (1, 0, 1), (0, -1, 1), (-1, 0, 1), (1, 1, sqrt(2)), (-1, -1, sqrt(2)), (1, -1, sqrt(2)), (-1, 1, sqrt(2))]
            - Note: If None, the default connection data will be used
            - Note: This includes 8 directions, 4 cardinal and 4 diagonal
            - Cardinal directions are 1 unit away, diagonal directions are sqrt(2) units away
    """
    # Validate the inputs
    assert x_size > 0, "x_size must be greater than 0"
    assert y_size > 0, "y_size must be greater than 0"

    self.x_size = x_size
    self.y_size = y_size
    # Copy the blocks so that adding the exterior walls below never extends
    # the caller's list in place (and so that any iterable of cells is accepted)
    self.blocks = list(blocks)
    self.add_exterior_walls = add_exterior_walls

    # Update the blocks if add_exterior_walls is True
    # This is done by adding all the cells in the first and last row and column
    if add_exterior_walls:
        self.blocks += [
            (x, y) for x in range(x_size) for y in [0, y_size - 1]
        ] + [(x, y) for x in [0, x_size - 1] for y in range(y_size)]

    # If shape is None, set the default shape
    if shape is None:
        self.shape = [(0, 0), (0, 1), (1, 0), (1, 1)]
    else:
        self.shape = shape

    # If conn_data is None, set the default connection data
    if conn_data is None:
        sqrt_2 = 1.4142
        self.conn_data = [
            (-1, -1, sqrt_2),
            (-1, 0, 1),
            (-1, 1, sqrt_2),
            (0, -1, 1),
            (0, 1, 1),
            (1, -1, sqrt_2),
            (1, 0, 1),
            (1, 1, sqrt_2),
        ]
    else:
        self.conn_data = conn_data

    self.graph = self.__create_graph__()
    self.nodes = [self.__get_x_y__(idx) for idx in range(len(self.graph))]
    self.cacheGraph = CacheGraph(self.graph)
Initializes a GridGraph object.
Required Arguments:
x_size
- Type: int
- What: The size of the grid in the x direction
y_size
- Type: int
- What: The size of the grid in the y direction
blocks
- Type: list of tuples
- What: A list of tuples representing the coordinates of the blocked cells
Optional Arguments:
shape
- Type: list of tuples
- What: A list of tuples representing the shape of the moving object relative to the object location in the grid
- Default: [(0, 0), (0, 1), (1, 0), (1, 1)]
- Example: [(0, 0), (0, 1), (1, 0), (1, 1)]
- Note: This would form a square of size 1x1
- Assuming the grid location is at (0,0) the square takes up the grid space between (0, 0) and (1, 1)
- Assuming the grid location is at (1,1) the square takes up the grid space between (1, 1) and (2, 2) with the same shape
- Note: This shape is used to determine which connections are allowed when passing through the grid.
- For example
- The shape is a square of size 1x1
- The location of the square is (0, 0)
- There is a blocked cell at (0, 1)
- There is a potential connection to (1, 1), (0, 1), and (1, 0)
- The square can not move to (1, 1) as it would collide with the blocked cell at (0, 1)
- This is because the square would pass partly through the blocked cell on its way to (1, 1)
- To achieve the same result, it can move to (1, 0) and then up to (1, 1) taking extra time / distance
- For example
add_exterior_walls
- Type: bool
- What: Whether to add exterior walls to the grid
- Default: True
conn_data
- Type: list of tuples
- What: A list of tuples representing movements allowed in the grid
- Each tuple should contain three values: (x_offset, y_offset, distance)
- x_offset: The x offset from the current cell
- y_offset: The y offset from the current cell
- distance: The distance to the connected cell
- Default: [(0, 1, 1), (1, 0, 1), (0, -1, 1), (-1, 0, 1), (1, 1, sqrt(2)), (-1, -1, sqrt(2)), (1, -1, sqrt(2)), (-1, 1, sqrt(2))]
- Note: If None, the default connection data will be used
- Note: This includes 8 directions, 4 cardinal and 4 diagonal
- Cardinal directions are 1 unit away, diagonal directions are sqrt(2) units away
def
get_idx(self, x: int, y: int):
def get_idx(self, x: int, y: int):
    """
    Function:

    - Convert the x and y coordinates of a grid cell into its flat (row-major) index

    Required Arguments:

    - `x`
        - Type: int
        - What: The x coordinate of the cell
    - `y`
        - Type: int
        - What: The y coordinate of the cell

    Returns:

    - `idx`
        - Type: int
        - What: The index of the cell in the grid
    """
    # Both coordinates must fall inside the grid before they are flattened
    assert 0 <= x < self.x_size, "x coordinate is out of bounds"
    assert 0 <= y < self.y_size, "y coordinate is out of bounds"
    row_start = y * self.x_size
    return x + row_start
Function:
- Get the index of a cell in a 2D grid given its x and y coordinates
Required Arguments:
x
- Type: int
- What: The x coordinate of the cell
y
- Type: int
- What: The y coordinate of the cell
Returns:
idx
- Type: int
- What: The index of the cell in the grid
def
get_x_y(self, idx: int):
def get_x_y(self, idx: int):
    """
    Function:

    - Convert the flat index of a grid cell back into its x and y coordinates

    Required Arguments:

    - `idx`
        - Type: int
        - What: The index of the cell

    Returns:

    - `x`
        - Type: int
        - What: The x coordinate of the cell
    - `y`
        - Type: int
        - What: The y coordinate of the cell
    """
    # The index must reference an existing node in the graph
    assert 0 <= idx < len(self.graph), "idx is out of bounds"
    x_coord = idx % self.x_size
    y_coord = idx // self.x_size
    return x_coord, y_coord
Function:
- Get the x and y coordinates of a cell in a 2D grid given its index
Required Arguments:
idx
- Type: int
- What: The index of the cell
Returns:
x
- Type: int
- What: The x coordinate of the cell
y
- Type: int
- What: The y coordinate of the cell
def
euclidean_heuristic(self, origin_id: int, destination_id: int) -> float:
def euclidean_heuristic(self, origin_id: int, destination_id: int) -> float:
    """
    Function:

    - Compute the straight line (Euclidean) distance between two nodes of the grid graph

    Required Arguments:

    - `origin_id`
        - Type: int
        - What: The id of the origin node
    - `destination_id`
        - Type: int
        - What: The id of the destination node

    Returns:

    - `distance`
        - Type: float
        - What: The Euclidean distance between the two nodes
    """
    # Node locations are looked up by id in self.nodes
    start = self.nodes[origin_id]
    end = self.nodes[destination_id]
    x_diff = start[0] - end[0]
    y_diff = start[1] - end[1]
    return (x_diff**2 + y_diff**2) ** 0.5
Function:
- Calculate the Euclidean distance between two nodes in the grid graph
Required Arguments:
origin_id
- Type: int
- What: The id of the origin node
destination_id
- Type: int
- What: The id of the destination node
Returns:
distance
- Type: float
- What: The Euclidean distance between the two nodes
def
format_coordinate(self, cooinate_dict, output_coorinate_path):
def format_coordinate(self, cooinate_dict, output_coorinate_path):
    """
    Function:

    - Format a single coordinate dictionary into the requested output format

    Required Arguments:

    - `cooinate_dict`
        - Type: dict
        - What: A dictionary with the keys 'x' and 'y'
    - `output_coorinate_path`
        - Type: str
        - What: The format of the output coordinate
        - Options:
            - 'list_of_tuples': Returns a tuple (x, y)
            - 'list_of_lists': Returns a list [x, y]
            - 'list_of_dicts': Returns a dictionary with keys 'x' and 'y'

    Returns:

    - `coordinate`
        - Type: tuple | list | dict
        - What: The coordinate in the requested format

    Raises:

    - `ValueError` if `output_coorinate_path` is not one of the options above
    """
    # Note: The (misspelled) parameter names are kept as they are part of the
    # existing call signature. Every branch must reference the parameter as spelled.
    if output_coorinate_path == "list_of_tuples":
        return (cooinate_dict["x"], cooinate_dict["y"])
    elif output_coorinate_path == "list_of_lists":
        return [cooinate_dict["x"], cooinate_dict["y"]]
    elif output_coorinate_path == "list_of_dicts":
        return {"x": cooinate_dict["x"], "y": cooinate_dict["y"]}
    else:
        raise ValueError("Invalid output_coordinate_path format")
def
get_shortest_path( self, origin_node: dict[typing.Literal['x', 'y'], int | float] | tuple[int | float, int | float] | list[int | float], destination_node: dict[typing.Literal['x', 'y'], int | float] | tuple[int | float, int | float] | list[int | float], output_coordinate_path: str = 'list_of_dicts', cache: bool = False, cache_for: str = 'origin', output_path: bool = False, heuristic_fn: Union[<built-in function callable>, Literal['euclidean'], NoneType] = 'euclidean', algorithm_fn: <built-in function callable> = <function Graph.a_star>, algorithm_kwargs: dict | None = None, **kwargs) -> dict:
def get_shortest_path(
    self,
    origin_node: (
        dict[Literal["x", "y"], int | float]
        | tuple[int | float, int | float]
        | list[int | float]
    ),
    destination_node: (
        dict[Literal["x", "y"], int | float]
        | tuple[int | float, int | float]
        | list[int | float]
    ),
    output_coordinate_path: str = "list_of_dicts",
    cache: bool = False,
    cache_for: str = "origin",
    output_path: bool = False,
    heuristic_fn: callable | Literal["euclidean"] | None = "euclidean",
    algorithm_fn: callable = Graph.a_star,
    algorithm_kwargs: dict | None = None,
    **kwargs,
) -> dict:
    """
    Function:

    - Identify the shortest path between two nodes in a sparse network graph
    - If an off graph origin and/or destination node is provided, it will find the closest adjacent node with connections
      as the origin and/or destination node
        - This is done to increase the likelihood that the pathfinding algorithm can find a valid path
        - If no valid adjacent node is found, an error will be raised

    - Return a dictionary of various path information including:
        - `path`: A list of graph ids in the order they are visited
        - `coordinate_path`: A list of dicts (x, y) in the order they are visited
        - `length`: The length of the path

    Required Arguments:

    - `origin_node`
        - Type: dict of int | float
        - What: A dictionary with the keys 'x' and 'y'
        - Alternatively, a tuple or list of two values (x, y) can be used
    - `destination_node`
        - Type: dict of int | float
        - What: A dictionary with the keys 'x' and 'y'
        - Alternatively, a tuple or list of two values (x, y) can be used

    Optional Arguments:

    - `output_coordinate_path`
        - Type: str
        - What: The format of the output coordinate path
        - Default: 'list_of_dicts'
        - Options:
            - `list_of_tuples`: A list of tuples with the first value being x and the second being y
            - 'list_of_dicts': A list of dictionaries with keys 'x' and 'y'
            - 'list_of_lists': A list of lists with the first value being x and the second being y
    - `cache`
        - Type: bool
        - What: Whether to use the cache (save and reuse the spanning tree)
        - Default: False
    - `cache_for`
        - Type: str
        - What: Whether to cache the spanning tree for the origin or destination node if `cache` is True
        - Default: 'origin'
        - Options: 'origin', 'destination'
    - `output_path`
        - Type: bool
        - What: Whether to output the path as a list of graph idxs (mostly for debugging purposes)
        - Default: False
    - `heuristic_fn`
        - Type: callable | Literal['euclidean'] | None
        - What: A heuristic function to use for the A* algorithm if caching is False
        - Default: 'euclidean' (A predefined heuristic function that calculates the Euclidean distance for this grid graph)
        - If None, the A* algorithm will default to Dijkstra's algorithm
        - If a callable is provided, it should take two arguments: origin_id and destination_id and return a float representing the heuristic distance between the two nodes
        - Note: This distance should never be greater than the actual distance between the two nodes or you may get suboptimal paths
        - Note: This is deprecated and will be removed in a future version
    - `algorithm_fn`
        - Type: callable | None
        - What: The algorithm to use for pathfinding
        - Default: 'a_star'
        - If None, the default algorithm will be used
    - `algorithm_kwargs`
        - Type: dict
        - What: Additional keyword arguments to pass to the algorithm function
        - Default: {}
        - Note: The provided dictionary is copied and is never modified by this function
    - `**kwargs`
        - Additional keyword arguments. These are included for forwards and backwards compatibility reasons, but are not currently used.

    Raises:

    - `ValueError` if the origin or destination node has no connections
    - `ValueError` if `cache` is True and `cache_for` is not 'origin' or 'destination'
    """
    if isinstance(origin_node, (tuple, list)):
        origin_node = {"x": origin_node[0], "y": origin_node[1]}
    if isinstance(destination_node, (tuple, list)):
        destination_node = {
            "x": destination_node[0],
            "y": destination_node[1],
        }
    # Work on a copy so that injecting the heuristic below never leaks into a
    # dictionary that the caller may reuse for later calls
    algorithm_kwargs = (
        {} if algorithm_kwargs is None else dict(algorithm_kwargs)
    )

    origin_id, origin_distance = self.__get_closest_node_with_connections__(
        **origin_node
    )
    destination_id, destination_distance = (
        self.__get_closest_node_with_connections__(**destination_node)
    )

    if self.graph[origin_id] == {}:
        raise ValueError(
            "Origin node is not connected to any other nodes. This is likely caused by the origin node not being possible given a blocked cell or nearby blocked cell"
        )
    if self.graph[destination_id] == {}:
        raise ValueError(
            "Destination node is not connected to any other nodes. This is likely caused by the destination node not being possible given a blocked cell or nearby blocked cell"
        )
    if cache:
        if cache_for not in ["origin", "destination"]:
            raise ValueError(
                "cache_for must be 'origin' or 'destination' when cache is True"
            )
        # Reverse for cache graphing if cache_for is destination since the graph is undirected
        if cache_for == "destination":
            origin_id, destination_id = destination_id, origin_id
        output = self.cacheGraph.get_shortest_path(
            origin_id=origin_id,
            destination_id=destination_id,
        )
        # Undo the reverse if cache_for is destination
        if cache_for == "destination":
            output["path"].reverse()
            origin_id, destination_id = destination_id, origin_id
    else:
        # TODO: Remove this backwards compatibility hack in future versions
        if algorithm_fn == Graph.a_star:
            if "heuristic_fn" not in algorithm_kwargs:
                algorithm_kwargs["heuristic_fn"] = (
                    self.euclidean_heuristic
                    if heuristic_fn == "euclidean"
                    else heuristic_fn
                )
        output = algorithm_fn(
            graph=self.graph,
            origin_id=origin_id,
            destination_id=destination_id,
            **algorithm_kwargs,
        )
    output["coordinate_path"] = self.get_coordinate_path(
        output["path"], output_coordinate_path
    )
    # Off graph origins / destinations are added to the ends of the path
    # They have no graph id so they are represented by -1 in the id path
    if origin_distance > 0:
        output["coordinate_path"] = [
            self.format_coordinate(origin_node, output_coordinate_path)
        ] + output["coordinate_path"]
        output["path"] = [-1] + output["path"]
        output["length"] += origin_distance
    if destination_distance > 0:
        output["coordinate_path"].append(
            self.format_coordinate(destination_node, output_coordinate_path)
        )
        output["path"].append(-1)
        output["length"] += destination_distance
    if not output_path:
        del output["path"]
    return output
Function:
- Identify the shortest path between two nodes in a sparse network graph
If an off graph origin and/or destination node is provided, it will find the closest adjacent node with connections as the origin and/or destination node
- This is done to increase the likelihood that the pathfinding algorithm can find a valid path
- If no valid adjacent node is found, an error will be raised
Return a dictionary of various path information including:
- path: A list of graph ids in the order they are visited
- coordinate_path: A list of dicts (x, y) in the order they are visited
- length: The length of the path
Required Arguments:
origin_node
- Type: dict of int | float
- What: A dictionary with the keys 'x' and 'y'
- Alternatively, a tuple or list of two values (x, y) can be used
destination_node
- Type: dict of int | float
- What: A dictionary with the keys 'x' and 'y'
- Alternatively, a tuple or list of two values (x, y) can be used
Optional Arguments:
output_coordinate_path
- Type: str
- What: The format of the output coordinate path
- Default: 'list_of_dicts'
- Options:
- 'list_of_tuples': A list of tuples with the first value being x and the second being y
- 'list_of_dicts': A list of dictionaries with keys 'x' and 'y'
- 'list_of_lists': A list of lists with the first value being x and the second being y
cache
- Type: bool
- What: Whether to use the cache (save and reuse the spanning tree)
- Default: False
cache_for
- Type: str
- What: Whether to cache the spanning tree for the origin or destination node if `cache` is True
- Default: 'origin'
- Options: 'origin', 'destination'
output_path
- Type: bool
- What: Whether to output the path as a list of graph idxs (mostly for debugging purposes)
- Default: False
heuristic_fn
- Type: callable | Literal['euclidean'] | None
- What: A heuristic function to use for the A* algorithm if caching is False
- Default: 'euclidean' (A predefined heuristic function that calculates the Euclidean distance for this grid graph)
- If None, the A* algorithm will default to Dijkstra's algorithm
- If a callable is provided, it should take two arguments: origin_id and destination_id and return a float representing the heuristic distance between the two nodes
- Note: This distance should never be greater than the actual distance between the two nodes or you may get suboptimal paths
- Note: This is deprecated and will be removed in a future version
algorithm_fn
- Type: callable | None
- What: The algorithm to use for pathfinding
- Default: 'a_star'
- If None, the default algorithm will be used
algorithm_kwargs
- Type: dict
- What: Additional keyword arguments to pass to the algorithm function
- Default: {}
**kwargs
- Additional keyword arguments. These are included for forwards and backwards compatibility reasons, but are not currently used.
def
get_coordinate_path( self, path: list[int], output_coordinate_path: str = 'list_of_dicts') -> list[dict[str, int]]:
550 def get_coordinate_path( 551 self, path: list[int], output_coordinate_path: str = "list_of_dicts" 552 ) -> list[dict[str, int]]: 553 """ 554 Function: 555 556 - Return a list of node dictionaries (lat + long) in the order they are visited 557 558 Required Arguments: 559 560 - `path` 561 - Type: list 562 - What: A list of node ids in the order they are visited 563 564 Optional Arguments: 565 566 - `output_coordinate_path` 567 - Type: str 568 - What: The format of the output coordinate path 569 - Default: 'list_of_dicts' 570 - Options: 571 - 'list_of_dicts': A list of dictionaries with keys 'x' and 'y' 572 - 'list_of_lists': A list of lists with the first value being x and the second being y 573 - `list_of_tuples`: A list of tuples with the first value being x and the second being y 574 575 Returns: 576 577 - `coordinate_path` 578 - Type: list 579 - What: A list of dictionaries with keys 'x' and 'y' in the order they are visited 580 """ 581 output = [self.__get_x_y__(idx) for idx in path] 582 if output_coordinate_path == "list_of_tuples": 583 return output 584 elif output_coordinate_path == "list_of_lists": 585 return [[x, y] for x, y in output] 586 elif output_coordinate_path == "list_of_dicts": 587 return [{"x": x, "y": y} for x, y in output] 588 else: 589 raise ValueError( 590 "output_coordinate_path must be 'list_of_dicts' or 'list_of_lists' or 'list_of_tuples'" 591 )
Function:
- Return the (x, y) grid coordinates of each node in the order they are visited
Required Arguments:
path
- Type: list
- What: A list of node ids in the order they are visited
Optional Arguments:
output_coordinate_path
- Type: str
- What: The format of the output coordinate path
- Default: 'list_of_dicts'
- Options:
- 'list_of_dicts': A list of dictionaries with keys 'x' and 'y'
- 'list_of_lists': A list of lists with the first value being x and the second being y
- 'list_of_tuples': A list of tuples with the first value being x and the second being y
Returns:
coordinate_path
- Type: list
- What: The coordinates in the order they are visited, in the format selected by output_coordinate_path (by default, dictionaries with keys 'x' and 'y')
def
export_object(self, filename: str = '', include_blocks: bool = False) -> dict:
def export_object(
    self, filename: str = "", include_blocks: bool = False
) -> None:
    """
    Function:

    - Export the graph, its attributes and its cache to a file
    - The data is written as a zlib-compressed pickle of a dictionary

    Arguments:

    - `filename`
        - Type: str
        - What: The name of the file to export the graph to.
        - An extension of .gridgraph will be added to the file name if not already present
        - Any missing parent directories in the file name are created

    Optional Arguments:

    - `include_blocks`
        - Type: bool
        - What: Whether to include the blocks in the export
        - Default: False
        - Note: This is not needed as the graph is already created
        - Note: You can include blocks in the export if you need them for some reason
            - This will be set as the blocks attribute in the imported object
            - This will increase the size of the export file

    Returns:

    - None
        - The export is written to disk; nothing is returned

    Raises:

    - `ValueError`
        - When `filename` is an empty string
    """
    if filename == "":
        raise ValueError("filename must be specified to export the graph")
    if not filename.endswith(".gridgraph"):
        filename += ".gridgraph"

    # Imported locally, as in the original implementation
    import os
    import pickle
    import zlib

    export_data = {
        "graph_attributes": {
            "graph": self.graph,
            "nodes": self.nodes,
            "x_size": self.x_size,
            "y_size": self.y_size,
            "shape": self.shape,
            "add_exterior_walls": self.add_exterior_walls,
            "conn_data": self.conn_data,
        },
        "graph_cache": self.cacheGraph.cache,
        "export_version": 1,
    }
    if include_blocks:
        export_data["graph_attributes"]["blocks"] = self.blocks

    # `os.path.dirname` is "" for a bare file name and `os.makedirs("")`
    # raises, so directories are only created when the name contains one.
    # `exist_ok=True` tolerates the directory already being present.
    directory = os.path.dirname(filename)
    if directory:
        os.makedirs(directory, exist_ok=True)
    with open(filename, "wb") as f:
        f.write(zlib.compress(pickle.dumps(export_data)))
Function:
- Export the graph, its attributes and its cache to a file (a zlib-compressed pickle of a dictionary)
Arguments:
filename
- Type: str
- What: The name of the file to export the graph to.
- An extension of .gridgraph will be added to the file name if not already present
Optional Arguments:
include_blocks
- Type: bool
- What: Whether to include the blocks in the export
- Default: False
- Note: This is not needed as the graph is already created
- Note: You can include blocks in the export if you need them for some reason
- This will be set as the blocks attribute in the imported object
- This will increase the size of the export file
@staticmethod
def
import_object(filename: str = '') -> None:
653 @staticmethod 654 def import_object(filename: str = "") -> None: 655 """ 656 Function: 657 658 - A staticmethod to import the graph from a file 659 660 Arguments: 661 662 - `filename` 663 - Type: str 664 - What: The name of the file to import the graph from. 665 - An extension of .gridgraph will be added to the file name if not already present 666 667 Returns: 668 669 - `GridGraph` 670 - Type: GridGraph 671 - What: The imported graph object 672 """ 673 if filename == "": 674 raise ValueError("filename must be specified to import the graph") 675 filename = ( 676 filename 677 if filename.endswith(".gridgraph") 678 else filename + ".gridgraph" 679 ) 680 try: 681 import pickle, zlib 682 except ImportError: 683 raise ImportError( 684 "pickle and zlib are required to import the graph" 685 ) 686 with open(filename, "rb") as f: 687 import_data = pickle.loads(zlib.decompress(f.read())) 688 689 # Check the version of the export 690 if import_data["export_version"] != 1: 691 raise ValueError( 692 f"Incompatible export version {import_data['export_version']}. The current version is 1" 693 ) 694 # Create a new basic GridGraph object and overwrite the attributes with the imported data 695 # This is done as the init method calls the __create_graph__ method which is not needed here 696 GridGraph_object = GridGraph( 697 x_size=1, y_size=1, blocks=[], add_exterior_walls=False 698 ) 699 for key, value in import_data["graph_attributes"].items(): 700 GridGraph_object.__setattr__(key, value) 701 702 GridGraph_object.cacheGraph = CacheGraph(GridGraph_object.graph) 703 GridGraph_object.cacheGraph.cache = import_data["graph_cache"] 704 return GridGraph_object
Function:
- A staticmethod to import the graph from a file
Arguments:
filename
- Type: str
- What: The name of the file to import the graph from.
- An extension of .gridgraph will be added to the file name if not already present
Returns:
GridGraph
- Type: GridGraph
- What: The imported graph object