Commit 02b9a3d6 authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Merge branch 'refactor-policy' into 'main'

Refactor policy

See merge request !58
parents 7b87aa7a a09af71f
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -42,7 +42,7 @@ parser.add_argument('-p', '--plot', nargs='+', choices=['power', 'loss', 'pue',
choices = ['png', 'svg', 'jpg', 'pdf', 'eps']
parser.add_argument('--imtype', type=str, choices=choices, default=choices[0], help='Plot image type')
parser.add_argument('--system', type=str, default='frontier', help='System config to use')
choices = ['fcfs', 'sjf', 'prq', 'backfill']
choices = ['fcfs', 'sjf', 'priority', 'backfill']
parser.add_argument('-s', '--schedule', type=str, choices=choices, default=choices[0], help='Schedule policy to use')
choices = ['random', 'benchmark', 'peak', 'idle']
parser.add_argument('-w', '--workload', type=str, choices=choices, default=choices[0], help='Type of synthetic workload')
+62 −40
Original line number Diff line number Diff line
from enum import Enum

class PolicyType(Enum):
    FCFS = 'fcfs'
    BACKFILL = 'backfill'
    DEADLINE = 'deadline'
    PRIORITY = 'priority'
    SJF = 'sjf'
    
def find_backfill_job(queue, num_free_nodes, current_time):

class Policy:

    def __init__(self, strategy):
        self.strategy = PolicyType(strategy)

    def sort_jobs(self, jobs):
        if self.strategy == PolicyType.FCFS or self.strategy == PolicyType.BACKFILL:
            return sorted(jobs, key=lambda job: job.submit_time)
        elif self.strategy == PolicyType.SJF:
            return sorted(jobs, key=lambda job: job.wall_time)
        elif self.strategy == PolicyType.PRIORITY:
            return sorted(jobs, key=lambda job: job.priority, reverse=True)
        else:
            raise ValueError(f"Unknown policy type: {self.policy_type}")

    def find_backfill_job(self, queue, num_free_nodes, current_time):
        """ This implementation is based on pseudocode from Leonenkov and Zhumatiy.
            "Introducing new backfill-based scheduler for slurm resource manager."
            Procedia computer science 66 (2015): 661-669. """

    # Compute shadow time
        first_job = queue[0]

        for job in queue: job.end_time = current_time + job.wall_time
@@ -13,8 +35,8 @@ def find_backfill_job(queue, num_free_nodes, current_time):
        # Sort jobs according to their termination time (end_time)
        sorted_queue = sorted(queue, key=lambda job: job.end_time)

    # Loop over the list and collect nodes until the number of available nodes
    # is sufficient for the first job in the queue
        # Compute shadow time - loop over the list and collect nodes until the 
        # number of available nodes is sufficient for the first job in the queue
        sum_nodes = 0
        shadow_time = None
        for job in sorted_queue:
+8 −21
Original line number Diff line number Diff line
@@ -48,7 +48,7 @@ import pandas as pd

from .config import load_config_variables
from .job import Job, JobState
from .policy import find_backfill_job
from .policy import Policy, PolicyType
from .utils import summarize_ranges, expand_ranges

load_config_variables([
@@ -134,22 +134,13 @@ class Scheduler:
        self.debug = kwargs.get('debug')
        self.output = kwargs.get('output')
        self.replay = kwargs.get('replay')
        self.policy = kwargs.get('schedule')
        self.policy = Policy(strategy=kwargs.get('schedule'))
        self.sys_util_history = []


    def add_job(self, job):
        # add job to queue
        self.queue.append(job)
        # sort queue
        if self.policy == 'fcfs' or self.policy == 'backfill':
            self.queue.sort(key=lambda job: job.submit_time)
        elif self.policy == 'sjf':
            self.queue.sort(key=lambda job: job.wall_time)
        elif self.policy == 'prq':
            self.queue.sort(key=lambda job: -job.priority)
        else:
            raise ValueError(f"The scheduling policy {self.policy} is not supported.")
        self.queue = self.policy.sort_jobs(self.queue)


    def assign_nodes_to_job(self, job):
@@ -158,14 +149,14 @@ class Scheduler:
            # If there are not enough nodes, return or raise an error (handle as needed)
            raise ValueError(f"Not enough available nodes to schedule job {job.id}")

        if job.requested_nodes: # Telemetry replay
        if job.requested_nodes: # replay case
            # If the job has requested specific nodes, assign them
            job.scheduled_nodes = job.requested_nodes
            mask = ~np.isin(self.available_nodes, job.scheduled_nodes)
            self.available_nodes = np.array(self.available_nodes)
            self.available_nodes = self.available_nodes[mask]
            self.available_nodes = self.available_nodes.tolist()
        else: # Synthetic
        else: # synthetic or reschedule case
            # Assign the nodes from available pool
            job.scheduled_nodes = self.available_nodes[:job.nodes_required]
            self.available_nodes = self.available_nodes[job.nodes_required:]
@@ -177,7 +168,6 @@ class Scheduler:
        # Mark the job as running
        job.state = JobState.RUNNING
        self.running.append(job)
        if job.id == 22: print('***', job.nodes_required, self.available_nodes, job.scheduled_nodes)


    def schedule(self, jobs):
@@ -204,17 +194,15 @@ class Scheduler:
                          f"{job.wall_time} on nodes {scheduled_nodes}")

            else:

                # If the job cannot be scheduled, either try backfilling or requeue it
                if self.queue and self.policy == "backfill":
                if self.queue and self.policy.strategy == PolicyType.BACKFILL:
                    self.queue.insert(0, job)
                    backfill_job = find_backfill_job(self.queue, len(self.available_nodes), self.current_time)
                    backfill_job = self.policy.find_backfill_job(self.queue, len(self.available_nodes), self.current_time)
                    if backfill_job:
                        self.assign_nodes_to_job(backfill_job)
                        self.queue.remove(backfill_job)
                        if self.debug:
                            scheduled_nodes = summarize_ranges(backfill_job.scheduled_nodes)
                            print(backfill_job)
                            print(f"t={self.current_time}: Backfilling job {backfill_job.id} with wall time",
                                  f"{backfill_job.wall_time} on nodes {scheduled_nodes}")
                else:
@@ -242,7 +230,6 @@ class Scheduler:
                job.state = JobState.COMPLETED

            if job.state == JobState.RUNNING:

                # Deal with node that fails during the course of a running job
                #if any(node in job.scheduled_nodes for node in newly_downed_nodes):
                if False: # currently disabled b/c not working correctly