Commit 8b0bb67a authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Refactor policy with separate policy module and Policy class

parent 7b87aa7a
Loading
Loading
Loading
Loading
+62 −40
Original line number Diff line number Diff line
from enum import Enum

class PolicyType(Enum):
    FCFS = 'fcfs'
    BACKFILL = 'backfill'
    DEADLINE = 'deadline'
    PRIORITY = 'priority'
    SJF = 'sfj'
    
def find_backfill_job(queue, num_free_nodes, current_time):

class Policy:

    def __init__(self, strategy):
        self.strategy = strategy

    def sort_jobs(self, jobs):
        if self.strategy == PolicyType.FCFS or PolicyType.BACKFILL:
            return sorted(jobs, key=lambda job: job.submit_time)
        elif self.strategy == PolicyType.SJF:
            return sorted(jobs, key=lambda job: job.wall_time)
        elif self.strategy == PolicyType.PRIORITY:
            return sorted(jobs, key=lambda job: job.priority, reverse=True)
        else:
            raise ValueError(f"Unknown policy type: {self.policy_type}")

    def find_backfill_job(self, queue, num_free_nodes, current_time):
        """ This implementation is based on pseudocode from Leonenkov and Zhumatiy.
            "Introducing new backfill-based scheduler for slurm resource manager."
            Procedia computer science 66 (2015): 661-669. """

    # Compute shadow time
        first_job = queue[0]

        for job in queue: job.end_time = current_time + job.wall_time
@@ -13,8 +35,8 @@ def find_backfill_job(queue, num_free_nodes, current_time):
        # Sort jobs according to their termination time (end_time)
        sorted_queue = sorted(queue, key=lambda job: job.end_time)

    # Loop over the list and collect nodes until the number of available nodes
    # is sufficient for the first job in the queue
        # Compute shadow time - loop over the list and collect nodes until the 
        # number of available nodes is sufficient for the first job in the queue
        sum_nodes = 0
        shadow_time = None
        for job in sorted_queue:
+9 −18
Original line number Diff line number Diff line
@@ -48,7 +48,7 @@ import pandas as pd

from .config import load_config_variables
from .job import Job, JobState
from .policy import find_backfill_job
from .policy import Policy, PolicyType
from .utils import summarize_ranges, expand_ranges

load_config_variables([
@@ -134,22 +134,13 @@ class Scheduler:
        self.debug = kwargs.get('debug')
        self.output = kwargs.get('output')
        self.replay = kwargs.get('replay')
        self.policy = kwargs.get('schedule')
        self.policy = Policy(strategy=kwargs.get('schedule'))
        self.sys_util_history = []


    def add_job(self, job):
        # add job to queue
        self.queue.append(job)
        # sort queue
        if self.policy == 'fcfs' or self.policy == 'backfill':
            self.queue.sort(key=lambda job: job.submit_time)
        elif self.policy == 'sjf':
            self.queue.sort(key=lambda job: job.wall_time)
        elif self.policy == 'prq':
            self.queue.sort(key=lambda job: -job.priority)
        else:
            raise ValueError(f"The scheduling policy {self.policy} is not supported.")
        self.queue = self.policy.sort_jobs(self.queue)


    def assign_nodes_to_job(self, job):
@@ -158,14 +149,14 @@ class Scheduler:
            # If there are not enough nodes, return or raise an error (handle as needed)
            raise ValueError(f"Not enough available nodes to schedule job {job.id}")

        if job.requested_nodes: # Telemetry replay
        if job.requested_nodes: # replay case
            # If the job has requested specific nodes, assign them
            job.scheduled_nodes = job.requested_nodes
            mask = ~np.isin(self.available_nodes, job.scheduled_nodes)
            self.available_nodes = np.array(self.available_nodes)
            self.available_nodes = self.available_nodes[mask]
            self.available_nodes = self.available_nodes.tolist()
        else: # Synthetic
        else: # synthetic or reschedule case
            # Assign the nodes from available pool
            job.scheduled_nodes = self.available_nodes[:job.nodes_required]
            self.available_nodes = self.available_nodes[job.nodes_required:]
@@ -206,15 +197,15 @@ class Scheduler:
            else:

                # If the job cannot be scheduled, either try backfilling or requeue it
                if self.queue and self.policy == "backfill":
                if self.queue and self.policy == PolicyType.BACKFILL:
                    self.queue.insert(0, job)
                    backfill_job = find_backfill_job(self.queue, len(self.available_nodes), self.current_time)
                    backfill_job = self.policy.find_backfill_job(self.queue, len(self.available_nodes), self.current_time)
                    if backfill_job:
                        print('here')
                        self.assign_nodes_to_job(backfill_job)
                        self.queue.remove(backfill_job)
                        if self.debug:
                            scheduled_nodes = summarize_ranges(backfill_job.scheduled_nodes)
                            print(backfill_job)
                            print(f"t={self.current_time}: Backfilling job {backfill_job.id} with wall time",
                                  f"{backfill_job.wall_time} on nodes {scheduled_nodes}")
                else: