scar_sim.queue
from heapq import heappop, heappush
from scar_sim.utils import hard_round


class Queue:
    """
    A time-ordered event queue for simulations.

    Events are kept in a heap of `(time, event_id)` pairs. Event ids only ever increase,
    so events scheduled for the same time are processed in the order they were added.
    """

    def __init__(self, log_events: bool = False, precision: int = 4):
        """
        Initializes a priority queue for managing simulation events.

        Optional Arguments:

        - log_events (bool): If True, logs each event processed for debugging or analysis.
            - Default: False
        - precision (int): The number of decimal places to round time values to, ensuring numerical stability.
            - Default: 4
        """
        self.__queue__ = []  # heap of (time, event_id) pairs
        self.__current_time__ = 0.0
        self.__log__ = []  # one dict per processed event (only filled when logging is on)
        self.__event_dict__ = {}  # event_id -> (event_id, func, args, kwargs)
        self.__event_id__ = 0  # id of the most recently scheduled event
        self.__log_events__ = log_events
        self.__precision__ = precision

    def add(
        self,
        time_delta: float,
        func,
        args: tuple = None,
        kwargs: dict = None,
    ) -> None:
        """
        Schedules a new event in the queue to be executed after a specified time delta.

        Required Arguments:

        - time_delta (float): The time delay after which the event should be executed. Must be non-negative.
        - func (callable): The function to be called when the event is processed.

        Optional Arguments:

        - args (tuple): Positional arguments to pass to the function when called.
            - Default: tuple()
        - kwargs (dict): Keyword arguments to pass to the function when called.
            - Default: dict()

        Raises:

        - ValueError: If time_delta is negative.

        Returns:

        - None
        """
        args = args if args is not None else tuple()
        kwargs = kwargs if kwargs is not None else dict()
        if time_delta < 0:
            raise ValueError("Cannot schedule events in the past")
        self.__event_id__ += 1
        self.__event_dict__[self.__event_id__] = (
            self.__event_id__,
            func,
            args,
            kwargs,
        )
        # The absolute event time is passed through hard_round with the configured precision.
        next_time = hard_round(
            self.__current_time__ + time_delta, self.__precision__
        )
        heappush(self.__queue__, (next_time, self.__event_id__))

    def process(self) -> None:
        """
        Processes the next event in the queue, updating the current time and executing the associated function.

        Logs the event if logging is enabled. The log entry is written after the function returns,
        so an event whose function raises is removed from the queue but not logged.

        Raises:

        - IndexError: If there are no events to process in the queue.

        Returns:

        - None
        """
        if not self.__queue__:
            raise IndexError("No events to process in the queue")
        self.__current_time__, event_id = heappop(self.__queue__)
        event_id, func, args, kwargs = self.__event_dict__.pop(event_id)
        func(*args, **kwargs)

        if self.__log_events__:
            self.__log__.append(
                {
                    "event_id": event_id,
                    "time": self.__current_time__,
                    # Not every callable has __name__ (e.g. functools.partial objects or
                    # instances defining __call__); fall back to repr() instead of raising.
                    "func": getattr(func, "__name__", repr(func)),
                    "args": args,
                    "kwargs": kwargs,
                }
            )

    def run(self, max_time: float) -> None:
        """
        Runs the event queue, processing events in chronological order until the specified maximum time is reached.

        Events scheduled exactly at max_time are processed. Once the simulation is complete, the current
        time is set to max_time to ensure future events are scheduled correctly.

        Required Arguments:

        - max_time (float): The maximum simulation time at which to stop processing events.

        Raises:

        - ValueError: If max_time is less than the current simulation time.

        Returns:

        - None
        """
        if max_time < self.__current_time__:
            raise ValueError(
                "max_time cannot be less than the current simulation time"
            )
        while self.__queue__ and self.__queue__[0][0] <= max_time:
            self.process()
        self.__current_time__ = max_time
class
Queue:
class Queue:
    """
    A time-ordered event queue for simulations.

    Events are kept in a heap of `(time, event_id)` pairs. Event ids only ever increase,
    so events scheduled for the same time are processed in the order they were added.
    """

    def __init__(self, log_events: bool = False, precision: int = 4):
        """
        Initializes a priority queue for managing simulation events.

        Optional Arguments:

        - log_events (bool): If True, logs each event processed for debugging or analysis.
            - Default: False
        - precision (int): The number of decimal places to round time values to, ensuring numerical stability.
            - Default: 4
        """
        self.__queue__ = []  # heap of (time, event_id) pairs
        self.__current_time__ = 0.0
        self.__log__ = []  # one dict per processed event (only filled when logging is on)
        self.__event_dict__ = {}  # event_id -> (event_id, func, args, kwargs)
        self.__event_id__ = 0  # id of the most recently scheduled event
        self.__log_events__ = log_events
        self.__precision__ = precision

    def add(
        self,
        time_delta: float,
        func,
        args: tuple = None,
        kwargs: dict = None,
    ) -> None:
        """
        Schedules a new event in the queue to be executed after a specified time delta.

        Required Arguments:

        - time_delta (float): The time delay after which the event should be executed. Must be non-negative.
        - func (callable): The function to be called when the event is processed.

        Optional Arguments:

        - args (tuple): Positional arguments to pass to the function when called.
            - Default: tuple()
        - kwargs (dict): Keyword arguments to pass to the function when called.
            - Default: dict()

        Raises:

        - ValueError: If time_delta is negative.

        Returns:

        - None
        """
        args = args if args is not None else tuple()
        kwargs = kwargs if kwargs is not None else dict()
        if time_delta < 0:
            raise ValueError("Cannot schedule events in the past")
        self.__event_id__ += 1
        self.__event_dict__[self.__event_id__] = (
            self.__event_id__,
            func,
            args,
            kwargs,
        )
        # The absolute event time is passed through hard_round with the configured precision.
        next_time = hard_round(
            self.__current_time__ + time_delta, self.__precision__
        )
        heappush(self.__queue__, (next_time, self.__event_id__))

    def process(self) -> None:
        """
        Processes the next event in the queue, updating the current time and executing the associated function.

        Logs the event if logging is enabled. The log entry is written after the function returns,
        so an event whose function raises is removed from the queue but not logged.

        Raises:

        - IndexError: If there are no events to process in the queue.

        Returns:

        - None
        """
        if not self.__queue__:
            raise IndexError("No events to process in the queue")
        self.__current_time__, event_id = heappop(self.__queue__)
        event_id, func, args, kwargs = self.__event_dict__.pop(event_id)
        func(*args, **kwargs)

        if self.__log_events__:
            self.__log__.append(
                {
                    "event_id": event_id,
                    "time": self.__current_time__,
                    # Not every callable has __name__ (e.g. functools.partial objects or
                    # instances defining __call__); fall back to repr() instead of raising.
                    "func": getattr(func, "__name__", repr(func)),
                    "args": args,
                    "kwargs": kwargs,
                }
            )

    def run(self, max_time: float) -> None:
        """
        Runs the event queue, processing events in chronological order until the specified maximum time is reached.

        Events scheduled exactly at max_time are processed. Once the simulation is complete, the current
        time is set to max_time to ensure future events are scheduled correctly.

        Required Arguments:

        - max_time (float): The maximum simulation time at which to stop processing events.

        Raises:

        - ValueError: If max_time is less than the current simulation time.

        Returns:

        - None
        """
        if max_time < self.__current_time__:
            raise ValueError(
                "max_time cannot be less than the current simulation time"
            )
        while self.__queue__ and self.__queue__[0][0] <= max_time:
            self.process()
        self.__current_time__ = max_time
Queue(log_events: bool = False, precision: int = 4)
7 def __init__(self, log_events: bool = False, precision: int = 4): 8 """ 9 Initializes a priority queue for managing simulation events. 10 11 Optional Arguments: 12 13 - log_events (bool): If True, logs each event processed for debugging or analysis. Default is False. 14 - precision (int): The number of decimal places to round time values to, ensuring numerical stability. 15 - Default: 4 16 """ 17 self.__queue__ = [] 18 self.__current_time__ = 0.0 19 self.__log__ = [] 20 self.__event_dict__ = {} 21 self.__event_id__ = 0 22 self.__log_events__ = log_events 23 self.__precision__ = precision
Initializes a priority queue for managing simulation events.
Optional Arguments:
- log_events (bool): If True, logs each event processed for debugging or analysis. Default is False.
- precision (int): The number of decimal places to round time values to, ensuring numerical stability.
- Default: 4
def
add( self, time_delta: float, func, args: tuple = None, kwargs: dict = None) -> None:
def add(
    self,
    time_delta: float,
    func,
    args: tuple = None,
    kwargs: dict = None,
) -> None:
    """
    Registers an event to fire `time_delta` after the current simulation time.

    Required Arguments:

    - time_delta (float): Delay, relative to the current time, before the event fires. Must be non-negative.
    - func (callable): The function invoked when the event is processed.

    Optional Arguments:

    - args (tuple): Positional arguments forwarded to the function.
        - Default: tuple()
    - kwargs (dict): Keyword arguments forwarded to the function.
        - Default: dict()

    Raises:

    - ValueError: If time_delta is negative.

    Returns:

    - None
    """
    if args is None:
        args = ()
    if kwargs is None:
        kwargs = {}
    if time_delta < 0:
        raise ValueError("Cannot schedule events in the past")

    # Allocate the next id and stash the payload under it.
    self.__event_id__ += 1
    event_id = self.__event_id__
    self.__event_dict__[event_id] = (event_id, func, args, kwargs)

    # The heap only holds (time, id); the id breaks ties between equal times.
    due = hard_round(self.__current_time__ + time_delta, self.__precision__)
    heappush(self.__queue__, (due, event_id))
Schedules a new event in the queue to be executed after a specified time delta.
Required Arguments:
- time_delta (float): The time delay after which the event should be executed. Must be non-negative.
- func (callable): The function to be called when the event is processed.
Optional Arguments:
- args (tuple): Positional arguments to pass to the function when called.
- Default: tuple()
- kwargs (dict): Keyword arguments to pass to the function when called.
- Default: dict()
def
process(self) -> None:
def process(self) -> None:
    """
    Processes the next event in the queue, updating the current time and executing the associated function.

    Logs the event if logging is enabled. The log entry is written after the function returns,
    so an event whose function raises is removed from the queue but not logged.

    Raises:

    - IndexError: If there are no events to process in the queue.

    Returns:

    - None
    """
    if not self.__queue__:
        raise IndexError("No events to process in the queue")
    # Pop the earliest (time, event_id) pair and advance the clock to it.
    self.__current_time__, event_id = heappop(self.__queue__)
    event_id, func, args, kwargs = self.__event_dict__.pop(event_id)
    func(*args, **kwargs)

    if self.__log_events__:
        self.__log__.append(
            {
                "event_id": event_id,
                "time": self.__current_time__,
                # Not every callable has __name__ (e.g. functools.partial objects or
                # instances defining __call__); fall back to repr() instead of raising.
                "func": getattr(func, "__name__", repr(func)),
                "args": args,
                "kwargs": kwargs,
            }
        )
Processes the next event in the queue, updating the current time and executing the associated function.
Logs the event if logging is enabled.
Raises:
- IndexError: If there are no events to process in the queue.
Returns:
- None
def
run(self, max_time: float) -> None:
def run(self, max_time: float) -> None:
    """
    Drains the queue in time order, handling every event scheduled at or before `max_time`.

    After the last eligible event has been handled, the clock is moved to `max_time` so that
    later scheduling is relative to the end of this run.

    Required Arguments:

    - max_time (float): The simulation time at which processing stops (inclusive).

    Raises:

    - ValueError: If max_time is less than the current simulation time.

    Returns:

    - None
    """
    if max_time < self.__current_time__:
        raise ValueError(
            "max_time cannot be less than the current simulation time"
        )

    while True:
        pending = self.__queue__
        # Stop when nothing is left, or the earliest event lies beyond the horizon.
        if not pending or not (pending[0][0] <= max_time):
            break
        self.process()

    self.__current_time__ = max_time
Runs the event queue, processing events in chronological order until the specified maximum time is reached.
Once the simulation is complete, the current time is set to max_time to ensure future events are scheduled correctly.
Required Arguments:
- max_time (float): The maximum simulation time at which to stop processing events.
Raises:
- ValueError: If max_time is less than the current simulation time.
Returns:
- None