Loading raps/engine.py +11 −9 Original line number Diff line number Diff line Loading @@ -103,6 +103,9 @@ class Engine: self.jobs_completed += 1 job_stats = job.statistics() self.accounts.update_account_statistics(job_stats) # Free nodes and ensure there are no duplicates self.available_nodes.extend(job.scheduled_nodes) self.available_nodes = sorted(set(self.available_nodes)) # Ask scheduler to schedule any jobs waiting in queue self.scheduler.schedule(self.queue, self.running, self.available_nodes, self.current_time) Loading Loading @@ -172,7 +175,6 @@ class Engine: return tick_data def get_utilization(self, trace, time_quanta_index): """Retrieve utilization value for a given trace at a specific time quanta index.""" if isinstance(trace, (list, np.ndarray)): Loading @@ -193,16 +195,16 @@ class Engine: self.add_job(job) for timestep in range(timesteps): while self.current_time >= last_submit_time and jobs: #while self.current_time >= last_submit_time and jobs: job = jobs.pop(0) job = Job(job_info, self.current_time) self.scheduler.schedule([job], self.running, self.available_nodes, self.current_time) #job = self.queue.pop(0) #self.scheduler.schedule([job], self.running, self.available_nodes, self.current_time) self.scheduler.schedule(self.queue, self.running, self.available_nodes, self.current_time) if jobs: last_submit_time = job.submit_time else: last_submit_time = float('inf') # Avoid infinite loop #if jobs: # last_submit_time = job.submit_time #else: # last_submit_time = float('inf') # Avoid infinite loop yield self.tick() Loading raps/schedulers/default.py +20 −15 Original line number Diff line number Diff line Loading @@ -50,27 +50,32 @@ class Scheduler: job.state = JobState.RUNNING # Job is now running def schedule(self, job_list, running, available_nodes, current_time): """Schedules jobs from the given job_list directly, modifying available_nodes.""" def schedule(self, queue, running, available_nodes, current_time, debug=False): # Sort the queue in place. queue[:] = self.sort_jobs(queue) while job_list: job = job_list.pop(0) # Iterate over a copy of the queue since we might remove items for job in queue[:]: synthetic_bool = len(available_nodes) >= job.nodes_required telemetry_bool = job.requested_nodes and set(job.requested_nodes).issubset(set(available_nodes)) if len(available_nodes) >= job.nodes_required: job.scheduled_nodes = available_nodes[:job.nodes_required] available_nodes[:] = available_nodes[job.nodes_required:] job.start_time = current_time job.end_time = current_time + job.wall_time job.state = JobState.RUNNING if synthetic_bool or telemetry_bool: self.assign_nodes_to_job(job, available_nodes, current_time) running.append(job) queue.remove(job) # Remove the job from the queue if debug: scheduled_nodes = summarize_ranges(job.scheduled_nodes) print(f"t={current_time}: Scheduled job with wall time {job.wall_time} on nodes {scheduled_nodes}") else: job_list.insert(0, job) # Put job back at the front if it can't be scheduled break # Stop scheduling if no nodes are available # Optionally, if you have a BACKFILL policy, you can attempt that here. # Otherwise, just leave the job in the queue. continue def schedule2(self, queue, running, available_nodes, current_time, debug=False): """Schedules jobs from the queue to available nodes.""" queue = self.sort_jobs(queue) # Ensure queue is sorted before scheduling queue[:] = self.sort_jobs(queue) # Ensure queue is sorted before scheduling while queue: Loading @@ -97,7 +102,7 @@ class Scheduler: queue.insert(0, job) backfill_job = self.find_backfill_job(queue, len(available_nodes), current_time) if backfill_job: self.assign_nodes_to_job(backfill_job) self.assign_nodes_to_job(backfill_job, available_nodes, current_time) self.queue.remove(backfill_job) if self.debug: scheduled_nodes = summarize_ranges(backfill_job.scheduled_nodes) Loading Loading
raps/engine.py +11 −9 Original line number Diff line number Diff line Loading @@ -103,6 +103,9 @@ class Engine: self.jobs_completed += 1 job_stats = job.statistics() self.accounts.update_account_statistics(job_stats) # Free nodes and ensure there are no duplicates self.available_nodes.extend(job.scheduled_nodes) self.available_nodes = sorted(set(self.available_nodes)) # Ask scheduler to schedule any jobs waiting in queue self.scheduler.schedule(self.queue, self.running, self.available_nodes, self.current_time) Loading Loading @@ -172,7 +175,6 @@ class Engine: return tick_data def get_utilization(self, trace, time_quanta_index): """Retrieve utilization value for a given trace at a specific time quanta index.""" if isinstance(trace, (list, np.ndarray)): Loading @@ -193,16 +195,16 @@ class Engine: self.add_job(job) for timestep in range(timesteps): while self.current_time >= last_submit_time and jobs: #while self.current_time >= last_submit_time and jobs: job = jobs.pop(0) job = Job(job_info, self.current_time) self.scheduler.schedule([job], self.running, self.available_nodes, self.current_time) #job = self.queue.pop(0) #self.scheduler.schedule([job], self.running, self.available_nodes, self.current_time) self.scheduler.schedule(self.queue, self.running, self.available_nodes, self.current_time) if jobs: last_submit_time = job.submit_time else: last_submit_time = float('inf') # Avoid infinite loop #if jobs: # last_submit_time = job.submit_time #else: # last_submit_time = float('inf') # Avoid infinite loop yield self.tick() Loading
raps/schedulers/default.py +20 −15 Original line number Diff line number Diff line Loading @@ -50,27 +50,32 @@ class Scheduler: job.state = JobState.RUNNING # Job is now running def schedule(self, job_list, running, available_nodes, current_time): """Schedules jobs from the given job_list directly, modifying available_nodes.""" def schedule(self, queue, running, available_nodes, current_time, debug=False): # Sort the queue in place. queue[:] = self.sort_jobs(queue) while job_list: job = job_list.pop(0) # Iterate over a copy of the queue since we might remove items for job in queue[:]: synthetic_bool = len(available_nodes) >= job.nodes_required telemetry_bool = job.requested_nodes and set(job.requested_nodes).issubset(set(available_nodes)) if len(available_nodes) >= job.nodes_required: job.scheduled_nodes = available_nodes[:job.nodes_required] available_nodes[:] = available_nodes[job.nodes_required:] job.start_time = current_time job.end_time = current_time + job.wall_time job.state = JobState.RUNNING if synthetic_bool or telemetry_bool: self.assign_nodes_to_job(job, available_nodes, current_time) running.append(job) queue.remove(job) # Remove the job from the queue if debug: scheduled_nodes = summarize_ranges(job.scheduled_nodes) print(f"t={current_time}: Scheduled job with wall time {job.wall_time} on nodes {scheduled_nodes}") else: job_list.insert(0, job) # Put job back at the front if it can't be scheduled break # Stop scheduling if no nodes are available # Optionally, if you have a BACKFILL policy, you can attempt that here. # Otherwise, just leave the job in the queue. continue def schedule2(self, queue, running, available_nodes, current_time, debug=False): """Schedules jobs from the queue to available nodes.""" queue = self.sort_jobs(queue) # Ensure queue is sorted before scheduling queue[:] = self.sort_jobs(queue) # Ensure queue is sorted before scheduling while queue: Loading @@ -97,7 +102,7 @@ class Scheduler: queue.insert(0, job) backfill_job = self.find_backfill_job(queue, len(available_nodes), current_time) if backfill_job: self.assign_nodes_to_job(backfill_job) self.assign_nodes_to_job(backfill_job, available_nodes, current_time) self.queue.remove(backfill_job) if self.debug: scheduled_nodes = summarize_ranges(backfill_job.scheduled_nodes) Loading