scgraph.grid

  1from scgraph.graph import Graph
  2from scgraph.helpers.shape_mover_utils import ShapeMoverUtils
  3from scgraph.cache import CacheGraph
  4from typing import Literal
  5
  6
  7class GridGraph:
  8    def __init__(
  9        self,
 10        x_size: int,
 11        y_size: int,
 12        blocks: list[tuple[int, int]],
 13        shape: list[tuple[int | float, int | float]] | None = None,
 14        add_exterior_walls: bool = True,
 15        conn_data: list[tuple[int, int, float]] | None = None,
 16    ) -> None:
 17        """
 18        Initializes a GridGraph object.
 19
 20        Required Arguments:
 21
 22        - `x_size`
 23            - Type: int
 24            - What: The size of the grid in the x direction
 25        - `y_size`
 26            - Type: int
 27            - What: The size of the grid in the y direction
 28        - `blocks`
 29            - Type: list of tuples
 30            - What: A list of tuples representing the coordinates of the blocked cells
 31
 32        Optional Arguments:
 33
 34        - `shape`
 35            - Type: list of tuples
 36            - What: A list of tuples representing the shape of the moving object relative to the object location in the grid
 37            - Default: [(0, 0), (0, 1), (1, 0), (1, 1)]
 38            - Example: [(0, 0), (0, 1), (1, 0), (1, 1)]
 39                - Note: This would form a square of size 1x1
 40                - Assuming the grid location is at (0,0) the square takes up the grid space between (0, 0) and (1, 1)
 41                - Assumint the grid location is at (1,1) the square takes up the grid space between (1, 1) and (2, 2) with the same shape
 42            - Note: This shape is used to determine which connections are allowed when passing through the grid.
 43                - For example
 44                    - The shape is a square of size 1x1
 45                    - The location of the square is (0, 0)
 46                    - There is a blocked cell at (0, 1)
 47                    - There is a potential connection to (1, 1), (0, 1), and (1, 0)
 48                    - The square can not move to (1, 1) as it would collide with the blocked cell at (0, 1)
 49                      - This is because the square would pass partly through the blocked cell on its way to (1, 1)
 50                      - To achieve the same result, it can move to (1, 0) and then up to (1, 1) taking extra time / distance
 51        - `add_exterior_walls`
 52            - Type: bool
 53            - What: Whether to add exterior walls to the grid
 54            - Default: True
 55        - `conn_data`
 56            - Type: list of tuples
 57            - What: A list of tuples representing movements allowed in the grid
 58                - Each tuple should contain three values: (x_offset, y_offset, distance)
 59                - x_offset: The x offset from the current cell
 60                - y_offset: The y offset from the current cell
 61                - distance: The distance to the connected cell
 62            - Default: [(0, 1, 1), (1, 0, 1), (0, -1, 1), (-1, 0, 1), (1, 1, sqrt(2)), (-1, -1, sqrt(2)), (1, -1, sqrt(2)), (-1, 1, sqrt(2))]
 63                - Note: If None, the default connection data will be used
 64                - Note: This includes 8 directions, 4 cardinal and 4 diagonal
 65                - Cardinal directions are 1 unit away, diagonal directions are sqrt(2) units away
 66        """
 67        # Validate the inputs
 68        assert x_size > 0, "x_size must be greater than 0"
 69        assert y_size > 0, "y_size must be greater than 0"
 70
 71        self.x_size = x_size
 72        self.y_size = y_size
 73        self.blocks = blocks
 74        self.add_exterior_walls = add_exterior_walls
 75
 76        # Update the blocks if add_exterior_walls is True
 77        # This is done by adding all the cells in the first and last row and column
 78        if add_exterior_walls:
 79            self.blocks += [
 80                (x, y) for x in range(x_size) for y in [0, y_size - 1]
 81            ] + [(x, y) for x in [0, x_size - 1] for y in range(y_size)]
 82
 83        # If shape is None, set the default shape
 84        if shape is None:
 85            self.shape = [(0, 0), (0, 1), (1, 0), (1, 1)]
 86        else:
 87            self.shape = shape
 88
 89        # If conn_data is None, set the default connection data
 90        if conn_data is None:
 91            sqrt_2 = 1.4142
 92            self.conn_data = [
 93                (-1, -1, sqrt_2),
 94                (-1, 0, 1),
 95                (-1, 1, sqrt_2),
 96                (0, -1, 1),
 97                (0, 1, 1),
 98                (1, -1, sqrt_2),
 99                (1, 0, 1),
100                (1, 1, sqrt_2),
101            ]
102        else:
103            self.conn_data = conn_data
104
105        self.graph = self.__create_graph__()
106        self.nodes = [self.__get_x_y__(idx) for idx in range(len(self.graph))]
107        self.cacheGraph = CacheGraph(self.graph)
108
109    def __get_idx__(self, x: int, y: int):
110        """
111        Function:
112        - Get the index of a cell in a 2D grid given its x and y coordinates
113        - This does not have any error checking, so be careful with the inputs
114
115        Required Arguments:
116        - `x`
117            - Type: int
118            - What: The x coordinate of the cell
119        - `y`
120            - Type: int
121            - What: The y coordinate of the cell
122
123        Returns:
124
125        - `idx`
126            - Type: int
127            - What: The index of the cell in the grid
128        """
129        return x + y * self.x_size
130
131    def __get_x_y__(self, idx: int):
132        """
133        Function:
134        - Get the x and y coordinates of a cell in a 2D grid given its index
135        - This does not have any error checking so be careful with the inputs
136
137        Required Arguments:
138
139        - `idx`
140            - Type: int
141            - What: The index of the cell
142
143        Returns:
144        - `x`
145            - Type: int
146            - What: The x coordinate of the cell
147        - `y`
148            - Type: int
149            - What: The y coordinate of the cell
150        """
151        return idx % self.x_size, idx // self.x_size
152
153    def get_idx(self, x: int, y: int):
154        """
155        Function:
156        - Get the index of a cell in a 2D grid given its x and y coordinates
157
158        Required Arguments:
159        - `x`
160            - Type: int
161            - What: The x coordinate of the cell
162        - `y`
163            - Type: int
164            - What: The y coordinate of the cell
165
166        Returns:
167
168        - `idx`
169            - Type: int
170            - What: The index of the cell in the grid
171        """
172        assert x >= 0 and x < self.x_size, "x coordinate is out of bounds"
173        assert y >= 0 and y < self.y_size, "y coordinate is out of bounds"
174        return x + y * self.x_size
175
176    def get_x_y(self, idx: int):
177        """
178        Function:
179        - Get the x and y coordinates of a cell in a 2D grid given its index
180
181        Required Arguments:
182
183        - `idx`
184            - Type: int
185            - What: The index of the cell
186
187        Returns:
188        - `x`
189            - Type: int
190            - What: The x coordinate of the cell
191        - `y`
192            - Type: int
193            - What: The y coordinate of the cell
194        """
195        assert idx >= 0 and idx < len(self.graph), "idx is out of bounds"
196        return idx % self.x_size, idx // self.x_size
197
198    def __get_relative_shape_offsets__(
199        self,
200        shape: list[tuple[int | float, int | float]],
201        x_off: int,
202        y_off: int,
203    ):
204        return list(
205            ShapeMoverUtils.moving_shape_overlap_intervals(
206                x_coord=0,
207                y_coord=0,
208                x_shift=x_off,
209                y_shift=y_off,
210                t_start=0,
211                t_end=1,
212                shape=shape,
213            ).keys()
214        )
215
216    def __create_graph__(
217        self,
218    ):
219        """
220        Function:
221
222        - Create a graph from the grid specifications
223
224        Returns:
225
226        - `graph`
227            - Type: list
228            - What: The adjacency list representation of the graph
229        """
230        ####################
231        # Create a list of lists to hold all the possible connections in an easy to access/understand format
232        ####################
233
234        # Each cell in the list will be a dictionary of connections
235        # The keys will be the coordinates of the cell that can be moved to
236        # The values will be the distance to that cell
237        # This essentially creates a sparse matrix
238        graph_matrix = [
239            [{} for _ in range(self.x_size)] for _ in range(self.y_size)
240        ]
241        for y in range(self.y_size):
242            for x in range(self.x_size):
243                for x_off, y_off, dist in self.conn_data:
244                    if (
245                        0 <= x + x_off < self.x_size
246                        and 0 <= y + y_off < self.y_size
247                    ):
248                        graph_matrix[y][x][(x + x_off, y + y_off)] = dist
249
250        ####################
251        # Remove all connections that would not be possible based on the blocks and the shape
252        ####################
253
254        # Create a set of offsets that will be used to help remove connections relative to each blocked cell
255        delta_offsets = {
256            (x_delta, y_delta): self.__get_relative_shape_offsets__(
257                self.shape, x_delta, y_delta
258            )
259            for x_delta, y_delta, _ in self.conn_data
260        }
261
262        # Loop over all the blocks and remove all connections that would be not possible based on the blocks
263        for block in self.blocks:
264            # Clear out all connections leaving from the blocked cell
265            graph_matrix[block[1]][block[0]] = {}
266            # Clear out all connections that would be blocked by each movement given the blocked cell and the delta offsets
267            # Our delta offsets dictionary essentially tells us which relative cells would be blocked by the moving shape
268            for delta, offsets in delta_offsets.items():
269                for offset in offsets:
270                    x_cell = block[0] - offset[0]
271                    y_cell = block[1] - offset[1]
272                    if 0 <= x_cell < self.x_size and 0 <= y_cell < self.y_size:
273                        x_move_to = x_cell + delta[0]
274                        y_move_to = y_cell + delta[1]
275                        graph_matrix[y_cell][x_cell].pop(
276                            (x_move_to, y_move_to), None
277                        )
278
279        ####################
280        # Convert the graph matrix into the very efficient graph format as specified by the scgraph library
281        ####################
282        graph = []
283        for x_data in graph_matrix:
284            for connection_dict in x_data:
285                graph.append(
286                    {
287                        self.__get_idx__(x, y): dist
288                        for (x, y), dist in connection_dict.items()
289                    }
290                )
291
292        return graph
293
294    def euclidean_heuristic(self, origin_id: int, destination_id: int) -> float:
295        """
296        Function:
297
298        - Calculate the Euclidean distance between two nodes in the grid graph
299
300        Required Arguments:
301
302        - `origin_id`
303            - Type: int
304            - What: The id of the origin node
305        - `destination_id`
306            - Type: int
307            - What: The id of the destination node
308
309        Returns:
310
311        - `distance`
312            - Type: float
313            - What: The Euclidean distance between the two nodes
314        """
315        origin_location = self.nodes[origin_id]
316        destination_location = self.nodes[destination_id]
317        return (
318            (origin_location[0] - destination_location[0]) ** 2
319            + (origin_location[1] - destination_location[1]) ** 2
320        ) ** 0.5
321
322    def __get_closest_node_with_connections__(
323        self,
324        x: int | float,
325        y: int | float,
326    ):
327        """
328        Function:
329
330        - Get the closest node in the graph that has connections
331
332        Required Arguments:
333
334        - `x`
335            - Type: int | float
336            - What: The x coordinate of the node
337        - `y`
338            - Type: int | float
339            - What: The y coordinate of the node
340
341        Returns:
342
343        - `closest_node_id`
344            - Type: int
345            - What: The id of the closest node with connections
346        - `closest_distance`
347            - Type: float
348            - What: The distance to the closest node with connections
349        """
350        x = int(x) if int(x) == x else x
351        y = int(y) if int(y) == y else y
352        if isinstance(x, int) and isinstance(y, int):
353            return self.get_idx(x, y), 0.0
354        closest_node_id = None
355        closest_distance = float("inf")
356        for x_off, y_off in [(0, 0), (1, 0), (0, 1), (1, 1)]:
357            node_x = int(x) + x_off
358            node_y = int(y) + y_off
359            try:
360                node_id = self.get_idx(node_x, node_y)
361            except AssertionError:
362                continue
363            if self.graph[node_id] == {}:
364                continue
365            distance = ((node_x - x) ** 2 + (node_y - y) ** 2) ** 0.5
366            if distance < closest_distance:
367                closest_distance = distance
368                closest_node_id = node_id
369        if closest_node_id is None:
370            raise ValueError(
371                "No valid adjacent node with connections found for the given coordinates"
372            )
373        return closest_node_id, closest_distance
374
375    def format_coordinate(self, cooinate_dict, output_coorinate_path):
376        if output_coorinate_path == "list_of_tuples":
377            return (cooinate_dict["x"], cooinate_dict["y"])
378        elif output_coorinate_path == "list_of_lists":
379            return [cooinate_dict["x"], cooinate_dict["y"]]
380        elif output_coordinate_path == "list_of_dicts":
381            return {"x": cooinate_dict["x"], "y": cooinate_dict["y"]}
382        else:
383            raise ValueError("Invalid output_coordinate_path format")
384
385    def get_shortest_path(
386        self,
387        origin_node: (
388            dict[Literal["x", "y"], int | float]
389            | tuple[int | float, int | float]
390            | list[int | float]
391        ),
392        destination_node: (
393            dict[Literal["x", "y"], int | float]
394            | tuple[int | float, int | float]
395            | list[int | float]
396        ),
397        output_coordinate_path: str = "list_of_dicts",
398        cache: bool = False,
399        cache_for: str = "origin",
400        output_path: bool = False,
401        heuristic_fn: callable | Literal["euclidean"] | None = "euclidean",
402        algorithm_fn: callable = Graph.a_star,
403        algorithm_kwargs: dict | None = None,
404        **kwargs,
405    ) -> dict:
406        """
407        Function:
408
409        - Identify the shortest path between two nodes in a sparse network graph
410        - If an off graph origin and/or destination node is provided, it will find the closest adjacent node with connections
411          as the origin and/or destination node
412            - This is done to increase the likelihood that the pathfinding algorithm can find a valid path
413            - If no valid adjacent node is found, an error will be raised
414
415        - Return a dictionary of various path information including:
416            - `path`: A list of graph ids in the order they are visited
417            - `coordinate_path`: A list of dicts (x, y) in the order they are visited
418            - `length`: The length of the path
419
420         Required Arguments:
421
422        - `origin_node`
423            - Type: dict of int | float
424            - What: A dictionary with the keys 'x' and 'y'
425            - Alternatively, a tuple or list of two values (x, y) can be used
426        - `destination_node`
427            - Type: dict of int | float
428            - What: A dictionary with the keys 'x' and 'y'
429            - Alternatively, a tuple or list of two values (x, y) can be used
430
431        Optional Arguments:
432
433        - `output_coordinate_path`
434            - Type: str
435            - What: The format of the output coordinate path
436            - Default: 'list_of_lists'
437            - Options:
438                - `list_of_tuples`: A list of tuples with the first value being x and the second being y
439                - 'list_of_dicts': A list of dictionaries with keys 'x' and 'y'
440                - 'list_of_lists': A list of lists with the first value being x and the second being y
441        - `cache`
442            - Type: bool
443            - What: Whether to use the cache (save and reuse the spanning tree)
444            - Default: False
445        - `cache_for`
446            - Type: str
447            - What: Whether to cache the spanning tree for the origin or destination node if `cache` is True
448            - Default: 'origin'
449            - Options: 'origin', 'destination'
450        - `output_path`
451            - Type: bool
452            - What: Whether to output the path as a list of graph idxs (mostly for debugging purposes)
453            - Default: False
454        - `heuristic_fn`
455            - Type: callable | Literal['euclidean'] | None
456            - What: A heuristic function to use for the A* algorithm if caching is False
457            - Default: 'euclidean' (A predefined heuristic function that calculates the Euclidean distance for this grid graph)
458            - If None, the A* algorithm will default to Dijkstra's algorithm
459            - If a callable is provided, it should take two arguments: origin_id and destination_id and return a float representing the heuristic distance between the two nodes
460                - Note: This distance should never be greater than the actual distance between the two nodes or you may get suboptimal paths
461                - Note: This is deprecated and will be removed in a future version
462        - `algorithm_fn`
463            - Type: callable | None
464            - What: The algorithm to use for pathfinding
465            - Default: 'a_star'
466            - If None, the default algorithm will be used
467        - `algorithm_kwargs`
468            - Type: dict
469            - What: Additional keyword arguments to pass to the algorithm function
470            - Default: {}
471        - `**kwargs`
472            - Additional keyword arguments. These are included for forwards and backwards compatibility reasons, but are not currently used.
473        """
474        if isinstance(origin_node, (tuple, list)):
475            origin_node = {"x": origin_node[0], "y": origin_node[1]}
476        if isinstance(destination_node, (tuple, list)):
477            destination_node = {
478                "x": destination_node[0],
479                "y": destination_node[1],
480            }
481        if algorithm_kwargs is None:
482            algorithm_kwargs = {}
483
484        origin_id, origin_distance = self.__get_closest_node_with_connections__(
485            **origin_node
486        )
487        destination_id, destination_distance = (
488            self.__get_closest_node_with_connections__(**destination_node)
489        )
490
491        if self.graph[origin_id] == {}:
492            raise ValueError(
493                "Origin node is not connected to any other nodes. This is likely caused by the origin node not being possible given a blocked cell or nearby blocked cell"
494            )
495        if self.graph[destination_id] == {}:
496            raise ValueError(
497                "Destination node is not connected to any other nodes. This is likely caused by the destination node not being possible given a blocked cell or nearby blocked cell"
498            )
499        if cache:
500            if cache_for not in ["origin", "destination"]:
501                raise ValueError(
502                    "cache_for must be 'origin' or 'destination' when cache is True"
503                )
504            # Reverse for cache graphing if cache_for is destination since the graph is undirected
505            if cache_for == "destination":
506                origin_id, destination_id = destination_id, origin_id
507            output = self.cacheGraph.get_shortest_path(
508                origin_id=origin_id,
509                destination_id=destination_id,
510            )
511            # Undo the reverse if cache_for is destination
512            if cache_for == "destination":
513                output["path"].reverse()
514                origin_id, destination_id = destination_id, origin_id
515        else:
516            # TODO: Remove this backwards compatibility hack in future versions
517            if algorithm_fn == Graph.a_star:
518                if "heuristic_fn" not in algorithm_kwargs:
519                    algorithm_kwargs["heuristic_fn"] = (
520                        self.euclidean_heuristic
521                        if heuristic_fn == "euclidean"
522                        else heuristic_fn
523                    )
524            output = algorithm_fn(
525                graph=self.graph,
526                origin_id=origin_id,
527                destination_id=destination_id,
528                **algorithm_kwargs,
529            )
530        output["coordinate_path"] = self.get_coordinate_path(
531            output["path"], output_coordinate_path
532        )
533        if origin_distance > 0:
534            output["coordinate_path"] = [
535                self.format_coordinate(origin_node, output_coordinate_path)
536            ] + output["coordinate_path"]
537            output["path"] = [-1] + output["path"]
538            output["length"] += origin_distance
539        if destination_distance > 0:
540            output["coordinate_path"].append(
541                self.format_coordinate(destination_node, output_coordinate_path)
542            )
543            output["path"].append(-1)
544            output["length"] += destination_distance
545        if not output_path:
546            del output["path"]
547        return output
548
549    def get_coordinate_path(
550        self, path: list[int], output_coordinate_path: str = "list_of_dicts"
551    ) -> list[dict[str, int]]:
552        """
553        Function:
554
555        - Return a list of node dictionaries (lat + long) in the order they are visited
556
557        Required Arguments:
558
559        - `path`
560            - Type: list
561            - What: A list of node ids in the order they are visited
562
563        Optional Arguments:
564
565        - `output_coordinate_path`
566            - Type: str
567            - What: The format of the output coordinate path
568            - Default: 'list_of_dicts'
569            - Options:
570                - 'list_of_dicts': A list of dictionaries with keys 'x' and 'y'
571                - 'list_of_lists': A list of lists with the first value being x and the second being y
572                - `list_of_tuples`: A list of tuples with the first value being x and the second being y
573
574        Returns:
575
576        - `coordinate_path`
577            - Type: list
578            - What: A list of dictionaries with keys 'x' and 'y' in the order they are visited
579        """
580        output = [self.__get_x_y__(idx) for idx in path]
581        if output_coordinate_path == "list_of_tuples":
582            return output
583        elif output_coordinate_path == "list_of_lists":
584            return [[x, y] for x, y in output]
585        elif output_coordinate_path == "list_of_dicts":
586            return [{"x": x, "y": y} for x, y in output]
587        else:
588            raise ValueError(
589                "output_coordinate_path must be 'list_of_dicts' or 'list_of_lists' or 'list_of_tuples'"
590            )
591
592    def export_object(
593        self, filename: str = "", include_blocks: bool = False
594    ) -> dict:
595        """
596        Function:
597
598        - Export the graph as a list of dictionaries
599
600        Arguments:
601
602        - `filename`
603            - Type: str
604            - What: The name of the file to export the graph to.
605            - An extension of .gridgraph will be added to the file name if not already present
606
607        Optional Arguments:
608
609        - `include_blocks`
610            - Type: bool
611            - What: Whether to include the blocks in the export
612            - Default: False
613            - Note: This is not needed as the graph is already created
614            - Note: You can include blocks in the export if you need them for some reason
615                - This will be set as the blocks attribute in the imported object
616                - This will increase the size of the export file
617        """
618        if filename == "":
619            raise ValueError("filename must be specified to export the graph")
620        filename = (
621            filename
622            if filename.endswith(".gridgraph")
623            else filename + ".gridgraph"
624        )
625        try:
626            import pickle, zlib, os
627        except ImportError:
628            raise ImportError(
629                "os, pickle and zlib are required to export the graph"
630            )
631        export_data = {
632            "graph_attributes": {
633                "graph": self.graph,
634                "nodes": self.nodes,
635                "x_size": self.x_size,
636                "y_size": self.y_size,
637                "shape": self.shape,
638                "add_exterior_walls": self.add_exterior_walls,
639                "conn_data": self.conn_data,
640            },
641            "graph_cache": self.cacheGraph.cache,
642            "export_version": 1,
643        }
644        if include_blocks:
645            export_data["graph_attributes"]["blocks"] = self.blocks
646
647        if not os.path.exists(os.path.dirname(filename)):
648            os.makedirs(os.path.dirname(filename))
649        with open(filename, "wb") as f:
650            f.write(zlib.compress(pickle.dumps(export_data)))
651
652    @staticmethod
653    def import_object(filename: str = "") -> None:
654        """
655        Function:
656
657        - A staticmethod to import the graph from a file
658
659        Arguments:
660
661        - `filename`
662            - Type: str
663            - What: The name of the file to import the graph from.
664            - An extension of .gridgraph will be added to the file name if not already present
665
666        Returns:
667
668        - `GridGraph`
669            - Type: GridGraph
670            - What: The imported graph object
671        """
672        if filename == "":
673            raise ValueError("filename must be specified to import the graph")
674        filename = (
675            filename
676            if filename.endswith(".gridgraph")
677            else filename + ".gridgraph"
678        )
679        try:
680            import pickle, zlib
681        except ImportError:
682            raise ImportError(
683                "pickle and zlib are required to import the graph"
684            )
685        with open(filename, "rb") as f:
686            import_data = pickle.loads(zlib.decompress(f.read()))
687
688        # Check the version of the export
689        if import_data["export_version"] != 1:
690            raise ValueError(
691                f"Incompatible export version {import_data['export_version']}. The current version is 1"
692            )
693        # Create a new basic GridGraph object and overwrite the attributes with the imported data
694        # This is done as the init method calls the __create_graph__ method which is not needed here
695        GridGraph_object = GridGraph(
696            x_size=1, y_size=1, blocks=[], add_exterior_walls=False
697        )
698        for key, value in import_data["graph_attributes"].items():
699            GridGraph_object.__setattr__(key, value)
700
701        GridGraph_object.cacheGraph = CacheGraph(GridGraph_object.graph)
702        GridGraph_object.cacheGraph.cache = import_data["graph_cache"]
703        return GridGraph_object
class GridGraph:
  8class GridGraph:
  9    def __init__(
 10        self,
 11        x_size: int,
 12        y_size: int,
 13        blocks: list[tuple[int, int]],
 14        shape: list[tuple[int | float, int | float]] | None = None,
 15        add_exterior_walls: bool = True,
 16        conn_data: list[tuple[int, int, float]] | None = None,
 17    ) -> None:
 18        """
 19        Initializes a GridGraph object.
 20
 21        Required Arguments:
 22
 23        - `x_size`
 24            - Type: int
 25            - What: The size of the grid in the x direction
 26        - `y_size`
 27            - Type: int
 28            - What: The size of the grid in the y direction
 29        - `blocks`
 30            - Type: list of tuples
 31            - What: A list of tuples representing the coordinates of the blocked cells
 32
 33        Optional Arguments:
 34
 35        - `shape`
 36            - Type: list of tuples
 37            - What: A list of tuples representing the shape of the moving object relative to the object location in the grid
 38            - Default: [(0, 0), (0, 1), (1, 0), (1, 1)]
 39            - Example: [(0, 0), (0, 1), (1, 0), (1, 1)]
 40                - Note: This would form a square of size 1x1
 41                - Assuming the grid location is at (0,0) the square takes up the grid space between (0, 0) and (1, 1)
 42                - Assumint the grid location is at (1,1) the square takes up the grid space between (1, 1) and (2, 2) with the same shape
 43            - Note: This shape is used to determine which connections are allowed when passing through the grid.
 44                - For example
 45                    - The shape is a square of size 1x1
 46                    - The location of the square is (0, 0)
 47                    - There is a blocked cell at (0, 1)
 48                    - There is a potential connection to (1, 1), (0, 1), and (1, 0)
 49                    - The square can not move to (1, 1) as it would collide with the blocked cell at (0, 1)
 50                      - This is because the square would pass partly through the blocked cell on its way to (1, 1)
 51                      - To achieve the same result, it can move to (1, 0) and then up to (1, 1) taking extra time / distance
 52        - `add_exterior_walls`
 53            - Type: bool
 54            - What: Whether to add exterior walls to the grid
 55            - Default: True
 56        - `conn_data`
 57            - Type: list of tuples
 58            - What: A list of tuples representing movements allowed in the grid
 59                - Each tuple should contain three values: (x_offset, y_offset, distance)
 60                - x_offset: The x offset from the current cell
 61                - y_offset: The y offset from the current cell
 62                - distance: The distance to the connected cell
 63            - Default: [(0, 1, 1), (1, 0, 1), (0, -1, 1), (-1, 0, 1), (1, 1, sqrt(2)), (-1, -1, sqrt(2)), (1, -1, sqrt(2)), (-1, 1, sqrt(2))]
 64                - Note: If None, the default connection data will be used
 65                - Note: This includes 8 directions, 4 cardinal and 4 diagonal
 66                - Cardinal directions are 1 unit away, diagonal directions are sqrt(2) units away
 67        """
 68        # Validate the inputs
 69        assert x_size > 0, "x_size must be greater than 0"
 70        assert y_size > 0, "y_size must be greater than 0"
 71
 72        self.x_size = x_size
 73        self.y_size = y_size
 74        self.blocks = blocks
 75        self.add_exterior_walls = add_exterior_walls
 76
 77        # Update the blocks if add_exterior_walls is True
 78        # This is done by adding all the cells in the first and last row and column
 79        if add_exterior_walls:
 80            self.blocks += [
 81                (x, y) for x in range(x_size) for y in [0, y_size - 1]
 82            ] + [(x, y) for x in [0, x_size - 1] for y in range(y_size)]
 83
 84        # If shape is None, set the default shape
 85        if shape is None:
 86            self.shape = [(0, 0), (0, 1), (1, 0), (1, 1)]
 87        else:
 88            self.shape = shape
 89
 90        # If conn_data is None, set the default connection data
 91        if conn_data is None:
 92            sqrt_2 = 1.4142
 93            self.conn_data = [
 94                (-1, -1, sqrt_2),
 95                (-1, 0, 1),
 96                (-1, 1, sqrt_2),
 97                (0, -1, 1),
 98                (0, 1, 1),
 99                (1, -1, sqrt_2),
100                (1, 0, 1),
101                (1, 1, sqrt_2),
102            ]
103        else:
104            self.conn_data = conn_data
105
106        self.graph = self.__create_graph__()
107        self.nodes = [self.__get_x_y__(idx) for idx in range(len(self.graph))]
108        self.cacheGraph = CacheGraph(self.graph)
109
110    def __get_idx__(self, x: int, y: int):
111        """
112        Function:
113        - Get the index of a cell in a 2D grid given its x and y coordinates
114        - This does not have any error checking, so be careful with the inputs
115
116        Required Arguments:
117        - `x`
118            - Type: int
119            - What: The x coordinate of the cell
120        - `y`
121            - Type: int
122            - What: The y coordinate of the cell
123
124        Returns:
125
126        - `idx`
127            - Type: int
128            - What: The index of the cell in the grid
129        """
130        return x + y * self.x_size
131
132    def __get_x_y__(self, idx: int):
133        """
134        Function:
135        - Get the x and y coordinates of a cell in a 2D grid given its index
136        - This does not have any error checking so be careful with the inputs
137
138        Required Arguments:
139
140        - `idx`
141            - Type: int
142            - What: The index of the cell
143
144        Returns:
145        - `x`
146            - Type: int
147            - What: The x coordinate of the cell
148        - `y`
149            - Type: int
150            - What: The y coordinate of the cell
151        """
152        return idx % self.x_size, idx // self.x_size
153
154    def get_idx(self, x: int, y: int):
155        """
156        Function:
157        - Get the index of a cell in a 2D grid given its x and y coordinates
158
159        Required Arguments:
160        - `x`
161            - Type: int
162            - What: The x coordinate of the cell
163        - `y`
164            - Type: int
165            - What: The y coordinate of the cell
166
167        Returns:
168
169        - `idx`
170            - Type: int
171            - What: The index of the cell in the grid
172        """
173        assert x >= 0 and x < self.x_size, "x coordinate is out of bounds"
174        assert y >= 0 and y < self.y_size, "y coordinate is out of bounds"
175        return x + y * self.x_size
176
177    def get_x_y(self, idx: int):
178        """
179        Function:
180        - Get the x and y coordinates of a cell in a 2D grid given its index
181
182        Required Arguments:
183
184        - `idx`
185            - Type: int
186            - What: The index of the cell
187
188        Returns:
189        - `x`
190            - Type: int
191            - What: The x coordinate of the cell
192        - `y`
193            - Type: int
194            - What: The y coordinate of the cell
195        """
196        assert idx >= 0 and idx < len(self.graph), "idx is out of bounds"
197        return idx % self.x_size, idx // self.x_size
198
199    def __get_relative_shape_offsets__(
200        self,
201        shape: list[tuple[int | float, int | float]],
202        x_off: int,
203        y_off: int,
204    ):
205        return list(
206            ShapeMoverUtils.moving_shape_overlap_intervals(
207                x_coord=0,
208                y_coord=0,
209                x_shift=x_off,
210                y_shift=y_off,
211                t_start=0,
212                t_end=1,
213                shape=shape,
214            ).keys()
215        )
216
217    def __create_graph__(
218        self,
219    ):
220        """
221        Function:
222
223        - Create a graph from the grid specifications
224
225        Returns:
226
227        - `graph`
228            - Type: list
229            - What: The adjacency list representation of the graph
230        """
231        ####################
232        # Create a list of lists to hold all the possible connections in an easy to access/understand format
233        ####################
234
235        # Each cell in the list will be a dictionary of connections
236        # The keys will be the coordinates of the cell that can be moved to
237        # The values will be the distance to that cell
238        # This essentially creates a sparse matrix
239        graph_matrix = [
240            [{} for _ in range(self.x_size)] for _ in range(self.y_size)
241        ]
242        for y in range(self.y_size):
243            for x in range(self.x_size):
244                for x_off, y_off, dist in self.conn_data:
245                    if (
246                        0 <= x + x_off < self.x_size
247                        and 0 <= y + y_off < self.y_size
248                    ):
249                        graph_matrix[y][x][(x + x_off, y + y_off)] = dist
250
251        ####################
252        # Remove all connections that would not be possible based on the blocks and the shape
253        ####################
254
255        # Create a set of offsets that will be used to help remove connections relative to each blocked cell
256        delta_offsets = {
257            (x_delta, y_delta): self.__get_relative_shape_offsets__(
258                self.shape, x_delta, y_delta
259            )
260            for x_delta, y_delta, _ in self.conn_data
261        }
262
263        # Loop over all the blocks and remove all connections that would be not possible based on the blocks
264        for block in self.blocks:
265            # Clear out all connections leaving from the blocked cell
266            graph_matrix[block[1]][block[0]] = {}
267            # Clear out all connections that would be blocked by each movement given the blocked cell and the delta offsets
268            # Our delta offsets dictionary essentially tells us which relative cells would be blocked by the moving shape
269            for delta, offsets in delta_offsets.items():
270                for offset in offsets:
271                    x_cell = block[0] - offset[0]
272                    y_cell = block[1] - offset[1]
273                    if 0 <= x_cell < self.x_size and 0 <= y_cell < self.y_size:
274                        x_move_to = x_cell + delta[0]
275                        y_move_to = y_cell + delta[1]
276                        graph_matrix[y_cell][x_cell].pop(
277                            (x_move_to, y_move_to), None
278                        )
279
280        ####################
281        # Convert the graph matrix into the very efficient graph format as specified by the scgraph library
282        ####################
283        graph = []
284        for x_data in graph_matrix:
285            for connection_dict in x_data:
286                graph.append(
287                    {
288                        self.__get_idx__(x, y): dist
289                        for (x, y), dist in connection_dict.items()
290                    }
291                )
292
293        return graph
294
295    def euclidean_heuristic(self, origin_id: int, destination_id: int) -> float:
296        """
297        Function:
298
299        - Calculate the Euclidean distance between two nodes in the grid graph
300
301        Required Arguments:
302
303        - `origin_id`
304            - Type: int
305            - What: The id of the origin node
306        - `destination_id`
307            - Type: int
308            - What: The id of the destination node
309
310        Returns:
311
312        - `distance`
313            - Type: float
314            - What: The Euclidean distance between the two nodes
315        """
316        origin_location = self.nodes[origin_id]
317        destination_location = self.nodes[destination_id]
318        return (
319            (origin_location[0] - destination_location[0]) ** 2
320            + (origin_location[1] - destination_location[1]) ** 2
321        ) ** 0.5
322
323    def __get_closest_node_with_connections__(
324        self,
325        x: int | float,
326        y: int | float,
327    ):
328        """
329        Function:
330
331        - Get the closest node in the graph that has connections
332
333        Required Arguments:
334
335        - `x`
336            - Type: int | float
337            - What: The x coordinate of the node
338        - `y`
339            - Type: int | float
340            - What: The y coordinate of the node
341
342        Returns:
343
344        - `closest_node_id`
345            - Type: int
346            - What: The id of the closest node with connections
347        - `closest_distance`
348            - Type: float
349            - What: The distance to the closest node with connections
350        """
351        x = int(x) if int(x) == x else x
352        y = int(y) if int(y) == y else y
353        if isinstance(x, int) and isinstance(y, int):
354            return self.get_idx(x, y), 0.0
355        closest_node_id = None
356        closest_distance = float("inf")
357        for x_off, y_off in [(0, 0), (1, 0), (0, 1), (1, 1)]:
358            node_x = int(x) + x_off
359            node_y = int(y) + y_off
360            try:
361                node_id = self.get_idx(node_x, node_y)
362            except AssertionError:
363                continue
364            if self.graph[node_id] == {}:
365                continue
366            distance = ((node_x - x) ** 2 + (node_y - y) ** 2) ** 0.5
367            if distance < closest_distance:
368                closest_distance = distance
369                closest_node_id = node_id
370        if closest_node_id is None:
371            raise ValueError(
372                "No valid adjacent node with connections found for the given coordinates"
373            )
374        return closest_node_id, closest_distance
375
376    def format_coordinate(self, cooinate_dict, output_coorinate_path):
377        if output_coorinate_path == "list_of_tuples":
378            return (cooinate_dict["x"], cooinate_dict["y"])
379        elif output_coorinate_path == "list_of_lists":
380            return [cooinate_dict["x"], cooinate_dict["y"]]
381        elif output_coordinate_path == "list_of_dicts":
382            return {"x": cooinate_dict["x"], "y": cooinate_dict["y"]}
383        else:
384            raise ValueError("Invalid output_coordinate_path format")
385
386    def get_shortest_path(
387        self,
388        origin_node: (
389            dict[Literal["x", "y"], int | float]
390            | tuple[int | float, int | float]
391            | list[int | float]
392        ),
393        destination_node: (
394            dict[Literal["x", "y"], int | float]
395            | tuple[int | float, int | float]
396            | list[int | float]
397        ),
398        output_coordinate_path: str = "list_of_dicts",
399        cache: bool = False,
400        cache_for: str = "origin",
401        output_path: bool = False,
402        heuristic_fn: callable | Literal["euclidean"] | None = "euclidean",
403        algorithm_fn: callable = Graph.a_star,
404        algorithm_kwargs: dict | None = None,
405        **kwargs,
406    ) -> dict:
407        """
408        Function:
409
410        - Identify the shortest path between two nodes in a sparse network graph
411        - If an off graph origin and/or destination node is provided, it will find the closest adjacent node with connections
412          as the origin and/or destination node
413            - This is done to increase the likelihood that the pathfinding algorithm can find a valid path
414            - If no valid adjacent node is found, an error will be raised
415
416        - Return a dictionary of various path information including:
417            - `path`: A list of graph ids in the order they are visited
418            - `coordinate_path`: A list of dicts (x, y) in the order they are visited
419            - `length`: The length of the path
420
421         Required Arguments:
422
423        - `origin_node`
424            - Type: dict of int | float
425            - What: A dictionary with the keys 'x' and 'y'
426            - Alternatively, a tuple or list of two values (x, y) can be used
427        - `destination_node`
428            - Type: dict of int | float
429            - What: A dictionary with the keys 'x' and 'y'
430            - Alternatively, a tuple or list of two values (x, y) can be used
431
432        Optional Arguments:
433
434        - `output_coordinate_path`
435            - Type: str
436            - What: The format of the output coordinate path
437            - Default: 'list_of_lists'
438            - Options:
439                - `list_of_tuples`: A list of tuples with the first value being x and the second being y
440                - 'list_of_dicts': A list of dictionaries with keys 'x' and 'y'
441                - 'list_of_lists': A list of lists with the first value being x and the second being y
442        - `cache`
443            - Type: bool
444            - What: Whether to use the cache (save and reuse the spanning tree)
445            - Default: False
446        - `cache_for`
447            - Type: str
448            - What: Whether to cache the spanning tree for the origin or destination node if `cache` is True
449            - Default: 'origin'
450            - Options: 'origin', 'destination'
451        - `output_path`
452            - Type: bool
453            - What: Whether to output the path as a list of graph idxs (mostly for debugging purposes)
454            - Default: False
455        - `heuristic_fn`
456            - Type: callable | Literal['euclidean'] | None
457            - What: A heuristic function to use for the A* algorithm if caching is False
458            - Default: 'euclidean' (A predefined heuristic function that calculates the Euclidean distance for this grid graph)
459            - If None, the A* algorithm will default to Dijkstra's algorithm
460            - If a callable is provided, it should take two arguments: origin_id and destination_id and return a float representing the heuristic distance between the two nodes
461                - Note: This distance should never be greater than the actual distance between the two nodes or you may get suboptimal paths
462                - Note: This is deprecated and will be removed in a future version
463        - `algorithm_fn`
464            - Type: callable | None
465            - What: The algorithm to use for pathfinding
466            - Default: 'a_star'
467            - If None, the default algorithm will be used
468        - `algorithm_kwargs`
469            - Type: dict
470            - What: Additional keyword arguments to pass to the algorithm function
471            - Default: {}
472        - `**kwargs`
473            - Additional keyword arguments. These are included for forwards and backwards compatibility reasons, but are not currently used.
474        """
475        if isinstance(origin_node, (tuple, list)):
476            origin_node = {"x": origin_node[0], "y": origin_node[1]}
477        if isinstance(destination_node, (tuple, list)):
478            destination_node = {
479                "x": destination_node[0],
480                "y": destination_node[1],
481            }
482        if algorithm_kwargs is None:
483            algorithm_kwargs = {}
484
485        origin_id, origin_distance = self.__get_closest_node_with_connections__(
486            **origin_node
487        )
488        destination_id, destination_distance = (
489            self.__get_closest_node_with_connections__(**destination_node)
490        )
491
492        if self.graph[origin_id] == {}:
493            raise ValueError(
494                "Origin node is not connected to any other nodes. This is likely caused by the origin node not being possible given a blocked cell or nearby blocked cell"
495            )
496        if self.graph[destination_id] == {}:
497            raise ValueError(
498                "Destination node is not connected to any other nodes. This is likely caused by the destination node not being possible given a blocked cell or nearby blocked cell"
499            )
500        if cache:
501            if cache_for not in ["origin", "destination"]:
502                raise ValueError(
503                    "cache_for must be 'origin' or 'destination' when cache is True"
504                )
505            # Reverse for cache graphing if cache_for is destination since the graph is undirected
506            if cache_for == "destination":
507                origin_id, destination_id = destination_id, origin_id
508            output = self.cacheGraph.get_shortest_path(
509                origin_id=origin_id,
510                destination_id=destination_id,
511            )
512            # Undo the reverse if cache_for is destination
513            if cache_for == "destination":
514                output["path"].reverse()
515                origin_id, destination_id = destination_id, origin_id
516        else:
517            # TODO: Remove this backwards compatibility hack in future versions
518            if algorithm_fn == Graph.a_star:
519                if "heuristic_fn" not in algorithm_kwargs:
520                    algorithm_kwargs["heuristic_fn"] = (
521                        self.euclidean_heuristic
522                        if heuristic_fn == "euclidean"
523                        else heuristic_fn
524                    )
525            output = algorithm_fn(
526                graph=self.graph,
527                origin_id=origin_id,
528                destination_id=destination_id,
529                **algorithm_kwargs,
530            )
531        output["coordinate_path"] = self.get_coordinate_path(
532            output["path"], output_coordinate_path
533        )
534        if origin_distance > 0:
535            output["coordinate_path"] = [
536                self.format_coordinate(origin_node, output_coordinate_path)
537            ] + output["coordinate_path"]
538            output["path"] = [-1] + output["path"]
539            output["length"] += origin_distance
540        if destination_distance > 0:
541            output["coordinate_path"].append(
542                self.format_coordinate(destination_node, output_coordinate_path)
543            )
544            output["path"].append(-1)
545            output["length"] += destination_distance
546        if not output_path:
547            del output["path"]
548        return output
549
550    def get_coordinate_path(
551        self, path: list[int], output_coordinate_path: str = "list_of_dicts"
552    ) -> list[dict[str, int]]:
553        """
554        Function:
555
556        - Return a list of node dictionaries (lat + long) in the order they are visited
557
558        Required Arguments:
559
560        - `path`
561            - Type: list
562            - What: A list of node ids in the order they are visited
563
564        Optional Arguments:
565
566        - `output_coordinate_path`
567            - Type: str
568            - What: The format of the output coordinate path
569            - Default: 'list_of_dicts'
570            - Options:
571                - 'list_of_dicts': A list of dictionaries with keys 'x' and 'y'
572                - 'list_of_lists': A list of lists with the first value being x and the second being y
573                - `list_of_tuples`: A list of tuples with the first value being x and the second being y
574
575        Returns:
576
577        - `coordinate_path`
578            - Type: list
579            - What: A list of dictionaries with keys 'x' and 'y' in the order they are visited
580        """
581        output = [self.__get_x_y__(idx) for idx in path]
582        if output_coordinate_path == "list_of_tuples":
583            return output
584        elif output_coordinate_path == "list_of_lists":
585            return [[x, y] for x, y in output]
586        elif output_coordinate_path == "list_of_dicts":
587            return [{"x": x, "y": y} for x, y in output]
588        else:
589            raise ValueError(
590                "output_coordinate_path must be 'list_of_dicts' or 'list_of_lists' or 'list_of_tuples'"
591            )
592
593    def export_object(
594        self, filename: str = "", include_blocks: bool = False
595    ) -> dict:
596        """
597        Function:
598
599        - Export the graph as a list of dictionaries
600
601        Arguments:
602
603        - `filename`
604            - Type: str
605            - What: The name of the file to export the graph to.
606            - An extension of .gridgraph will be added to the file name if not already present
607
608        Optional Arguments:
609
610        - `include_blocks`
611            - Type: bool
612            - What: Whether to include the blocks in the export
613            - Default: False
614            - Note: This is not needed as the graph is already created
615            - Note: You can include blocks in the export if you need them for some reason
616                - This will be set as the blocks attribute in the imported object
617                - This will increase the size of the export file
618        """
619        if filename == "":
620            raise ValueError("filename must be specified to export the graph")
621        filename = (
622            filename
623            if filename.endswith(".gridgraph")
624            else filename + ".gridgraph"
625        )
626        try:
627            import pickle, zlib, os
628        except ImportError:
629            raise ImportError(
630                "os, pickle and zlib are required to export the graph"
631            )
632        export_data = {
633            "graph_attributes": {
634                "graph": self.graph,
635                "nodes": self.nodes,
636                "x_size": self.x_size,
637                "y_size": self.y_size,
638                "shape": self.shape,
639                "add_exterior_walls": self.add_exterior_walls,
640                "conn_data": self.conn_data,
641            },
642            "graph_cache": self.cacheGraph.cache,
643            "export_version": 1,
644        }
645        if include_blocks:
646            export_data["graph_attributes"]["blocks"] = self.blocks
647
648        if not os.path.exists(os.path.dirname(filename)):
649            os.makedirs(os.path.dirname(filename))
650        with open(filename, "wb") as f:
651            f.write(zlib.compress(pickle.dumps(export_data)))
652
653    @staticmethod
654    def import_object(filename: str = "") -> None:
655        """
656        Function:
657
658        - A staticmethod to import the graph from a file
659
660        Arguments:
661
662        - `filename`
663            - Type: str
664            - What: The name of the file to import the graph from.
665            - An extension of .gridgraph will be added to the file name if not already present
666
667        Returns:
668
669        - `GridGraph`
670            - Type: GridGraph
671            - What: The imported graph object
672        """
673        if filename == "":
674            raise ValueError("filename must be specified to import the graph")
675        filename = (
676            filename
677            if filename.endswith(".gridgraph")
678            else filename + ".gridgraph"
679        )
680        try:
681            import pickle, zlib
682        except ImportError:
683            raise ImportError(
684                "pickle and zlib are required to import the graph"
685            )
686        with open(filename, "rb") as f:
687            import_data = pickle.loads(zlib.decompress(f.read()))
688
689        # Check the version of the export
690        if import_data["export_version"] != 1:
691            raise ValueError(
692                f"Incompatible export version {import_data['export_version']}. The current version is 1"
693            )
694        # Create a new basic GridGraph object and overwrite the attributes with the imported data
695        # This is done as the init method calls the __create_graph__ method which is not needed here
696        GridGraph_object = GridGraph(
697            x_size=1, y_size=1, blocks=[], add_exterior_walls=False
698        )
699        for key, value in import_data["graph_attributes"].items():
700            GridGraph_object.__setattr__(key, value)
701
702        GridGraph_object.cacheGraph = CacheGraph(GridGraph_object.graph)
703        GridGraph_object.cacheGraph.cache = import_data["graph_cache"]
704        return GridGraph_object
GridGraph( x_size: int, y_size: int, blocks: list[tuple[int, int]], shape: list[tuple[int | float, int | float]] | None = None, add_exterior_walls: bool = True, conn_data: list[tuple[int, int, float]] | None = None)
  9    def __init__(
 10        self,
 11        x_size: int,
 12        y_size: int,
 13        blocks: list[tuple[int, int]],
 14        shape: list[tuple[int | float, int | float]] | None = None,
 15        add_exterior_walls: bool = True,
 16        conn_data: list[tuple[int, int, float]] | None = None,
 17    ) -> None:
 18        """
 19        Initializes a GridGraph object.
 20
 21        Required Arguments:
 22
 23        - `x_size`
 24            - Type: int
 25            - What: The size of the grid in the x direction
 26        - `y_size`
 27            - Type: int
 28            - What: The size of the grid in the y direction
 29        - `blocks`
 30            - Type: list of tuples
 31            - What: A list of tuples representing the coordinates of the blocked cells
 32
 33        Optional Arguments:
 34
 35        - `shape`
 36            - Type: list of tuples
 37            - What: A list of tuples representing the shape of the moving object relative to the object location in the grid
 38            - Default: [(0, 0), (0, 1), (1, 0), (1, 1)]
 39            - Example: [(0, 0), (0, 1), (1, 0), (1, 1)]
 40                - Note: This would form a square of size 1x1
 41                - Assuming the grid location is at (0,0) the square takes up the grid space between (0, 0) and (1, 1)
 42                - Assumint the grid location is at (1,1) the square takes up the grid space between (1, 1) and (2, 2) with the same shape
 43            - Note: This shape is used to determine which connections are allowed when passing through the grid.
 44                - For example
 45                    - The shape is a square of size 1x1
 46                    - The location of the square is (0, 0)
 47                    - There is a blocked cell at (0, 1)
 48                    - There is a potential connection to (1, 1), (0, 1), and (1, 0)
 49                    - The square can not move to (1, 1) as it would collide with the blocked cell at (0, 1)
 50                      - This is because the square would pass partly through the blocked cell on its way to (1, 1)
 51                      - To achieve the same result, it can move to (1, 0) and then up to (1, 1) taking extra time / distance
 52        - `add_exterior_walls`
 53            - Type: bool
 54            - What: Whether to add exterior walls to the grid
 55            - Default: True
 56        - `conn_data`
 57            - Type: list of tuples
 58            - What: A list of tuples representing movements allowed in the grid
 59                - Each tuple should contain three values: (x_offset, y_offset, distance)
 60                - x_offset: The x offset from the current cell
 61                - y_offset: The y offset from the current cell
 62                - distance: The distance to the connected cell
 63            - Default: [(0, 1, 1), (1, 0, 1), (0, -1, 1), (-1, 0, 1), (1, 1, sqrt(2)), (-1, -1, sqrt(2)), (1, -1, sqrt(2)), (-1, 1, sqrt(2))]
 64                - Note: If None, the default connection data will be used
 65                - Note: This includes 8 directions, 4 cardinal and 4 diagonal
 66                - Cardinal directions are 1 unit away, diagonal directions are sqrt(2) units away
 67        """
 68        # Validate the inputs
 69        assert x_size > 0, "x_size must be greater than 0"
 70        assert y_size > 0, "y_size must be greater than 0"
 71
 72        self.x_size = x_size
 73        self.y_size = y_size
 74        self.blocks = blocks
 75        self.add_exterior_walls = add_exterior_walls
 76
 77        # Update the blocks if add_exterior_walls is True
 78        # This is done by adding all the cells in the first and last row and column
 79        if add_exterior_walls:
 80            self.blocks += [
 81                (x, y) for x in range(x_size) for y in [0, y_size - 1]
 82            ] + [(x, y) for x in [0, x_size - 1] for y in range(y_size)]
 83
 84        # If shape is None, set the default shape
 85        if shape is None:
 86            self.shape = [(0, 0), (0, 1), (1, 0), (1, 1)]
 87        else:
 88            self.shape = shape
 89
 90        # If conn_data is None, set the default connection data
 91        if conn_data is None:
 92            sqrt_2 = 1.4142
 93            self.conn_data = [
 94                (-1, -1, sqrt_2),
 95                (-1, 0, 1),
 96                (-1, 1, sqrt_2),
 97                (0, -1, 1),
 98                (0, 1, 1),
 99                (1, -1, sqrt_2),
100                (1, 0, 1),
101                (1, 1, sqrt_2),
102            ]
103        else:
104            self.conn_data = conn_data
105
106        self.graph = self.__create_graph__()
107        self.nodes = [self.__get_x_y__(idx) for idx in range(len(self.graph))]
108        self.cacheGraph = CacheGraph(self.graph)

