def find_backfill_job(queue, num_free_nodes, current_time):
    """Select a job from *queue* that can be backfilled without delaying
    the first (blocked) job in the queue.

    This implementation is based on pseudocode from Leonenkov and Zhumatiy,
    "Introducing new backfill-based scheduler for slurm resource manager."
    Procedia Computer Science 66 (2015): 661-669.

    Args:
        queue: Ordered list of waiting jobs; ``queue[0]`` is the blocked job
            whose reservation must be protected. Each job exposes
            ``nodes_required`` and ``wall_time``; ``end_time`` is
            overwritten here as a scratch field.
        num_free_nodes: Number of currently idle nodes.
        current_time: Current simulation time.

    Returns:
        The first eligible backfill job, or ``None`` if none qualifies.
    """
    if not queue:
        return None

    first_job = queue[0]

    # Estimate each queued job's termination time if it started now.
    # NOTE(review): the paper derives the shadow time from *running* jobs'
    # termination times; queued jobs are used as a proxy here -- confirm.
    for job in queue:
        job.end_time = current_time + job.wall_time

    # Walk jobs in order of earliest termination, accumulating their nodes
    # until enough would be released for first_job; that instant is the
    # shadow time (earliest start of the blocked job).
    sum_nodes = 0
    shadow_time = None
    num_extra_nodes = 0
    for job in sorted(queue, key=lambda j: j.end_time):
        sum_nodes += job.nodes_required
        if sum_nodes >= first_job.nodes_required:
            shadow_time = job.end_time
            # Extra nodes remain usable even past the shadow time.
            # Fixed: was 'sum_nodes - job.nodes_required', i.e. measured
            # against the loop job instead of the blocked first job.
            num_extra_nodes = sum_nodes - first_job.nodes_required
            break

    # Find the first job (other than the blocked head itself) that either
    # terminates before the shadow time or fits within the extra nodes.
    for job in queue:
        if job is first_job:
            continue
        fits_free_nodes = job.nodes_required <= num_free_nodes
        # condition1: the job ends before first_job is due to start.
        condition1 = (fits_free_nodes
                      and shadow_time is not None
                      and current_time + job.wall_time < shadow_time)
        # condition2: the job cannot interfere with first_job at all.
        condition2 = job.nodes_required <= min(num_free_nodes, num_extra_nodes)
        if condition1 or condition2:
            return job
    return None
def assign_nodes_to_job(self, job):
    """Assign nodes to *job*, mark it running, and shrink the free pool.

    Telemetry-replay jobs carry an explicit ``requested_nodes`` list which
    is honored verbatim; synthetic jobs take nodes from the front of the
    available pool.

    Raises:
        ValueError: if fewer nodes are available than the job requires.
    """
    if len(self.available_nodes) < job.nodes_required:
        raise ValueError(f"Not enough available nodes to schedule job {job.id}")

    if job.requested_nodes:  # Telemetry replay
        # The job asked for specific nodes; assign exactly those and
        # remove them from the available pool.
        job.scheduled_nodes = job.requested_nodes
        mask = ~np.isin(self.available_nodes, job.scheduled_nodes)
        self.available_nodes = np.array(self.available_nodes)[mask].tolist()
    else:  # Synthetic
        # Assign the nodes from the front of the available pool.
        job.scheduled_nodes = self.available_nodes[:job.nodes_required]
        self.available_nodes = self.available_nodes[job.nodes_required:]

    # Set job start and end times and mark the job as running.
    # Fixed: removed leftover debug print gated on 'job.id == 22'.
    job.start_time = self.current_time
    job.end_time = self.current_time + job.wall_time
    job.state = JobState.RUNNING
    self.running.append(job)

def schedule(self, jobs):
    """Enqueue *jobs* and start every queued job that currently fits.

    Under the 'backfill' policy, when the head of the queue is blocked a
    single backfill candidate is searched for and started; otherwise the
    blocked job is requeued and scheduling stops for this pass.
    """
    for job_info in jobs:
        self.add_job(Job(job_info, self.current_time))

    while self.queue:
        # Try scheduling the first job in the queue.
        job = self.queue.pop(0)
        synthetic_bool = len(self.available_nodes) >= job.nodes_required
        telemetry_bool = bool(job.requested_nodes) \
            and job.requested_nodes[0] in self.available_nodes

        if synthetic_bool or telemetry_bool:
            self.assign_nodes_to_job(job)
            if self.debug:
                scheduled_nodes = summarize_ranges(job.scheduled_nodes)
                print(f"t={self.current_time}: Scheduled job with wall time",
                      f"{job.wall_time} on nodes {scheduled_nodes}")
        else:
            # The head job cannot be scheduled: try backfilling or requeue it.
            if self.queue and self.policy == "backfill":
                # Restore the blocked job to the head so find_backfill_job
                # can protect its reservation.
                self.queue.insert(0, job)
                backfill_job = find_backfill_job(
                    self.queue, len(self.available_nodes), self.current_time)
                if backfill_job:
                    self.assign_nodes_to_job(backfill_job)
                    self.queue.remove(backfill_job)
                    if self.debug:
                        scheduled_nodes = summarize_ranges(backfill_job.scheduled_nodes)
                        # Fixed: log the backfilled job's wall time (was the
                        # blocked job's) and dropped stray debug print.
                        print(f"t={self.current_time}: Backfilling job {backfill_job.id} with wall time",
                              f"{backfill_job.wall_time} on nodes {scheduled_nodes}")
            else:
                self.queue.append(job)
            break
