Loading raps/engine.py +1 −10 Original line number Diff line number Diff line Loading @@ -195,19 +195,9 @@ class Engine: self.add_job(job) for timestep in range(timesteps): #while self.current_time >= last_submit_time and jobs: #job = self.queue.pop(0) #self.scheduler.schedule([job], self.running, self.available_nodes, self.current_time) self.scheduler.schedule(self.queue, self.running, self.available_nodes, self.current_time) #if jobs: # last_submit_time = job.submit_time #else: # last_submit_time = float('inf') # Avoid infinite loop yield self.tick() # Stop the simulation if no more jobs are running or in the queue if not self.queue and not self.running and not self.replay: print(f"[DEBUG] {self.config['system_name']} - Stopping simulation at time {self.current_time}") Loading @@ -215,6 +205,7 @@ class Engine: if self.debug and timestep % self.config['UI_UPDATE_FREQ'] == 0: print(".", end="", flush=True) yield self.tick() def get_stats(self): """ Return output statistics """ Loading raps/schedulers/default.py +9 −43 Original line number Diff line number Diff line Loading @@ -65,53 +65,19 @@ class Scheduler: queue.remove(job) # Remove the job from the queue if debug: scheduled_nodes = summarize_ranges(job.scheduled_nodes) print(f"t={current_time}: Scheduled job with wall time {job.wall_time} on nodes {scheduled_nodes}") print(f"t={current_time}: Scheduled job {job.id} with wall time {job.wall_time} on nodes {scheduled_nodes}") else: # Optionally, if you have a BACKFILL policy, you can attempt that here. # Otherwise, just leave the job in the queue. continue def schedule2(self, queue, running, available_nodes, current_time, debug=False): """Schedules jobs from the queue to available nodes.""" queue[:] = self.sort_jobs(queue) # Ensure queue is sorted before scheduling while queue: # Try scheduling the first job in the queue job = queue.pop(0) synthetic_bool = len(available_nodes) >= job.nodes_required telemetry_bool = job.requested_nodes and set(job.requested_nodes).issubset(set(available_nodes)) if synthetic_bool or telemetry_bool: # Schedule job self.assign_nodes_to_job(job, available_nodes, current_time) running.append(job) #self.history.append(dict(id=job.id, time=current_time, nodes=job.nodes_required, wall_time=job.wall_time)) if debug: scheduled_nodes = summarize_ranges(job.scheduled_nodes) print(f"t={current_time}: Scheduled job with wall time", f"{job.wall_time} on nodes {scheduled_nodes}") else: # If the job cannot be scheduled, either try backfilling or requeue it if queue and self.policy == PolicyType.BACKFILL: queue.insert(0, job) # Optionally, if you have a BACKFILL policy, attempt backfilling here. if self.policy == PolicyType.BACKFILL: # Try to find a backfill candidate from the entire queue. backfill_job = self.find_backfill_job(queue, len(available_nodes), current_time) if backfill_job: self.assign_nodes_to_job(backfill_job, available_nodes, current_time) self.queue.remove(backfill_job) if self.debug: running.append(backfill_job) queue.remove(backfill_job) if debug: scheduled_nodes = summarize_ranges(backfill_job.scheduled_nodes) print(f"t={self.current_time}: Backfilling job {backfill_job.id} with wall time", f"{backfill_job.wall_time} on nodes {scheduled_nodes}") else: queue.append(job) # Note, this should be fixed. It shouldn't go to the end of the queue. break print(f"t={current_time}: Backfilling job {backfill_job.id} with wall time {backfill_job.wall_time} on nodes {scheduled_nodes}") def find_backfill_job(self, queue, num_free_nodes, current_time): Loading Loading
raps/engine.py +1 −10 Original line number Diff line number Diff line Loading @@ -195,19 +195,9 @@ class Engine: self.add_job(job) for timestep in range(timesteps): #while self.current_time >= last_submit_time and jobs: #job = self.queue.pop(0) #self.scheduler.schedule([job], self.running, self.available_nodes, self.current_time) self.scheduler.schedule(self.queue, self.running, self.available_nodes, self.current_time) #if jobs: # last_submit_time = job.submit_time #else: # last_submit_time = float('inf') # Avoid infinite loop yield self.tick() # Stop the simulation if no more jobs are running or in the queue if not self.queue and not self.running and not self.replay: print(f"[DEBUG] {self.config['system_name']} - Stopping simulation at time {self.current_time}") Loading @@ -215,6 +205,7 @@ class Engine: if self.debug and timestep % self.config['UI_UPDATE_FREQ'] == 0: print(".", end="", flush=True) yield self.tick() def get_stats(self): """ Return output statistics """ Loading
raps/schedulers/default.py +9 −43 Original line number Diff line number Diff line Loading @@ -65,53 +65,19 @@ class Scheduler: queue.remove(job) # Remove the job from the queue if debug: scheduled_nodes = summarize_ranges(job.scheduled_nodes) print(f"t={current_time}: Scheduled job with wall time {job.wall_time} on nodes {scheduled_nodes}") print(f"t={current_time}: Scheduled job {job.id} with wall time {job.wall_time} on nodes {scheduled_nodes}") else: # Optionally, if you have a BACKFILL policy, you can attempt that here. # Otherwise, just leave the job in the queue. continue def schedule2(self, queue, running, available_nodes, current_time, debug=False): """Schedules jobs from the queue to available nodes.""" queue[:] = self.sort_jobs(queue) # Ensure queue is sorted before scheduling while queue: # Try scheduling the first job in the queue job = queue.pop(0) synthetic_bool = len(available_nodes) >= job.nodes_required telemetry_bool = job.requested_nodes and set(job.requested_nodes).issubset(set(available_nodes)) if synthetic_bool or telemetry_bool: # Schedule job self.assign_nodes_to_job(job, available_nodes, current_time) running.append(job) #self.history.append(dict(id=job.id, time=current_time, nodes=job.nodes_required, wall_time=job.wall_time)) if debug: scheduled_nodes = summarize_ranges(job.scheduled_nodes) print(f"t={current_time}: Scheduled job with wall time", f"{job.wall_time} on nodes {scheduled_nodes}") else: # If the job cannot be scheduled, either try backfilling or requeue it if queue and self.policy == PolicyType.BACKFILL: queue.insert(0, job) # Optionally, if you have a BACKFILL policy, attempt backfilling here. if self.policy == PolicyType.BACKFILL: # Try to find a backfill candidate from the entire queue. backfill_job = self.find_backfill_job(queue, len(available_nodes), current_time) if backfill_job: self.assign_nodes_to_job(backfill_job, available_nodes, current_time) self.queue.remove(backfill_job) if self.debug: running.append(backfill_job) queue.remove(backfill_job) if debug: scheduled_nodes = summarize_ranges(backfill_job.scheduled_nodes) print(f"t={self.current_time}: Backfilling job {backfill_job.id} with wall time", f"{backfill_job.wall_time} on nodes {scheduled_nodes}") else: queue.append(job) # Note, this should be fixed. It shouldn't go to the end of the queue. break print(f"t={current_time}: Backfilling job {backfill_job.id} with wall time {backfill_job.wall_time} on nodes {scheduled_nodes}") def find_backfill_job(self, queue, num_free_nodes, current_time): Loading