scar_sim.queue

  1from heapq import heappop, heappush
  2from scar_sim.utils import hard_round
  3
  4
  5class Queue:
  6    def __init__(self, log_events: bool = False, precision: int = 4):
  7        """
  8        Initializes a priority queue for managing simulation events.
  9
 10        Optional Arguments:
 11
 12        - log_events (bool): If True, logs each event processed for debugging or analysis. Default is False.
 13        - precision (int): The number of decimal places to round time values to, ensuring numerical stability.
 14            - Default: 4
 15        """
 16        self.__queue__ = []
 17        self.__current_time__ = 0.0
 18        self.__log__ = []
 19        self.__event_dict__ = {}
 20        self.__event_id__ = 0
 21        self.__log_events__ = log_events
 22        self.__precision__ = precision
 23
 24    def add(
 25        self,
 26        time_delta: float,
 27        func,
 28        args: tuple = None,
 29        kwargs: dict = None,
 30    ) -> None:
 31        """
 32        Schedules a new event in the queue to be executed after a specified time delta.
 33
 34        Required Arguments:
 35
 36        - time_delta (float): The time delay after which the event should be executed. Must be non-negative.
 37        - func (callable): The function to be called when the event is processed.
 38
 39        Optional Arguments:
 40
 41        - args (tuple): Positional arguments to pass to the function when called.
 42            - Default: tuple()
 43        - kwargs (dict): Keyword arguments to pass to the function when called.
 44            - Default: dict()
 45        """
 46        args = args if args is not None else tuple()
 47        kwargs = kwargs if kwargs is not None else dict()
 48        if time_delta < 0:
 49            raise ValueError("Cannot schedule events in the past")
 50        self.__event_id__ += 1
 51        self.__event_dict__[self.__event_id__] = (
 52            self.__event_id__,
 53            func,
 54            args,
 55            kwargs,
 56        )
 57        next_time = hard_round(
 58            self.__current_time__ + time_delta, self.__precision__
 59        )
 60        heappush(self.__queue__, (next_time, self.__event_id__))
 61
 62    def process(self) -> None:
 63        """
 64        Processes the next event in the queue, updating the current time and executing the associated function.
 65
 66        Logs the event if logging is enabled.
 67
 68        Raises:
 69
 70        - IndexError: If there are no events to process in the queue.
 71
 72        Returns:
 73
 74        - None
 75        """
 76        if not self.__queue__:
 77            raise IndexError("No events to process in the queue")
 78        self.__current_time__, event_id = heappop(self.__queue__)
 79        event_id, func, args, kwargs = self.__event_dict__.pop(event_id)
 80        func(*args, **kwargs)
 81
 82        if self.__log_events__:
 83            self.__log__.append(
 84                {
 85                    "event_id": event_id,
 86                    "time": self.__current_time__,
 87                    "func": func.__name__,
 88                    "args": args,
 89                    "kwargs": kwargs,
 90                }
 91            )
 92
 93    def run(self, max_time: float) -> None:
 94        """
 95        Runs the event queue, processing events in chronological order until the specified maximum time is reached.
 96
 97        Once the simulation is complete, the current time is set to max_time to ensure future events are scheduled correctly.
 98
 99        Required Arguments:
100
101        - max_time (float): The maximum simulation time at which to stop processing events.
102
103        Raises:
104
105        - ValueError: If max_time is less than the current simulation time.
106
107        Returns:
108
109        - None
110        """
111        if max_time < self.__current_time__:
112            raise ValueError(
113                "max_time cannot be less than the current simulation time"
114            )
115        while self.__queue__ and self.__queue__[0][0] <= max_time:
116            self.process()
117        self.__current_time__ = max_time
class Queue:
  6class Queue:
  7    def __init__(self, log_events: bool = False, precision: int = 4):
  8        """
  9        Initializes a priority queue for managing simulation events.
 10
 11        Optional Arguments:
 12
 13        - log_events (bool): If True, logs each event processed for debugging or analysis. Default is False.
 14        - precision (int): The number of decimal places to round time values to, ensuring numerical stability.
 15            - Default: 4
 16        """
 17        self.__queue__ = []
 18        self.__current_time__ = 0.0
 19        self.__log__ = []
 20        self.__event_dict__ = {}
 21        self.__event_id__ = 0
 22        self.__log_events__ = log_events
 23        self.__precision__ = precision
 24
 25    def add(
 26        self,
 27        time_delta: float,
 28        func,
 29        args: tuple = None,
 30        kwargs: dict = None,
 31    ) -> None:
 32        """
 33        Schedules a new event in the queue to be executed after a specified time delta.
 34
 35        Required Arguments:
 36
 37        - time_delta (float): The time delay after which the event should be executed. Must be non-negative.
 38        - func (callable): The function to be called when the event is processed.
 39
 40        Optional Arguments:
 41
 42        - args (tuple): Positional arguments to pass to the function when called.
 43            - Default: tuple()
 44        - kwargs (dict): Keyword arguments to pass to the function when called.
 45            - Default: dict()
 46        """
 47        args = args if args is not None else tuple()
 48        kwargs = kwargs if kwargs is not None else dict()
 49        if time_delta < 0:
 50            raise ValueError("Cannot schedule events in the past")
 51        self.__event_id__ += 1
 52        self.__event_dict__[self.__event_id__] = (
 53            self.__event_id__,
 54            func,
 55            args,
 56            kwargs,
 57        )
 58        next_time = hard_round(
 59            self.__current_time__ + time_delta, self.__precision__
 60        )
 61        heappush(self.__queue__, (next_time, self.__event_id__))
 62
 63    def process(self) -> None:
 64        """
 65        Processes the next event in the queue, updating the current time and executing the associated function.
 66
 67        Logs the event if logging is enabled.
 68
 69        Raises:
 70
 71        - IndexError: If there are no events to process in the queue.
 72
 73        Returns:
 74
 75        - None
 76        """
 77        if not self.__queue__:
 78            raise IndexError("No events to process in the queue")
 79        self.__current_time__, event_id = heappop(self.__queue__)
 80        event_id, func, args, kwargs = self.__event_dict__.pop(event_id)
 81        func(*args, **kwargs)
 82
 83        if self.__log_events__:
 84            self.__log__.append(
 85                {
 86                    "event_id": event_id,
 87                    "time": self.__current_time__,
 88                    "func": func.__name__,
 89                    "args": args,
 90                    "kwargs": kwargs,
 91                }
 92            )
 93
 94    def run(self, max_time: float) -> None:
 95        """
 96        Runs the event queue, processing events in chronological order until the specified maximum time is reached.
 97
 98        Once the simulation is complete, the current time is set to max_time to ensure future events are scheduled correctly.
 99
100        Required Arguments:
101
102        - max_time (float): The maximum simulation time at which to stop processing events.
103
104        Raises:
105
106        - ValueError: If max_time is less than the current simulation time.
107
108        Returns:
109
110        - None
111        """
112        if max_time < self.__current_time__:
113            raise ValueError(
114                "max_time cannot be less than the current simulation time"
115            )
116        while self.__queue__ and self.__queue__[0][0] <= max_time:
117            self.process()
118        self.__current_time__ = max_time
Queue(log_events: bool = False, precision: int = 4)
 7    def __init__(self, log_events: bool = False, precision: int = 4):
 8        """
 9        Initializes a priority queue for managing simulation events.
10
11        Optional Arguments:
12
13        - log_events (bool): If True, logs each event processed for debugging or analysis. Default is False.
14        - precision (int): The number of decimal places to round time values to, ensuring numerical stability.
15            - Default: 4
16        """
17        self.__queue__ = []
18        self.__current_time__ = 0.0
19        self.__log__ = []
20        self.__event_dict__ = {}
21        self.__event_id__ = 0
22        self.__log_events__ = log_events
23        self.__precision__ = precision

