Commit 591076f9 authored by Maiterth, Matthias's avatar Maiterth, Matthias
Browse files

Refactored engine timestep loop into 3 steps. Updated the Job initialization...

Refactored engine timestep loop into 3 steps. Updated the Job initialization and added a priority-based sorting function for the default scheduler.

1. add eligible jobs to queue
2. sort the queue
3. schedule.

Sorting and scheduling both sort the queue; this needs to happen in one place. (Discuss with Wes)

The job initialization overwrote values set by the job_dict. This is resolved now.

The default scheduler now takes account information into account.
parent c9b49c74
Loading
Loading
Loading
Loading
+1 −2
Original line number Diff line number Diff line
@@ -31,7 +31,6 @@ from raps.telemetry import Telemetry
from raps.workload import Workload
from raps.weather import Weather
from raps.utils import create_casename, convert_to_seconds, write_dict_to_file, next_arrival
from raps.utils import toJSON

config = ConfigManager(system_name=args.system).get_config()

+1 −1
Original line number Diff line number Diff line
@@ -118,7 +118,7 @@ class Accounts:
        #update specific account associated with job
        if isinstance(jobstats, JobStatistics):
            if jobstats.account not in self.account_dict:
                self.account_dict[jobstats.account] = Account(self._account_id,jobstats.account,0)
                self.account_dict[jobstats.account] = Account(self._account_id,jobstats.account,0)  # new account from job's.account_id with priority 0
                self._account_id += 1
            account = self.account_dict[jobstats.account]
            account.update_statistics(jobstats,self.average_user)
+18 −11
Original line number Diff line number Diff line
@@ -65,10 +65,18 @@ class Engine:
        )
        print(f"Using scheduler: {scheduler_type}")


    # Unused!
    def add_job(self, job):
        """Append ``job`` to the queue and re-sort the queue once.

        The queue is rebound to the list returned by
        ``self.scheduler.sort_jobs``.

        NOTE(review): this method is flagged as unused in the file; the
        timestep loop extends and sorts ``self.queue`` itself.

        Args:
            job: The job to enqueue.
        """
        self.queue.append(job)
        # One sort is enough: sorting an already sorted queue a second time
        # yields the same order and only costs time.
        self.queue = self.scheduler.sort_jobs(self.queue)

    def eligible_jobs(self, jobs_to_submit):
        """Remove and return the jobs that are due at the current time.

        ``jobs_to_submit`` is scanned from the front. The leading run of
        entries whose ``'submit_time'`` is ``<= self.current_time`` is removed
        from the list in place, and each removed entry is wrapped in a ``Job``
        created with the current time. Scanning stops at the first entry that
        is not yet due, so later entries are never inspected; the list is
        therefore expected to be ordered by ascending ``'submit_time'``.

        Args:
            jobs_to_submit: Mutable list of job dicts, each with a
                ``'submit_time'`` key. Modified in place.

        Returns:
            list: The newly created ``Job`` objects, in their original order.
            Empty when nothing is due.
        """
        if not jobs_to_submit:
            return []

        now = self.current_time

        # Count the leading entries that are due instead of popping them one by
        # one: list.pop(0) shifts the whole remainder on every call.
        due_count = 0
        for job_info in jobs_to_submit:
            if job_info['submit_time'] > now:
                break
            due_count += 1

        eligible_jobs_list = [Job(job_info, now) for job_info in jobs_to_submit[:due_count]]
        # Drop the consumed prefix in one step; the caller's list keeps only
        # the jobs that are still in the future.
        del jobs_to_submit[:due_count]
        return eligible_jobs_list


    def tick(self):
@@ -219,12 +227,11 @@ class Engine:
        jobs_to_submit = sorted(jobs, key=lambda j: j['submit_time'])

        for timestep in range(timesteps):
            # Submit jobs whose submit_time is <= current_time
            while jobs_to_submit and jobs_to_submit[0]['submit_time'] <= self.current_time:
                job_info = jobs_to_submit.pop(0)
                job = Job(job_info, self.current_time)
                self.add_job(job)

            # identify eligible jobs and add them to the queue.
            self.queue += self.eligible_jobs(jobs_to_submit)
            #sort the queue according to the policy
            self.queue = self.scheduler.sort_jobs(self.queue, self.accounts)
            # Schedule jobs that are now in the queue.
            self.scheduler.schedule(self.queue, self.running, self.current_time)

+7 −4
Original line number Diff line number Diff line
@@ -42,8 +42,6 @@ class Job:
    _id_counter = 0

    def __init__(self, job_dict, current_time, state=JobState.PENDING, account=None):
        for key, value in job_dict.items(): setattr(self, key, value)
        if not self.id: self.id = Job._get_next_id()
        # initializations
        self.start_time = None
        self.end_time = None
@@ -53,6 +51,12 @@ class Job:
        self.power_history = []
        self._state = state
        self.account = account
        # if a job dict was given, override the values from the job_dict:
        for key, value in job_dict.items():
            setattr(self, key, value)
        # in any case: provide a job_id!
        if not self.id:
            self.id = Job._get_next_id()

    def __repr__(self):
        """Return a string representation of the job."""
@@ -100,7 +104,6 @@ class Job:
        return JobStatistics(self)



class JobStatistics:
    """
    Reduced class for handling statistics after the job has finished.
+20 −10
Original line number Diff line number Diff line
from enum import Enum
from ..job import Job, JobState
#from ..job import Job, JobState  # Unused
from ..utils import summarize_ranges

from ..workload import MAX_PRIORITY

class PolicyType(Enum):
    """Supported scheduling policies."""
@@ -14,7 +15,6 @@ class PolicyType(Enum):
class Scheduler:
    """ Default job scheduler with various scheduling policies. """


    def __init__(self, config, policy, resource_manager=None):
        self.config = config
        self.policy = PolicyType(policy)
@@ -23,18 +23,29 @@ class Scheduler:
        self.resource_manager = resource_manager
        self.debug = False


    def sort_jobs(self, queue, accounts=None):
        """Return a new list of the queued jobs ordered by the active policy.

        * ``FCFS`` / ``BACKFILL``: ascending ``submit_time``.
        * ``SJF``: ascending ``wall_time``.
        * ``PRIORITY``: delegated to ``sort_by_job_and_account_priority``,
          which also receives ``accounts``.

        Args:
            queue: Iterable of queued jobs. It is not modified.
            accounts: Optional account collection forwarded to the priority
                sort. Unused by the other policies.

        Returns:
            The jobs in scheduling order.

        Raises:
            ValueError: If ``self.policy`` is not one of the policies above.
        """
        policy = self.policy
        if policy in (PolicyType.FCFS, PolicyType.BACKFILL):
            return sorted(queue, key=lambda job: job.submit_time)
        if policy == PolicyType.SJF:
            return sorted(queue, key=lambda job: job.wall_time)
        if policy == PolicyType.PRIORITY:
            return self.sort_by_job_and_account_priority(queue, accounts)
        raise ValueError(f"Unknown policy type: {self.policy}")

    def sort_by_job_and_account_priority(self, queue, accounts=None):
        """Order jobs by combined job and account priority, highest first.

        A job's effective priority is ``job.priority``. When ``accounts`` is
        given and contains ``job.account``, that account's ``priority`` is
        added and the sum is capped at ``MAX_PRIORITY``. Jobs with equal
        effective priority keep their relative order from ``queue``.

        NOTE(review): assumes ``accounts`` supports ``in`` and indexing by
        ``job.account`` (a mapping-like object) -- confirm against the
        ``Accounts`` class.

        Args:
            queue: Iterable of jobs exposing ``priority`` and ``account``.
                It is not modified.
            accounts: Optional account collection. ``None`` means only the
                jobs' own priorities are used.

        Returns:
            list: The jobs sorted by descending effective priority. Always a
            list (empty for an empty queue), so callers can keep extending it
            and removing from it.
        """
        def effective_priority(job):
            priority = job.priority
            if accounts is not None and job.account in accounts:
                # Cap the combined value: min() keeps it at or below the
                # ceiling, whereas max() would lift every job up to it.
                priority = min(priority + accounts[job.account].priority, MAX_PRIORITY)
            return priority

        # Sort on the numeric priority only; the job objects themselves are
        # never compared. sorted() stays stable with reverse=True.
        return sorted(queue, key=effective_priority, reverse=True)

    def schedule(self, queue, running, current_time, debug=False):
        # Sort the queue in place.
@@ -63,16 +74,15 @@ class Scheduler:
            else:
                if self.policy == PolicyType.BACKFILL:
                    # Try to find a backfill candidate from the entire queue.
                    backfill_job = self.find_backfill_job(queue, len(available_nodes), current_time)
                    backfill_job = self.find_backfill_job(queue, len(self.resource_manager.available_nodes), current_time)
                    if backfill_job:
                        self.assign_nodes_to_job(backfill_job, available_nodes, current_time)
                        self.assign_nodes_to_job(backfill_job, self.resource_manager.available_nodes, current_time)
                        running.append(backfill_job)
                        queue.remove(backfill_job)
                        if debug:
                            scheduled_nodes = summarize_ranges(backfill_job.scheduled_nodes)
                            print(f"t={current_time}: Backfilling job {backfill_job.id} with wall time {backfill_job.wall_time} on nodes {scheduled_nodes}")


    def find_backfill_job(self, queue, num_free_nodes, current_time):
        """Finds a backfill job based on available nodes and estimated completion times.