Initializes a GridGraph object.

Required Arguments:

  • x_size
    • Type: int
    • What: The size of the grid in the x direction
  • y_size
    • Type: int
    • What: The size of the grid in the y direction
  • blocks
    • Type: list of tuples
    • What: A list of tuples representing the coordinates of the blocked cells

Optional Arguments:

  • shape
    • Type: list of tuples
    • What: A list of tuples representing the shape of the moving object relative to the object location in the grid
    • Default: [(0, 0), (0, 1), (1, 0), (1, 1)]
    • Example: [(0, 0), (0, 1), (1, 0), (1, 1)]
      • Note: This would form a square of size 1x1
      • Assuming the grid location is at (0,0) the square takes up the grid space between (0, 0) and (1, 1)
      • Assumint the grid location is at (1,1) the square takes up the grid space between (1, 1) and (2, 2) with the same shape
    • Note: This shape is used to determine which connections are allowed when passing through the grid.
      • For example
        • The shape is a square of size 1x1
        • The location of the square is (0, 0)
        • There is a blocked cell at (0, 1)
        • There is a potential connection to (1, 1), (0, 1), and (1, 0)
        • The square can not move to (1, 1) as it would collide with the blocked cell at (0, 1)
          • This is because the square would pass partly through the blocked cell on its way to (1, 1)
          • To achieve the same result, it can move to (1, 0) and then up to (1, 1) taking extra time / distance
  • add_exterior_walls
    • Type: bool
    • What: Whether to add exterior walls to the grid
    • Default: True
  • conn_data
    • Type: list of tuples
    • What: A list of tuples representing movements allowed in the grid
      • Each tuple should contain three values: (x_offset, y_offset, distance)
      • x_offset: The x offset from the current cell
      • y_offset: The y offset from the current cell
      • distance: The distance to the connected cell
    • Default: [(0, 1, 1), (1, 0, 1), (0, -1, 1), (-1, 0, 1), (1, 1, sqrt(2)), (-1, -1, sqrt(2)), (1, -1, sqrt(2)), (-1, 1, sqrt(2))]
      • Note: If None, the default connection data will be used
      • Note: This includes 8 directions, 4 cardinal and 4 diagonal
      • Cardinal directions are 1 unit away, diagonal directions are sqrt(2) units away
