Commit 10134923 authored by Maiterth, Matthias's avatar Maiterth, Matthias
Browse files

Removed Backfill Scheduling, and added backfill policy as BackfillType...

Removed Backfill Scheduling, and added backfill policy as BackfillType independent of scheduling policy.

Added argument --backfill
Augmented default scheduler
Implemented Backfill, and refactored.
The following are working: firstfit, EASY
The following are not implemnted: bestfit, greedy and conservative, and are left for other schedulers to implement at the moment.
parent f0fe1ab7
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -46,6 +46,8 @@ parser.add_argument('-w', '--workload', type=str, choices=choices, default=choic
# Scheduling options
choices = ['default', 'replay', 'nrel', 'anl', 'flux']
parser.add_argument('--scheduler', type=str, choices=choices, default=choices[0], help='Name of scheduler')
choices = [None, 'firstfit', 'bestfit', 'greedy', 'easy', 'conservative']
parser.add_argument('--backfill', type=str, choices=choices, default=None, help='Backfill Policy')
policies = [policy.value for policy in PolicyType]
choices = ['prescribed', 'poisson']
parser.add_argument('--arrival', default=choices[0], type=str, choices=choices, help=f'Modify arrival distribution ({choices[1]}) or use the original submit times ({choices[0]})')
+5 −1
Original line number Diff line number Diff line
@@ -59,12 +59,16 @@ class Engine:

        # Get scheduler type from command-line args or default
        scheduler_type = kwargs.get('scheduler', 'default')
        policy_type = kwargs.get('policy', None)
        backfill_type = kwargs.get('backfill', None)

        self.scheduler = load_scheduler(scheduler_type)(
            config=self.config,
            policy=kwargs.get('policy'),
            bfpolicy=kwargs.get('backfill'),
            resource_manager=self.resource_manager
        )
        print(f"Using scheduler: {scheduler_type}")
        print(f"Using scheduler: {scheduler_type}, with policy {policy_type} and backfill {backfill_type}")


    def add_running_jobs_to_queue(self, jobs_to_submit: List):
+9 −1
Original line number Diff line number Diff line
@@ -4,9 +4,17 @@ from enum import Enum
class PolicyType(Enum):
    """Supported scheduling policies."""
    FCFS = 'fcfs'
    BACKFILL = 'backfill'
    PRIORITY = 'priority'
    FUGAKU_PTS = 'fugaku_pts'
    REPLAY = 'replay'
    SJF = 'sjf'
    LJF = 'ljf'

class BackfillType(Enum):
    """Supported backfilling policies."""
    NONE = None
    FIRSTFIT = 'firstfit'
    BESTFIT = 'bestfit'
    GREEDY = 'greedy'
    EASY = 'easy'  # Earliest Available Start Time Yielding
    CONSERVATIVE = 'conservative'
+105 −79
Original line number Diff line number Diff line
@@ -4,15 +4,16 @@ from ..utils import summarize_ranges

from ..workload import MAX_PRIORITY

from ..policy import PolicyType
from ..policy import PolicyType, BackfillType


class Scheduler:
    """ Default job scheduler with various scheduling policies. """

    def __init__(self, config, policy, resource_manager=None):
    def __init__(self, config, policy, bfpolicy=None, resource_manager=None):
        self.config = config
        self.policy = PolicyType(policy)
        self.bfpolicy = BackfillType(bfpolicy)
        if resource_manager is None:
            raise ValueError("Scheduler requires a ResourceManager instance")
        self.resource_manager = resource_manager
@@ -20,7 +21,7 @@ class Scheduler:

    def sort_jobs(self, queue, accounts=None):
        """Sort jobs based on the selected scheduling policy."""
        if self.policy == PolicyType.FCFS or self.policy == PolicyType.BACKFILL:
        if self.policy == PolicyType.FCFS:
            return sorted(queue, key=lambda job: job.submit_time)
        elif self.policy == PolicyType.PRIORITY:
            return sorted(queue, key=lambda job: job.priority, reverse=True)
@@ -35,7 +36,7 @@ class Scheduler:
        else:
            raise ValueError(f"Policy not implemented: {self.policy}")

    def schedule(self, queue, running, current_time, accounts=None, sorted=False, debug=False):
    def schedule(self, queue, running, current_time, accounts=None, sorted=False):
        # Sort the queue in place.
        if not sorted:
            queue[:] = self.sort_jobs(queue, accounts)
@@ -44,61 +45,28 @@ class Scheduler:
        for job in queue[:]:
            if self.policy == PolicyType.REPLAY:
                if job.start_time > current_time:
                    continue
                    continue  # Replay: Job didn't start yet. Next!
                else:
                    pass
            else:
                pass
            # Make sure the requested nodes are available.
            nodes_available = False
            if job.requested_nodes:  # nodes specified, i.e., telemetry replay
                if len(job.requested_nodes) <= len(self.resource_manager.available_nodes):
                    if self.policy == PolicyType.REPLAY:  # Check if exact set is available:
                        nodes_available = set(job.requested_nodes).issubset(set(self.resource_manager.available_nodes))
                    else:
                        # Sufficiently large number of nodes available
                        # but no exact set is required!
                        nodes_available = True
                        # remove the request for specific nodes and ask for n nodes
                        job.nodes_required = len(job.requested_nodes)
                        job.requested_nodes = []
                else:
                    # Next we check if we continue or abort.
                    # This may be policy dependent. I break by default but this may not be correct.
                    if self.policy == PolicyType.FCFS or \
                       self.policy == PolicyType.PRIORITY or\
                       self.policy == PolicyType.FUGAKU_PTS or \
                       self.policy == PolicyType.LJF or \
                       False:  # self.policy == PolicyType ??
                        break  # The job at the front of the queue doesnt fit, wait until it fits.
                    elif self.policy == PolicyType.REPLAY or \
                         self.policy == PolicyType.BACKFILL or \
                         self.policy == PolicyType.SJF or\
                         False:
                        continue  # The job at the front of the queue doesn't fit, but there are other jobs that may fit, look at the next one.
                    else:
                        raise NotImplementedError("Depending on the Policy this choice should be explicit. Add the implementation above!")
            else:  # synthetic jobs dont have nodes assigned:
                nodes_available = len(self.resource_manager.available_nodes) >= job.nodes_required

            nodes_available = self.check_available_nodes(job)

            if nodes_available:
                self.resource_manager.assign_nodes_to_job(job, current_time)
                running.append(job)
                queue.remove(job)
                if debug:
                    scheduled_nodes = summarize_ranges(job.scheduled_nodes)
                    print(f"t={current_time}: Scheduled job {job.id} with wall time {job.wall_time} on nodes {scheduled_nodes}")
                self.place_job_and_manage_queues(job, queue, running, current_time)
            else:  # In case the job was not placed, see how we should continue:
                if self.bfpolicy is not None:
                    self.backfill(queue, running, current_time)

                # After backfill dedice continue processing the queue or wait, continuing may result in fairness issues.
                if self.policy in [PolicyType.REPLAY]:
                    continue  # Regardless if the job at the front of the queue doenst fit, try placing all of them.
                elif self.policy in [PolicyType.FCFS, PolicyType.PRIORITY,
                                     PolicyType.FUGAKU_PTS, PolicyType.LJF]:
                    break  # The job at the front of the queue doesnt fit stop processing the queue.
                else:
                # not sure if this does what it should!
                if self.policy == PolicyType.BACKFILL:
                    # Try to find a backfill candidate from the entire queue.
                    backfill_job = self.find_backfill_job(queue, len(self.resource_manager.available_nodes), current_time)
                    if backfill_job:
                        self.assign_nodes_to_job(backfill_job, self.resource_manager.available_nodes, current_time)
                        running.append(backfill_job)
                        queue.remove(backfill_job)
                        if debug:
                            scheduled_nodes = summarize_ranges(backfill_job.scheduled_nodes)
                            print(f"t={current_time}: Backfilling job {backfill_job.id} with wall time {backfill_job.wall_time} on nodes {scheduled_nodes}")
                    raise NotImplementedError("Depending on the Policy this choice should be explicit. Add the implementation above!")

    def prepare_system_state(self,jobs_to_submit:List, running, timestep_start):
        # def schedule(self, queue, running, current_time, accounts=None, sorted=False, debug=False):
@@ -130,46 +98,104 @@ class Scheduler:
        else:
            return jobs_to_submit

    def find_backfill_job(self, queue, num_free_nodes, current_time):
    def place_job_and_manage_queues(self, job, queue,running, current_time):
        self.resource_manager.assign_nodes_to_job(job, current_time)
        running.append(job)
        queue.remove(job)
        if self.debug:
            scheduled_nodes = summarize_ranges(job.scheduled_nodes)
            print(f"t={current_time}: Scheduled job {job.id} with wall time {job.wall_time} on nodes {scheduled_nodes}")

    def check_available_nodes(self,job):
        nodes_available = False
        if job.requested_nodes:  # nodes specified, i.e., telemetry replay
            if len(job.requested_nodes) <= len(self.resource_manager.available_nodes):
                if self.policy == PolicyType.REPLAY:  # Check if exact set is available:
                    nodes_available = set(job.requested_nodes).issubset(set(self.resource_manager.available_nodes))
                else:
                    # Sufficiently large number of nodes available
                    # but no exact set is required!
                    nodes_available = True
                    # remove the request for specific nodes and ask for n nodes
                    job.nodes_required = len(job.requested_nodes)
                    job.requested_nodes = []
            else:
                pass
        else:  # Exact nodes not specified (e.g. synthetic jobs dont have nodes assigned)
            nodes_available = len(self.resource_manager.available_nodes) >= job.nodes_required

        return nodes_available

    def backfill(self,queue:List, running:List, current_time):
        # Try to find a backfill candidate from the entire queue.
        while queue:
            backfill_job = self.find_backfill_job(queue, running, current_time)
            if backfill_job:
                self.place_job_and_manage_queues(backfill_job, queue, running, current_time)
            else:
                break

    def find_backfill_job(self, queue, running, current_time):
        """Finds a backfill job based on available nodes and estimated completion times.

        Based on pseudocode from Leonenkov and Zhumatiy, 'Introducing new backfill-based
        Loosely based on pseudocode from Leonenkov and Zhumatiy, 'Introducing new backfill-based
        scheduler for slurm resource manager.' Procedia computer science 66 (2015): 661-669.
        """

        if not queue:
            return None

        # Identify when the nex job in the queue could run as a time limit:
        first_job = queue[0]
        nodes_required = 0
        if first_job.requested_nodes:
            nodes_required = len(first_job.requested_nodes)
        else:
            nodes_required = first_job.nodes_required

        for job in queue:
            job.end_time = current_time + job.wall_time  # Estimate end time

        # Sort jobs according to their termination time (end_time)
        sorted_queue = sorted(queue, key=lambda job: job.end_time)

        # Compute shadow time by accumulating nodes
        sum_nodes = 0
        shadow_time = None
        num_extra_nodes = 0
        sorted_running = sorted(running, key=lambda job: job.end_time)

        for job in sorted_queue:
            sum_nodes += job.nodes_required
            if sum_nodes >= first_job.nodes_required:
                shadow_time = current_time + job.wall_time
                num_extra_nodes = sum_nodes - job.nodes_required
        # Identify when we have enough nodes therefore the start time of the first_job in line
        shadow_time_end = 0
        shadow_nodes_avail = len(self.resource_manager.available_nodes)
        for job in sorted_running:
            if shadow_nodes_avail >= nodes_required:
                break
            else:
                shadow_nodes_avail += job.nodes_required
                shadow_time_end = job.end_time

        time_limit = shadow_time_end - current_time
        # We now have the time_limit after which no backfilled job should end
        # as the next job in line has the necessary resrouces after this time limit.

        # Find and return the first job that fits
        if self.bfpolicy == BackfillType.EASY:
            queue[:] = sorted(queue, key=lambda job: job.submit_time)
            return self.return_first_fit(queue,time_limit)
        elif self.bfpolicy == BackfillType.FIRSTFIT:
            pass  # Stay with the prioritization!
            return self.return_first_fit(queue,time_limit)
        elif self.bfpolicy in [BackfillType.BESTFIT,
                               BackfillType.GREEDY,
                               BackfillType.CONSERVATIVE,
                               ]:
            raise NotImplementedError(f"{self.bfpolicy} not implemented! Please implement!")
        else:
            raise NotImplementedError(f"{self.bfpolicy} not implemented.")

        # Find backfill job
    def return_first_fit(self, queue, time_limit):
        for job in queue:
            condition1 = job.nodes_required <= num_free_nodes and current_time + job.wall_time < shadow_time
            condition2 = job.nodes_required <= min(num_free_nodes, num_extra_nodes)

            if condition1 or condition2:
            if job.time_limit <= time_limit:
                nodes_available = self.check_available_nodes(job)
                if nodes_available:
                    return job

                else:
                    continue
            else:
                continue
        return None


    def sort_fugaku_redeeming(self, queue, accounts=None):
        if queue == []:
            return queue