Initializes a priority queue for managing simulation events.

Optional Arguments:

  • log_events (bool): If True, logs each event processed for debugging or analysis. Default is False.
  • precision (int): The number of decimal places to round time values to, ensuring numerical stability.
    • Default: 4
def add( self, time_delta: float, func, args: tuple = None, kwargs: dict = None) -> None:
25    def add(
26        self,
27        time_delta: float,
28        func,
29        args: tuple = None,
30        kwargs: dict = None,
31    ) -> None:
32        """
33        Schedules a new event in the queue to be executed after a specified time delta.
34
35        Required Arguments:
36
37        - time_delta (float): The time delay after which the event should be executed. Must be non-negative.
38        - func (callable): The function to be called when the event is processed.
39
40        Optional Arguments:
41
42        - args (tuple): Positional arguments to pass to the function when called.
43            - Default: tuple()
44        - kwargs (dict): Keyword arguments to pass to the function when called.
45            - Default: dict()
46        """
47        args = args if args is not None else tuple()
48        kwargs = kwargs if kwargs is not None else dict()
49        if time_delta < 0:
50            raise ValueError("Cannot schedule events in the past")
51        self.__event_id__ += 1
52        self.__event_dict__[self.__event_id__] = (
53            self.__event_id__,
54            func,
55            args,
56            kwargs,
57        )
58        next_time = hard_round(
59            self.__current_time__ + time_delta, self.__precision__
60        )
61        heappush(self.__queue__, (next_time, self.__event_id__))