x_size
y_size
blocks
add_exterior_walls
graph
nodes
cacheGraph
def get_idx(self, x: int, y: int):
154    def get_idx(self, x: int, y: int):
155        """
156        Function:
157        - Get the index of a cell in a 2D grid given its x and y coordinates
158
159        Required Arguments:
160        - `x`
161            - Type: int
162            - What: The x coordinate of the cell
163        - `y`
164            - Type: int
165            - What: The y coordinate of the cell
166
167        Returns:
168
169        - `idx`
170            - Type: int
171            - What: The index of the cell in the grid
172        """
173        assert x >= 0 and x < self.x_size, "x coordinate is out of bounds"
174        assert y >= 0 and y < self.y_size, "y coordinate is out of bounds"
175        return x + y * self.x_size

Function:

  • Get the index of a cell in a 2D grid given its x and y coordinates

Required Arguments:

  • x
    • Type: int
    • What: The x coordinate of the cell
  • y
    • Type: int
    • What: The y coordinate of the cell

Returns:

  • idx
    • Type: int
    • What: The index of the cell in the grid
def get_x_y(self, idx: int):
177    def get_x_y(self, idx: int):
178        """
179        Function:
180        - Get the x and y coordinates of a cell in a 2D grid given its index
181
182        Required Arguments:
183
184        - `idx`
185            - Type: int
186            - What: The index of the cell
187
188        Returns:
189        - `x`
190            - Type: int
191            - What: The x coordinate of the cell
192        - `y`
193            - Type: int
194            - What: The y coordinate of the cell
195        """
196        assert idx >= 0 and idx < len(self.graph), "idx is out of bounds"
197        return idx % self.x_size, idx // self.x_size

