Commit 408a3ea8 authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Get backfill working... usage: python main.py -s backfill

parent 4106d2af
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -42,7 +42,7 @@ parser.add_argument('-p', '--plot', nargs='+', choices=['power', 'loss', 'pue',
choices = ['png', 'svg', 'jpg', 'pdf', 'eps']
parser.add_argument('--imtype', type=str, choices=choices, default=choices[0], help='Plot image type')
parser.add_argument('--system', type=str, default='frontier', help='System config to use')
choices = ['fcfs', 'sjf', 'prq']
choices = ['fcfs', 'sjf', 'prq', 'backfill']
parser.add_argument('-s', '--schedule', type=str, choices=choices, default=choices[0], help='Schedule policy to use')
choices = ['random', 'benchmark', 'peak', 'idle']
parser.add_argument('-w', '--workload', type=str, choices=choices, default=choices[0], help='Type of synthetic workload')

raps/policy.py

0 → 100644
+40 −0
Original line number Diff line number Diff line


def find_backfill_job(queue, num_free_nodes, current_time):
    """ Select a queued job that can be backfilled ahead of the blocked head job.

        This implementation is based on pseudocode from Leonenkov and Zhumatiy.
        "Introducing new backfill-based scheduler for slurm resource manager."
        Procedia computer science 66 (2015): 661-669.

        Args:
            queue: ordered job queue; queue[0] is the blocked head-of-queue job.
                   Each job must expose nodes_required and wall_time.
                   NOTE: each job's end_time attribute is (re)assigned here.
            num_free_nodes: number of currently idle nodes.
            current_time: current simulation time.

        Returns:
            The first job (other than the head job) that fits in the free nodes
            and cannot delay the head job, or None if no such job exists. """

    if not queue:
        return None

    first_job = queue[0]

    # Projected end time if each queued job were started right now
    for job in queue: job.end_time = current_time + job.wall_time

    # Sort jobs according to their termination time (end_time)
    sorted_queue = sorted(queue, key=lambda job: job.end_time)

    # Loop over the list and collect nodes until the number of available nodes
    # is sufficient for the first job in the queue.  The end time of the job
    # that tips the total over is the "shadow time"; nodes collected beyond
    # the first job's requirement are "extra" nodes.
    sum_nodes = 0
    shadow_time = None
    num_extra_nodes = 0
    for job in sorted_queue:
        sum_nodes += job.nodes_required
        if sum_nodes >= first_job.nodes_required:
            shadow_time = current_time + job.wall_time
            # Extra nodes are those beyond the *first* job's requirement
            # (was: sum_nodes - job.nodes_required, which measured against
            # the wrong job).
            num_extra_nodes = sum_nodes - first_job.nodes_required
            break

    # Guard: queued jobs can never free enough nodes for the head job, so the
    # comparisons below would fail on shadow_time=None.
    if shadow_time is None:
        return None

    # Find backfill job; skip the head job itself -- starting it would not be
    # backfilling, and it is known not to fit.
    backfill_job = None
    for job in queue[1:]:
        # condition1 checks that the job ends before first_job starts
        condition1 = job.nodes_required <= num_free_nodes \
                     and current_time + job.wall_time < shadow_time
        # condition2 checks that the job does not interfere with first_job
        condition2 = job.nodes_required <= min(num_free_nodes, num_extra_nodes)

        if condition1 or condition2:
            backfill_job = job
            break

    return backfill_job
+63 −50
Original line number Diff line number Diff line
@@ -48,6 +48,7 @@ import pandas as pd

from .config import load_config_variables
from .job import Job, JobState
from .policy import find_backfill_job
from .utils import summarize_ranges, expand_ranges

load_config_variables([
@@ -69,26 +70,6 @@ load_config_variables([
], globals())


def shadow_time(queue):
    """Return the projected time at which enough nodes free up for the
    head-of-queue job.

    Walks jobs in order of termination time (end_time), accumulating their
    node counts until the total covers the first job's requirement; the
    end_time of the job that tips the total over is the shadow time.
    Returns None when the queued jobs never free enough nodes.
    """
    head = queue[0]
    needed = head.nodes_required
    freed = 0
    # Earliest-terminating jobs release their nodes first
    for job in sorted(queue, key=lambda j: j.end_time):
        freed += job.nodes_required
        if freed >= needed:
            return job.end_time
    return None


def find_backfill_job(queue):
    """Placeholder for the backfill-selection policy; not yet implemented.

    Always returns None (implicit) regardless of *queue*.
    """
    pass


@dataclasses.dataclass
class TickData:
    """ Represents the state output from the simulation each tick """
@@ -156,11 +137,12 @@ class Scheduler:
        self.policy = kwargs.get('schedule')
        self.sys_util_history = []


    def add_job(self, job):
        # add job to queue
        self.queue.append(job)
        # sort queue
        if self.policy == 'fcfs':
        if self.policy == 'fcfs' or self.policy == 'backfill':
            self.queue.sort(key=lambda job: job.submit_time)
        elif self.policy == 'sjf':
            self.queue.sort(key=lambda job: job.wall_time)
@@ -169,47 +151,77 @@ class Scheduler:
        else:
            raise ValueError(f"The scheduling policy {self.policy} is not supported.")
    
    def schedule(self, jobs):
        """Schedule jobs."""
        for job_info in jobs:
            job = Job(job_info, self.current_time)
            self.add_job(job)

        while self.queue:

            job = self.queue.pop(0)
            synthetic_bool = len(self.available_nodes) >= job.nodes_required
            telemetry_bool = job.requested_nodes and job.requested_nodes[0] in self.available_nodes

            if synthetic_bool or telemetry_bool:
    def assign_nodes_to_job(self, job):
        """Helper function to assign nodes to a job and update available nodes.

        Sets job.scheduled_nodes, job.start_time, job.end_time, marks the job
        RUNNING, and appends it to self.running.

        Raises:
            ValueError: if fewer free nodes are available than the job needs.
        """
        if len(self.available_nodes) < job.nodes_required:
            # If there are not enough nodes, return or raise an error (handle as needed)
            raise ValueError(f"Not enough available nodes to schedule job {job.id}")

        if job.requested_nodes: # Telemetry replay
            # If the job has requested specific nodes, assign them
            job.scheduled_nodes = job.requested_nodes
            # Remove the requested nodes from the available pool
            mask = ~np.isin(self.available_nodes, job.scheduled_nodes)
            self.available_nodes = np.array(self.available_nodes)[mask].tolist()

        else: # Synthetic
            # Assign the nodes from available pool
            job.scheduled_nodes = self.available_nodes[:job.nodes_required]
            self.available_nodes = self.available_nodes[job.nodes_required:]

        # Set job start and end times
        job.start_time = self.current_time
        job.end_time = self.current_time + job.wall_time

        # Mark the job as running
        job.state = JobState.RUNNING
        self.running.append(job)


    def schedule(self, jobs):
        """Schedule jobs.

        Newly submitted *jobs* are queued via add_job, then jobs are launched
        from the head of the queue while they fit.  When the head job does not
        fit and the policy is 'backfill', a smaller job that cannot delay the
        head job is started instead; otherwise the head job is requeued.
        Scheduling stops at the first job that cannot be placed.
        """
        for job_info in jobs:
            job = Job(job_info, self.current_time)
            self.add_job(job)

        while self.queue:

            # Try scheduling the first job in the queue
            job = self.queue.pop(0)
            synthetic_bool = len(self.available_nodes) >= job.nodes_required
            telemetry_bool = job.requested_nodes and job.requested_nodes[0] in self.available_nodes

            if synthetic_bool or telemetry_bool:

                # Schedule job
                self.assign_nodes_to_job(job)

                if self.debug:
                    scheduled_nodes = summarize_ranges(job.scheduled_nodes)
                    print(f"t={self.current_time}: Scheduled job with wall time",
                          f"{job.wall_time} on nodes {scheduled_nodes}")

            else:

                # If the job cannot be scheduled, either try backfilling or requeue it
                if self.queue and self.policy == "backfill":
                    self.queue.insert(0, job)
                    backfill_job = find_backfill_job(self.queue, len(self.available_nodes), self.current_time)
                    if backfill_job:
                        self.assign_nodes_to_job(backfill_job)
                        self.queue.remove(backfill_job)
                        if self.debug:
                            scheduled_nodes = summarize_ranges(backfill_job.scheduled_nodes)
                            # Report the backfilled job's wall time
                            # (was job.wall_time, the blocked head job's)
                            print(f"t={self.current_time}: Backfilling job {backfill_job.id} with wall time",
                                  f"{backfill_job.wall_time} on nodes {scheduled_nodes}")
                else:
                    self.queue.append(job)
                # Head job could not start: stop scheduling until the next tick
                break


    def tick(self):
        """Simulate a timestep."""
        completed_jobs = [job for job in self.running if job.end_time
@@ -330,14 +342,15 @@ class Scheduler:
                # FMU inputs are N powers and the wetbulb temp
                fmu_inputs = self.cooling_model.generate_fmu_inputs(runtime_values, \
                             uncertainties=self.power_manager.uncertainties)
                cooling_inputs, datacenter_outputs, cep_outputs, pue = self.cooling_model.step(self.current_time,
                                                           fmu_inputs, FMU_UPDATE_FREQ)
                cooling_inputs, datacenter_outputs, cep_outputs, pue = \
                    self.cooling_model.step(self.current_time, fmu_inputs, FMU_UPDATE_FREQ)
                
                # Get a dataframe of the power data
                power_df = self.power_manager.get_power_df(rack_power, rack_loss)

                if self.layout_manager:
                    self.layout_manager.update_powertemp_array(power_df, datacenter_outputs, pue, pflops, gflop_per_watt,\
                    self.layout_manager.update_powertemp_array(power_df, \
                               datacenter_outputs, pue, pflops, gflop_per_watt, \
                               system_util, uncertainties=self.power_manager.uncertainties)
                    self.layout_manager.update_pressflow_array(datacenter_outputs)

@@ -398,10 +411,10 @@ class Scheduler:
            while self.current_time >= time_to_next_job and jobs:
                job = jobs.pop(0)
                self.schedule([job])
                if jobs:
                    time_to_next_job = job['submit_time']  # Update time to next job based on the next job's scheduled time
                else:
                    time_to_next_job = float('inf')  # No more jobs, set to infinity or some large number to avoid triggering again
                if jobs: # Update time to next job based on the next job's scheduled time
                    time_to_next_job = job['submit_time']
                else: # No more jobs, set to infinity or some large number to avoid triggering again
                    time_to_next_job = float('inf')
            yield self.tick()

            # Stop the simulation if no more jobs running or are in the queue