Loading raps/engine.py +15 −9 Original line number Diff line number Diff line Loading @@ -194,28 +194,34 @@ class Engine: raise TypeError(f"Invalid type for utilization: {type(trace)}.") def run_simulation(self, jobs, timesteps): """ Generator that yields after each simulation tick """ last_submit_time = 0 def run_simulation(self, jobs, timesteps, autoshutdown=False): """Generator that yields after each simulation tick.""" self.timesteps = timesteps for job_info in jobs: job = Job(job_info, self.current_time) self.add_job(job) # Sort pending jobs by submit_time. jobs_to_submit = sorted(jobs, key=lambda j: j['submit_time']) for timestep in range(timesteps): # Submit jobs whose submit_time is <= current_time while jobs_to_submit and jobs_to_submit[0]['submit_time'] <= self.current_time: job_info = jobs_to_submit.pop(0) job = Job(job_info, self.current_time) self.add_job(job) # Schedule jobs that are now in the queue. self.scheduler.schedule(self.queue, self.running, self.current_time) # Stop the simulation if no more jobs are running or in the queue if not self.queue and not self.running and not self.replay: # Stop the simulation if no more jobs are running or in the queue. if autoshutdown and not self.queue and not self.running and not self.replay: print(f"[DEBUG] {self.config['system_name']} - Stopping simulation at time {self.current_time}") break if self.debug and timestep % self.config['UI_UPDATE_FREQ'] == 0: print(".", end="", flush=True) yield self.tick() def get_stats(self): """ Return output statistics """ sum_values = lambda values: sum(x[1] for x in values) if values else 0 Loading raps/schedulers/default.py +12 −3 Original line number Diff line number Diff line Loading @@ -42,9 +42,18 @@ class Scheduler: # Iterate over a copy of the queue since we might remove items for job in queue[:]: # Check if the resource manager has enough nodes. if len(self.resource_manager.available_nodes) >= job.nodes_required: # Use ResourceManager to assign nodes. # For synthetic jobs the number of requested nodes is given. # Make sure the available nodes count meets job.nodes_required. synthetic_bool = len(self.resource_manager.available_nodes) >= job.nodes_required # For telemetry replay jobs a list of requested nodes is provided. # Make sure the requested nodes are available. telemetry_bool = False if job.requested_nodes: telemetry_bool = set(job.requested_nodes).issubset(set(self.resource_manager.available_nodes)) if synthetic_bool or telemetry_bool: self.resource_manager.assign_nodes_to_job(job, current_time) running.append(job) queue.remove(job) Loading Loading
raps/engine.py +15 −9 Original line number Diff line number Diff line Loading @@ -194,28 +194,34 @@ class Engine: raise TypeError(f"Invalid type for utilization: {type(trace)}.") def run_simulation(self, jobs, timesteps): """ Generator that yields after each simulation tick """ last_submit_time = 0 def run_simulation(self, jobs, timesteps, autoshutdown=False): """Generator that yields after each simulation tick.""" self.timesteps = timesteps for job_info in jobs: job = Job(job_info, self.current_time) self.add_job(job) # Sort pending jobs by submit_time. jobs_to_submit = sorted(jobs, key=lambda j: j['submit_time']) for timestep in range(timesteps): # Submit jobs whose submit_time is <= current_time while jobs_to_submit and jobs_to_submit[0]['submit_time'] <= self.current_time: job_info = jobs_to_submit.pop(0) job = Job(job_info, self.current_time) self.add_job(job) # Schedule jobs that are now in the queue. self.scheduler.schedule(self.queue, self.running, self.current_time) # Stop the simulation if no more jobs are running or in the queue if not self.queue and not self.running and not self.replay: # Stop the simulation if no more jobs are running or in the queue. if autoshutdown and not self.queue and not self.running and not self.replay: print(f"[DEBUG] {self.config['system_name']} - Stopping simulation at time {self.current_time}") break if self.debug and timestep % self.config['UI_UPDATE_FREQ'] == 0: print(".", end="", flush=True) yield self.tick() def get_stats(self): """ Return output statistics """ sum_values = lambda values: sum(x[1] for x in values) if values else 0 Loading
raps/schedulers/default.py +12 −3 Original line number Diff line number Diff line Loading @@ -42,9 +42,18 @@ class Scheduler: # Iterate over a copy of the queue since we might remove items for job in queue[:]: # Check if the resource manager has enough nodes. if len(self.resource_manager.available_nodes) >= job.nodes_required: # Use ResourceManager to assign nodes. # For synthetic jobs the number of requested nodes is given. # Make sure the available nodes count meets job.nodes_required. synthetic_bool = len(self.resource_manager.available_nodes) >= job.nodes_required # For telemetry replay jobs a list of requested nodes is provided. # Make sure the requested nodes are available. telemetry_bool = False if job.requested_nodes: telemetry_bool = set(job.requested_nodes).issubset(set(self.resource_manager.available_nodes)) if synthetic_bool or telemetry_bool: self.resource_manager.assign_nodes_to_job(job, current_time) running.append(job) queue.remove(job) Loading