Function:

  • Get the x and y coordinates of a cell in a 2D grid given its index

Required Arguments:

  • idx
    • Type: int
    • What: The index of the cell

Returns:

  • x
    • Type: int
    • What: The x coordinate of the cell
  • y
    • Type: int
    • What: The y coordinate of the cell
def euclidean_heuristic(self, origin_id: int, destination_id: int) -> float:
295    def euclidean_heuristic(self, origin_id: int, destination_id: int) -> float:
296        """
297        Function:
298
299        - Calculate the Euclidean distance between two nodes in the grid graph
300
301        Required Arguments:
302
303        - `origin_id`
304            - Type: int
305            - What: The id of the origin node
306        - `destination_id`
307            - Type: int
308            - What: The id of the destination node
309
310        Returns:
311
312        - `distance`
313            - Type: float
314            - What: The Euclidean distance between the two nodes
315        """
316        origin_location = self.nodes[origin_id]
317        destination_location = self.nodes[destination_id]
318        return (
319            (origin_location[0] - destination_location[0]) ** 2
320            + (origin_location[1] - destination_location[1]) ** 2
321        ) ** 0.5

Function:

  • Calculate the Euclidean distance between two nodes in the grid graph

Required Arguments:

  • origin_id
    • Type: int
    • What: The id of the origin node
  • destination_id
    • Type: int
    • What: The id of the destination node

