Commit 012571f3 authored by Maiterth, Matthias
Browse files

Added sorting function for the fugaku points.

parent a28d22da
Loading
Loading
Loading
Loading
+6 −12
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@ class TickData:

class Engine:
    """Job scheduling simulation engine."""

    def __init__(self, *, power_manager, flops_manager, cooling_model=None, config, **kwargs):
        self.config = config
        self.down_nodes = summarize_ranges(self.config['DOWN_NODES'])
@@ -62,11 +63,6 @@ class Engine:
        )
        print(f"Using scheduler: {scheduler_type}")

    # Unused!
    def add_job(self, job):
        """Append ``job`` to the engine queue, then re-sort the queue.

        The job is appended to the existing queue list in place; afterwards
        ``self.queue`` is rebound to whatever the scheduler's ``sort_jobs``
        returns for that list.
        """
        pending = self.queue
        pending.append(job)
        # The original author flagged this re-sort as unnecessary; it is kept
        # so the method behaves exactly as before.
        self.queue = self.scheduler.sort_jobs(pending)

    def eligible_jobs(self,jobs_to_submit):
        eligible_jobs_list = []
        while jobs_to_submit and jobs_to_submit[0]['submit_time'] <= self.current_time:
@@ -75,7 +71,6 @@ class Engine:
            eligible_jobs_list.append(job)
        return eligible_jobs_list


    def tick(self):
        """Simulate a timestep."""
        completed_jobs = [job for job in self.running if job.end_time is not None and job.end_time <= self.current_time]
@@ -141,7 +136,7 @@ class Engine:
            self.resource_manager.free_nodes_from_job(job)

        # Ask scheduler to schedule any jobs waiting in queue
        self.scheduler.schedule(self.queue, self.running, self.current_time)
        self.scheduler.schedule(self.queue, self.running, self.current_time, self.accounts)

        # Update the power array UI component
        rack_power, rect_losses = self.power_manager.compute_rack_power()
@@ -206,7 +201,6 @@ class Engine:
        self.current_time += 1
        return tick_data


    def run_simulation(self, jobs, timesteps, autoshutdown=False):
        """Generator that yields after each simulation tick."""
        self.timesteps = timesteps
@@ -216,9 +210,9 @@ class Engine:

        for timestep in range(timesteps):

            # identify eligible jobs and add them to the queue.
            # Identify eligible jobs and add them to the queue.
            self.queue += self.eligible_jobs(jobs_to_submit)
            #sort the queue according to the policy
            # Sort the queue according to the policy
            self.queue = self.scheduler.sort_jobs(self.queue, self.accounts)
            # Schedule jobs that are now in the queue.
            self.scheduler.schedule(self.queue, self.running, self.current_time, sorted=True)
+38 −2
Original line number Diff line number Diff line
@@ -9,6 +9,7 @@ class PolicyType(Enum):
    FCFS = 'fcfs'
    BACKFILL = 'backfill'
    PRIORITY = 'priority'
    FUGAKU_PTS = 'fugaku_pts'
    SJF = 'sjf'


@@ -31,13 +32,15 @@ class Scheduler:
            return sorted(queue, key=lambda job: job.wall_time)
        elif self.policy == PolicyType.PRIORITY:
            return sorted(queue, key=lambda job: job.priority, reverse=True)
        elif self.policy == PolicyType.FUGAKU_PTS:
            return self.sort_fugaku_redeeming(queue, accounts)
        else:
            raise ValueError(f"Unknown policy type: {self.policy}")

    def schedule(self, queue, running, current_time, sorted=False, debug=False):
    def schedule(self, queue, running, current_time, accounts=None, sorted=False, debug=False):
        # Sort the queue in place.
        if not sorted:
            queue[:] = self.sort_jobs(queue)
            queue[:] = self.sort_jobs(queue, accounts)

        # Iterate over a copy of the queue since we might remove items
        for job in queue[:]:
@@ -71,6 +74,7 @@ class Scheduler:
                            scheduled_nodes = summarize_ranges(backfill_job.scheduled_nodes)
                            print(f"t={current_time}: Backfilling job {backfill_job.id} with wall time {backfill_job.wall_time} on nodes {scheduled_nodes}")


    def find_backfill_job(self, queue, num_free_nodes, current_time):
        """Finds a backfill job based on available nodes and estimated completion times.

@@ -110,3 +114,35 @@ class Scheduler:
                return job

        return None

    def sort_fugaku_redeeming(self, queue, accounts=None):
        """Order ``queue`` by the Fugaku points of each job's account.

        Jobs whose account holds zero or more Fugaku points come first,
        ordered by points from highest to lowest.  Jobs whose account holds
        negative points follow, ordered by ``job.priority`` from highest to
        lowest.  Both sorts are stable, so ties keep their relative order
        (for the second group: the points-descending order).

        Args:
            queue: Sequence of jobs; each job is read via ``job.account``
                (and ``job.priority`` for jobs of negative-point accounts).
            accounts: Object exposing ``account_dict``, indexed by
                ``job.account``, whose entries expose ``fugaku_points``.
                Despite the ``None`` default it must be supplied whenever
                ``queue`` is non-empty; otherwise the ``account_dict``
                lookup on ``None`` raises ``AttributeError``.

        Returns:
            A new list with the jobs in scheduling order.  The input is
            never modified.
        """
        # Truthiness (rather than ``== []``) also covers empty tuples/deques,
        # which previously fell through and crashed on an unbound ``cutoff``.
        if not queue:
            return list(queue)

        account_dict = accounts.account_dict
        scored = [
            (account_dict[job.account].fugaku_points, job) for job in queue
        ]
        # Stable sort: highest Fugaku points first.
        scored.sort(key=lambda pair: pair[0], reverse=True)

        # Partition explicitly instead of searching for a cutoff index, so the
        # "no negative accounts" case needs no special handling.
        redeeming = [job for points, job in scored if points >= 0]
        indebted = [job for points, job in scored if points < 0]
        # Accounts in the negative fall back to ordinary job priority.
        indebted.sort(key=lambda job: job.priority, reverse=True)

        return redeeming + indebted