Schedules a new event in the queue to be executed after a specified time delta.

Required Arguments:

  • time_delta (float): The time delay after which the event should be executed. Must be non-negative.
  • func (callable): The function to be called when the event is processed.

Optional Arguments:

  • args (tuple): Positional arguments to pass to the function when called.
    • Default: tuple()
  • kwargs (dict): Keyword arguments to pass to the function when called.
    • Default: dict()
def process(self) -> None:
63    def process(self) -> None:
64        """
65        Processes the next event in the queue, updating the current time and executing the associated function.
66
67        Logs the event if logging is enabled.
68
69        Raises:
70
71        - IndexError: If there are no events to process in the queue.
72
73        Returns:
74
75        - None
76        """
77        if not self.__queue__:
78            raise IndexError("No events to process in the queue")
79        self.__current_time__, event_id = heappop(self.__queue__)
80        event_id, func, args, kwargs = self.__event_dict__.pop(event_id)
81        func(*args, **kwargs)
82
83        if self.__log_events__:
84            self.__log__.append(
85                {
86                    "event_id": event_id,
87                    "time": self.__current_time__,
88                    "func": func.__name__,
89                    "args": args,
90                    "kwargs": kwargs,
91                }
92            )

Processes the next event in the queue, updating the current time and executing the associated function.

Logs the event if logging is enabled.

Raises:

  • IndexError: If there are no events to process in the queue.

Returns:

  • None
def run(self, max_time: float) -> None:
 94    def run(self, max_time: float) -> None:
 95        """
 96        Runs the event queue, processing events in chronological order until the specified maximum time is reached.
 97
 98        Once the simulation is complete, the current time is set to max_time to ensure future events are scheduled correctly.
 99
100        Required Arguments:
101
102        - max_time (float): The maximum simulation time at which to stop processing events.
103
104        Raises:
105
106        - ValueError: If max_time is less than the current simulation time.
107
108        Returns:
109
110        - None
111        """
112        if max_time < self.__current_time__:
113            raise ValueError(
114                "max_time cannot be less than the current simulation time"
115            )
116        while self.__queue__ and self.__queue__[0][0] <= max_time:
117            self.process()
118        self.__current_time__ = max_time

Runs the event queue, processing events in chronological order until the specified maximum time is reached.

Once the simulation is complete, the current time is set to max_time to ensure future events are scheduled correctly.

Required Arguments:

  • max_time (float): The maximum simulation time at which to stop processing events.

Raises:

  • ValueError: If max_time is less than the current simulation time.

Returns:

  • None