Returns:

  • distance
    • Type: float
    • What: The Euclidean distance between the two nodes
def format_coordinate(self, cooinate_dict, output_coorinate_path):
376    def format_coordinate(self, cooinate_dict, output_coorinate_path):
377        if output_coorinate_path == "list_of_tuples":
378            return (cooinate_dict["x"], cooinate_dict["y"])
379        elif output_coorinate_path == "list_of_lists":
380            return [cooinate_dict["x"], cooinate_dict["y"]]
381        elif output_coordinate_path == "list_of_dicts":
382            return {"x": cooinate_dict["x"], "y": cooinate_dict["y"]}
383        else:
384            raise ValueError("Invalid output_coordinate_path format")
def get_shortest_path( self, origin_node: dict[typing.Literal['x', 'y'], int | float] | tuple[int | float, int | float] | list[int | float], destination_node: dict[typing.Literal['x', 'y'], int | float] | tuple[int | float, int | float] | list[int | float], output_coordinate_path: str = 'list_of_dicts', cache: bool = False, cache_for: str = 'origin', output_path: bool = False, heuristic_fn: Union[<built-in function callable>, Literal['euclidean'], NoneType] = 'euclidean', algorithm_fn: <built-in function callable> = <function Graph.a_star>, algorithm_kwargs: dict | None = None, **kwargs) -> dict:
386    def get_shortest_path(
387        self,
388        origin_node: (
389            dict[Literal["x", "y"], int | float]
390            | tuple[int | float, int | float]
391            | list[int | float]
392        ),
393        destination_node: (
394            dict[Literal["x", "y"], int | float]
395            | tuple[int | float, int | float]
396            | list[int | float]
397        ),
398        output_coordinate_path: str = "list_of_dicts",
399        cache: bool = False,
400        cache_for: str = "origin",
401        output_path: bool = False,
402        heuristic_fn: callable | Literal["euclidean"] | None = "euclidean",
403        algorithm_fn: callable = Graph.a_star,
404        algorithm_kwargs: dict | None = None,
405        **kwargs,
406    ) -> dict:
407        """
408        Function:
409
410        - Identify the shortest path between two nodes in a sparse network graph
411        - If an off graph origin and/or destination node is provided, it will find the closest adjacent node with connections
412          as the origin and/or destination node
413            - This is done to increase the likelihood that the pathfinding algorithm can find a valid path
414            - If no valid adjacent node is found, an error will be raised
415
416        - Return a dictionary of various path information including:
417            - `path`: A list of graph ids in the order they are visited
418            - `coordinate_path`: A list of dicts (x, y) in the order they are visited
419            - `length`: The length of the path
420
421         Required Arguments:
422
423        - `origin_node`
424            - Type: dict of int | float
425            - What: A dictionary with the keys 'x' and 'y'
426            - Alternatively, a tuple or list of two values (x, y) can be used
427        - `destination_node`
428            - Type: dict of int | float
429            - What: A dictionary with the keys 'x' and 'y'
430            - Alternatively, a tuple or list of two values (x, y) can be used
431
432        Optional Arguments:
433
434        - `output_coordinate_path`
435            - Type: str
436            - What: The format of the output coordinate path
437            - Default: 'list_of_lists'
438            - Options:
439                - `list_of_tuples`: A list of tuples with the first value being x and the second being y
440                - 'list_of_dicts': A list of dictionaries with keys 'x' and 'y'
441                - 'list_of_lists': A list of lists with the first value being x and the second being y
442        - `cache`
443            - Type: bool
444            - What: Whether to use the cache (save and reuse the spanning tree)
445            - Default: False
446        - `cache_for`
447            - Type: str
448            - What: Whether to cache the spanning tree for the origin or destination node if `cache` is True
449            - Default: 'origin'
450            - Options: 'origin', 'destination'
451        - `output_path`
452            - Type: bool
453            - What: Whether to output the path as a list of graph idxs (mostly for debugging purposes)
454            - Default: False
455        - `heuristic_fn`
456            - Type: callable | Literal['euclidean'] | None
457            - What: A heuristic function to use for the A* algorithm if caching is False
458            - Default: 'euclidean' (A predefined heuristic function that calculates the Euclidean distance for this grid graph)
459            - If None, the A* algorithm will default to Dijkstra's algorithm
460            - If a callable is provided, it should take two arguments: origin_id and destination_id and return a float representing the heuristic distance between the two nodes
461                - Note: This distance should never be greater than the actual distance between the two nodes or you may get suboptimal paths
462                - Note: This is deprecated and will be removed in a future version
463        - `algorithm_fn`
464            - Type: callable | None
465            - What: The algorithm to use for pathfinding
466            - Default: 'a_star'
467            - If None, the default algorithm will be used
468        - `algorithm_kwargs`
469            - Type: dict
470            - What: Additional keyword arguments to pass to the algorithm function
471            - Default: {}
472        - `**kwargs`
473            - Additional keyword arguments. These are included for forwards and backwards compatibility reasons, but are not currently used.
474        """
475        if isinstance(origin_node, (tuple, list)):
476            origin_node = {"x": origin_node[0], "y": origin_node[1]}
477        if isinstance(destination_node, (tuple, list)):
478            destination_node = {
479                "x": destination_node[0],
480                "y": destination_node[1],
481            }
482        if algorithm_kwargs is None:
483            algorithm_kwargs = {}
484
485        origin_id, origin_distance = self.__get_closest_node_with_connections__(
486            **origin_node
487        )
488        destination_id, destination_distance = (
489            self.__get_closest_node_with_connections__(**destination_node)
490        )
491
492        if self.graph[origin_id] == {}:
493            raise ValueError(
494                "Origin node is not connected to any other nodes. This is likely caused by the origin node not being possible given a blocked cell or nearby blocked cell"
495            )
496        if self.graph[destination_id] == {}:
497            raise ValueError(
498                "Destination node is not connected to any other nodes. This is likely caused by the destination node not being possible given a blocked cell or nearby blocked cell"
499            )
500        if cache:
501            if cache_for not in ["origin", "destination"]:
502                raise ValueError(
503                    "cache_for must be 'origin' or 'destination' when cache is True"
504                )
505            # Reverse for cache graphing if cache_for is destination since the graph is undirected
506            if cache_for == "destination":
507                origin_id, destination_id = destination_id, origin_id
508            output = self.cacheGraph.get_shortest_path(
509                origin_id=origin_id,
510                destination_id=destination_id,
511            )
512            # Undo the reverse if cache_for is destination
513            if cache_for == "destination":
514                output["path"].reverse()
515                origin_id, destination_id = destination_id, origin_id
516        else:
517            # TODO: Remove this backwards compatibility hack in future versions
518            if algorithm_fn == Graph.a_star:
519                if "heuristic_fn" not in algorithm_kwargs:
520                    algorithm_kwargs["heuristic_fn"] = (
521                        self.euclidean_heuristic
522                        if heuristic_fn == "euclidean"
523                        else heuristic_fn
524                    )
525            output = algorithm_fn(
526                graph=self.graph,
527                origin_id=origin_id,
528                destination_id=destination_id,
529                **algorithm_kwargs,
530            )
531        output["coordinate_path"] = self.get_coordinate_path(
532            output["path"], output_coordinate_path
533        )
534        if origin_distance > 0:
535            output["coordinate_path"] = [
536                self.format_coordinate(origin_node, output_coordinate_path)
537            ] + output["coordinate_path"]
538            output["path"] = [-1] + output["path"]
539            output["length"] += origin_distance
540        if destination_distance > 0:
541            output["coordinate_path"].append(
542                self.format_coordinate(destination_node, output_coordinate_path)
543            )
544            output["path"].append(-1)
545            output["length"] += destination_distance
546        if not output_path:
547            del output["path"]
548        return output