jobs.pop(0) self.schedule([job]) if jobs: time_to_next_job = job['submit_time'] # Update time to next job based on the next job's scheduled time else: time_to_next_job = float('inf') # No more jobs, set to infinity or some large number to avoid triggering again if jobs: # Update time to next job based on the next job's scheduled time time_to_next_job = job['submit_time'] else: # No more jobs, set to infinity or some large number to avoid triggering again time_to_next_job = float('inf') yield self.tick() # Stop the simulation if no more jobs running or are in the queue Loading Loading
main.py +1 −1 Original line number Diff line number Diff line Loading @@ -42,7 +42,7 @@ parser.add_argument('-p', '--plot', nargs='+', choices=['power', 'loss', 'pue', choices = ['png', 'svg', 'jpg', 'pdf', 'eps'] parser.add_argument('--imtype', type=str, choices=choices, default=choices[0], help='Plot image type') parser.add_argument('--system', type=str, default='frontier', help='System config to use') choices = ['fcfs', 'sjf', 'prq'] choices = ['fcfs', 'sjf', 'prq', 'backfill'] parser.add_argument('-s', '--schedule', type=str, choices=choices, default=choices[0], help='Schedule policy to use') choices = ['random', 'benchmark', 'peak', 'idle'] parser.add_argument('-w', '--workload', type=str, choices=choices, default=choices[0], help='Type of synthetic workload') Loading
def find_backfill_job(queue, num_free_nodes, current_time):
    """Select a job from *queue* that can be backfilled without delaying
    the first (blocked) job in the queue.

    This implementation is based on pseudocode from Leonenkov and Zhumatiy,
    "Introducing new backfill-based scheduler for slurm resource manager."
    Procedia Computer Science 66 (2015): 661-669.

    Args:
        queue: Ordered list of waiting jobs; ``queue[0]`` is the blocked job
            whose reservation must be protected. Each job exposes
            ``nodes_required`` and ``wall_time``; ``end_time`` is
            overwritten here as a scratch field.
        num_free_nodes: Number of currently idle nodes.
        current_time: Current simulation time.

    Returns:
        The first eligible backfill job, or ``None`` if none qualifies.
    """
    if not queue:
        return None

    first_job = queue[0]

    # Estimate each queued job's termination time if it started now.
    # NOTE(review): the paper derives the shadow time from *running* jobs'
    # termination times; queued jobs are used as a proxy here -- confirm.
    for job in queue:
        job.end_time = current_time + job.wall_time

    # Walk jobs in order of earliest termination, accumulating their nodes
    # until enough would be released for first_job; that instant is the
    # shadow time (earliest start of the blocked job).
    sum_nodes = 0
    shadow_time = None
    num_extra_nodes = 0
    for job in sorted(queue, key=lambda j: j.end_time):
        sum_nodes += job.nodes_required
        if sum_nodes >= first_job.nodes_required:
            shadow_time = job.end_time
            # Extra nodes remain usable even past the shadow time.
            # Fixed: was 'sum_nodes - job.nodes_required', i.e. measured
            # against the loop job instead of the blocked first job.
            num_extra_nodes = sum_nodes - first_job.nodes_required
            break

    # Find the first job (other than the blocked head itself) that either
    # terminates before the shadow time or fits within the extra nodes.
    for job in queue:
        if job is first_job:
            continue
        fits_free_nodes = job.nodes_required <= num_free_nodes
        # condition1: the job ends before first_job is due to start.
        condition1 = (fits_free_nodes
                      and shadow_time is not None
                      and current_time + job.wall_time < shadow_time)
        # condition2: the job cannot interfere with first_job at all.
        condition2 = job.nodes_required <= min(num_free_nodes, num_extra_nodes)
        if condition1 or condition2:
            return job
    return None
def assign_nodes_to_job(self, job):
    """Assign nodes to *job*, mark it running, and shrink the free pool.

    Telemetry-replay jobs carry an explicit ``requested_nodes`` list which
    is honored verbatim; synthetic jobs take nodes from the front of the
    available pool.

    Raises:
        ValueError: if fewer nodes are available than the job requires.
    """
    if len(self.available_nodes) < job.nodes_required:
        raise ValueError(f"Not enough available nodes to schedule job {job.id}")

    if job.requested_nodes:  # Telemetry replay
        # The job asked for specific nodes; assign exactly those and
        # remove them from the available pool.
        job.scheduled_nodes = job.requested_nodes
        mask = ~np.isin(self.available_nodes, job.scheduled_nodes)
        self.available_nodes = np.array(self.available_nodes)[mask].tolist()
    else:  # Synthetic
        # Assign the nodes from the front of the available pool.
        job.scheduled_nodes = self.available_nodes[:job.nodes_required]
        self.available_nodes = self.available_nodes[job.nodes_required:]

    # Set job start and end times and mark the job as running.
    # Fixed: removed leftover debug print gated on 'job.id == 22'.
    job.start_time = self.current_time
    job.end_time = self.current_time + job.wall_time
    job.state = JobState.RUNNING
    self.running.append(job)

def schedule(self, jobs):
    """Enqueue *jobs* and start every queued job that currently fits.

    Under the 'backfill' policy, when the head of the queue is blocked a
    single backfill candidate is searched for and started; otherwise the
    blocked job is requeued and scheduling stops for this pass.
    """
    for job_info in jobs:
        self.add_job(Job(job_info, self.current_time))

    while self.queue:
        # Try scheduling the first job in the queue.
        job = self.queue.pop(0)
        synthetic_bool = len(self.available_nodes) >= job.nodes_required
        telemetry_bool = bool(job.requested_nodes) \
            and job.requested_nodes[0] in self.available_nodes

        if synthetic_bool or telemetry_bool:
            self.assign_nodes_to_job(job)
            if self.debug:
                scheduled_nodes = summarize_ranges(job.scheduled_nodes)
                print(f"t={self.current_time}: Scheduled job with wall time",
                      f"{job.wall_time} on nodes {scheduled_nodes}")
        else:
            # The head job cannot be scheduled: try backfilling or requeue it.
            if self.queue and self.policy == "backfill":
                # Restore the blocked job to the head so find_backfill_job
                # can protect its reservation.
                self.queue.insert(0, job)
                backfill_job = find_backfill_job(
                    self.queue, len(self.available_nodes), self.current_time)
                if backfill_job:
                    self.assign_nodes_to_job(backfill_job)
                    self.queue.remove(backfill_job)
                    if self.debug:
                        scheduled_nodes = summarize_ranges(backfill_job.scheduled_nodes)
                        # Fixed: log the backfilled job's wall time (was the
                        # blocked job's) and dropped stray debug print.
                        print(f"t={self.current_time}: Backfilling job {backfill_job.id} with wall time",
                              f"{backfill_job.wall_time} on nodes {scheduled_nodes}")
            else:
                self.queue.append(job)
            break
fmu_inputs = self.cooling_model.generate_fmu_inputs(runtime_values, \ uncertainties=self.power_manager.uncertainties) cooling_inputs, datacenter_outputs, cep_outputs, pue = self.cooling_model.step(self.current_time, fmu_inputs, FMU_UPDATE_FREQ) cooling_inputs, datacenter_outputs, cep_outputs, pue = \ self.cooling_model.step(self.current_time, fmu_inputs, FMU_UPDATE_FREQ) # Get a dataframe of the power data power_df = self.power_manager.get_power_df(rack_power, rack_loss) if self.layout_manager: self.layout_manager.update_powertemp_array(power_df, datacenter_outputs, pue, pflops, gflop_per_watt,\ self.layout_manager.update_powertemp_array(power_df, \ datacenter_outputs, pue, pflops, gflop_per_watt, \ system_util, uncertainties=self.power_manager.uncertainties) self.layout_manager.update_pressflow_array(datacenter_outputs) Loading Loading @@ -378,10 +411,10 @@ class Scheduler: while self.current_time >= time_to_next_job and jobs: job = jobs.pop(0) self.schedule([job]) if jobs: time_to_next_job = job['submit_time'] # Update time to next job based on the next job's scheduled time else: time_to_next_job = float('inf') # No more jobs, set to infinity or some large number to avoid triggering again if jobs: # Update time to next job based on the next job's scheduled time time_to_next_job = job['submit_time'] else: # No more jobs, set to infinity or some large number to avoid triggering again time_to_next_job = float('inf') yield self.tick() # Stop the simulation if no more jobs running or are in the queue Loading