Function:

  • Identify the shortest path between two nodes in a sparse network graph
  • If an off graph origin and/or destination node is provided, it will find the closest adjacent node with connections as the origin and/or destination node

    • This is done to increase the likelihood that the pathfinding algorithm can find a valid path
    • If no valid adjacent node is found, an error will be raised
  • Return a dictionary of various path information including:

    • path: A list of graph ids in the order they are visited
    • coordinate_path: A list of dicts (x, y) in the order they are visited
    • length: The length of the path

    Required Arguments:

  • origin_node

    • Type: dict of int | float
    • What: A dictionary with the keys 'x' and 'y'
    • Alternatively, a tuple or list of two values (x, y) can be used
  • destination_node
    • Type: dict of int | float
    • What: A dictionary with the keys 'x' and 'y'
    • Alternatively, a tuple or list of two values (x, y) can be used

Optional Arguments:

  • output_coordinate_path
    • Type: str
    • What: The format of the output coordinate path
    • Default: 'list_of_lists'
    • Options:
      • list_of_tuples: A list of tuples with the first value being x and the second being y
      • 'list_of_dicts': A list of dictionaries with keys 'x' and 'y'
      • 'list_of_lists': A list of lists with the first value being x and the second being y
  • cache
    • Type: bool
    • What: Whether to use the cache (save and reuse the spanning tree)
    • Default: False
  • cache_for
    • Type: str
    • What: Whether to cache the spanning tree for the origin or destination node if cache is True
    • Default: 'origin'
    • Options: 'origin', 'destination'
  • output_path
    • Type: bool
    • What: Whether to output the path as a list of graph idxs (mostly for debugging purposes)
    • Default: False
  • heuristic_fn
    • Type: callable | Literal['euclidean'] | None
    • What: A heuristic function to use for the A* algorithm if caching is False
    • Default: 'euclidean' (A predefined heuristic function that calculates the Euclidean distance for this grid graph)
    • If None, the A* algorithm will default to Dijkstra's algorithm
    • If a callable is provided, it should take two arguments: origin_id and destination_id and return a float representing the heuristic distance between the two nodes
      • Note: This distance should never be greater than the actual distance between the two nodes or you may get suboptimal paths
      • Note: This is deprecated and will be removed in a future version
  • algorithm_fn
    • Type: callable | None
    • What: The algorithm to use for pathfinding
    • Default: 'a_star'
    • If None, the default algorithm will be used
  • algorithm_kwargs
    • Type: dict
    • What: Additional keyword arguments to pass to the algorithm function
    • Default: {}
  • **kwargs
    • Additional keyword arguments. These are included for forwards and backwards compatibility reasons, but are not currently used.
def get_coordinate_path( self, path: list[int], output_coordinate_path: str = 'list_of_dicts') -> list[dict[str, int]]:
550    def get_coordinate_path(
551        self, path: list[int], output_coordinate_path: str = "list_of_dicts"
552    ) -> list[dict[str, int]]:
553        """
554        Function:
555
556        - Return a list of node dictionaries (lat + long) in the order they are visited
557
558        Required Arguments:
559
560        - `path`
561            - Type: list
562            - What: A list of node ids in the order they are visited
563
564        Optional Arguments:
565
566        - `output_coordinate_path`
567            - Type: str
568            - What: The format of the output coordinate path
569            - Default: 'list_of_dicts'
570            - Options:
571                - 'list_of_dicts': A list of dictionaries with keys 'x' and 'y'
572                - 'list_of_lists': A list of lists with the first value being x and the second being y
573                - `list_of_tuples`: A list of tuples with the first value being x and the second being y
574
575        Returns:
576
577        - `coordinate_path`
578            - Type: list
579            - What: A list of dictionaries with keys 'x' and 'y' in the order they are visited
580        """
581        output = [self.__get_x_y__(idx) for idx in path]
582        if output_coordinate_path == "list_of_tuples":
583            return output
584        elif output_coordinate_path == "list_of_lists":
585            return [[x, y] for x, y in output]
586        elif output_coordinate_path == "list_of_dicts":
587            return [{"x": x, "y": y} for x, y in output]
588        else:
589            raise ValueError(
590                "output_coordinate_path must be 'list_of_dicts' or 'list_of_lists' or 'list_of_tuples'"
591            )

Function:

  • Return a list of node dictionaries (lat + long) in the order they are visited

Required Arguments:

  • path
    • Type: list
    • What: A list of node ids in the order they are visited

Optional Arguments:

  • output_coordinate_path
    • Type: str
    • What: The format of the output coordinate path
    • Default: 'list_of_dicts'
    • Options:
      • 'list_of_dicts': A list of dictionaries with keys 'x' and 'y'
      • 'list_of_lists': A list of lists with the first value being x and the second being y
      • list_of_tuples: A list of tuples with the first value being x and the second being y

Returns:

  • coordinate_path
    • Type: list
    • What: A list of dictionaries with keys 'x' and 'y' in the order they are visited
def export_object(self, filename: str = '', include_blocks: bool = False) -> dict:
593    def export_object(
594        self, filename: str = "", include_blocks: bool = False
595    ) -> dict:
596        """
597        Function:
598
599        - Export the graph as a list of dictionaries
600
601        Arguments:
602
603        - `filename`
604            - Type: str
605            - What: The name of the file to export the graph to.
606            - An extension of .gridgraph will be added to the file name if not already present
607
608        Optional Arguments:
609
610        - `include_blocks`
611            - Type: bool
612            - What: Whether to include the blocks in the export
613            - Default: False
614            - Note: This is not needed as the graph is already created
615            - Note: You can include blocks in the export if you need them for some reason
616                - This will be set as the blocks attribute in the imported object
617                - This will increase the size of the export file
618        """
619        if filename == "":
620            raise ValueError("filename must be specified to export the graph")
621        filename = (
622            filename
623            if filename.endswith(".gridgraph")
624            else filename + ".gridgraph"
625        )
626        try:
627            import pickle, zlib, os
628        except ImportError:
629            raise ImportError(
630                "os, pickle and zlib are required to export the graph"
631            )
632        export_data = {
633            "graph_attributes": {
634                "graph": self.graph,
635                "nodes": self.nodes,
636                "x_size": self.x_size,
637                "y_size": self.y_size,
638                "shape": self.shape,
639                "add_exterior_walls": self.add_exterior_walls,
640                "conn_data": self.conn_data,
641            },
642            "graph_cache": self.cacheGraph.cache,
643            "export_version": 1,
644        }
645        if include_blocks:
646            export_data["graph_attributes"]["blocks"] = self.blocks
647
648        if not os.path.exists(os.path.dirname(filename)):
649            os.makedirs(os.path.dirname(filename))
650        with open(filename, "wb") as f:
651            f.write(zlib.compress(pickle.dumps(export_data)))

Function:

  • Export the graph as a list of dictionaries

Arguments:

  • filename
    • Type: str
    • What: The name of the file to export the graph to.
    • An extension of .gridgraph will be added to the file name if not already present

Optional Arguments:

  • include_blocks
    • Type: bool
    • What: Whether to include the blocks in the export
    • Default: False
    • Note: This is not needed as the graph is already created
    • Note: You can include blocks in the export if you need them for some reason
      • This will be set as the blocks attribute in the imported object
      • This will increase the size of the export file
@staticmethod
def import_object(filename: str = '') -> None:
653    @staticmethod
654    def import_object(filename: str = "") -> None:
655        """
656        Function:
657
658        - A staticmethod to import the graph from a file
659
660        Arguments:
661
662        - `filename`
663            - Type: str
664            - What: The name of the file to import the graph from.
665            - An extension of .gridgraph will be added to the file name if not already present
666
667        Returns:
668
669        - `GridGraph`
670            - Type: GridGraph
671            - What: The imported graph object
672        """
673        if filename == "":
674            raise ValueError("filename must be specified to import the graph")
675        filename = (
676            filename
677            if filename.endswith(".gridgraph")
678            else filename + ".gridgraph"
679        )
680        try:
681            import pickle, zlib
682        except ImportError:
683            raise ImportError(
684                "pickle and zlib are required to import the graph"
685            )
686        with open(filename, "rb") as f:
687            import_data = pickle.loads(zlib.decompress(f.read()))
688
689        # Check the version of the export
690        if import_data["export_version"] != 1:
691            raise ValueError(
692                f"Incompatible export version {import_data['export_version']}. The current version is 1"
693            )
694        # Create a new basic GridGraph object and overwrite the attributes with the imported data
695        # This is done as the init method calls the __create_graph__ method which is not needed here
696        GridGraph_object = GridGraph(
697            x_size=1, y_size=1, blocks=[], add_exterior_walls=False
698        )
699        for key, value in import_data["graph_attributes"].items():
700            GridGraph_object.__setattr__(key, value)
701
702        GridGraph_object.cacheGraph = CacheGraph(GridGraph_object.graph)
703        GridGraph_object.cacheGraph.cache = import_data["graph_cache"]
704        return GridGraph_object

Function:

  • A staticmethod to import the graph from a file

Arguments:

  • filename
    • Type: str
    • What: The name of the file to import the graph from.
    • An extension of .gridgraph will be added to the file name if not already present

Returns:

  • GridGraph
    • Type: GridGraph
    • What: The imported graph object