Commit 5fa83847 authored by Brewer, Wes's avatar Brewer, Wes
Browse files

feat: add urgent job queue with priority scheduling

parent 76df563a
Loading
Loading
Loading
Loading
+18 −5
Original line number Diff line number Diff line
@@ -68,6 +68,7 @@ class TickData:
    killed: list[Job]
    running: list[Job]
    queue: list[Job]
    urgent_queue: list[Job]
    down_nodes: list[int]
    power_df: Optional[pd.DataFrame]
    p_flops: Optional[float]
@@ -268,6 +269,7 @@ class Engine:
        # Initialize running and queue, etc.
        self.running = []
        self.queue = []
        self.urgent_queue = []
        self.accounts = accounts
        self.telemetry = telemetry
        self.job_history_dict = []
@@ -358,7 +360,11 @@ class Engine:
                             job.start_time is None
                             or job.start_time >= self.current_timestep]
        # Convert them to Job instances and build list of eligible jobs.
        self.queue += eligible_jobs
        for job in eligible_jobs:
            if job.urgent:
                self.urgent_queue.append(job)
            else:
                self.queue.append(job)

    def add_eligible_jobs_to_queue(self, jobs_to_submit: List):
        """
@@ -374,8 +380,12 @@ class Engine:
        eligible_jobs = [job for job in jobs_to_submit if job.submit_time <= self.current_timestep]
        # Remove those jobs from jobs_to_submit:
        jobs_to_submit[:] = [job for job in jobs_to_submit if job.submit_time > self.current_timestep]
        # Convert them to Job instances and build list of eligible jobs.
        self.queue += eligible_jobs
        # Convert them to Job instances, routing urgent jobs to the urgent queue.
        for job in eligible_jobs:
            if job.urgent:
                self.urgent_queue.append(job)
            else:
                self.queue.append(job)
        if eligible_jobs != []:
            return True
        else:
@@ -502,6 +512,7 @@ class Engine:
        # Stop the simulation if no more jobs are running or in the queue or in the job list.
        if autoshutdown and \
           len(self.queue) == 0 and \
           len(self.urgent_queue) == 0 and \
           len(self.running) == 0 and \
           not replay and \
           len(all_jobs) == cursor and \
@@ -795,12 +806,13 @@ class Engine:
            has_new_additions = self.add_eligible_jobs_to_queue(jobs)
            need_reschedule = need_reschedule or has_new_additions

            # 3. Schedule jobs that are now in the queue.
            # 3. Schedule jobs that are now in the queue (urgent first, then normal).
            if need_reschedule:
                self.scheduler.schedule(self.queue, self.running,
                                        self.current_timestep,
                                        accounts=self.accounts,
                                        sorted=(not has_new_additions))
                                        sorted=(not has_new_additions),
                                        urgent_queue=self.urgent_queue)

            if self.debug and self.current_timestep % self.config['UI_UPDATE_FREQ'] == 0:
                print(".", end="", flush=True)
@@ -818,6 +830,7 @@ class Engine:
                killed=killed_jobs,
                running=self.running,
                queue=self.queue,
                urgent_queue=self.urgent_queue,
                down_nodes=self.down_nodes,
                power_df=tick_return.power_df,
                p_flops=tick_return.p_flops,
+5 −2
Original line number Diff line number Diff line
@@ -57,7 +57,8 @@ def job_dict(*,
             trace_end_time: int | None = 0,
             trace_quanta: int | None = None,
             trace_missing_values: bool | None = False,
             downscale: int = 1
             downscale: int = 1,
             urgent: bool = False
             ):
    """ Return job info dictionary """
    return {
@@ -94,7 +95,8 @@ def job_dict(*,
        'trace_quanta': trace_quanta,
        'trace_missing_values': trace_missing_values,
        'dilated': False,
        'downscale': downscale
        'downscale': downscale,
        'urgent': urgent
    }


@@ -159,6 +161,7 @@ class Job:
        # # current_time unused!
        # Initializations:
        self.power = 0
        self.urgent = False
        self.scheduled_nodes = []  # Explicit list of requested nodes
        self.nodes_required = 0  # If scheduled_nodes is set this can be derived.
        self.cpu_cores_required = 0
+9 −1
Original line number Diff line number Diff line
@@ -32,7 +32,15 @@ class Scheduler:
        else:
            raise ValueError(f"Policy not implemented: {self.policy}")

    def schedule(self, queue, running, current_time, accounts=None, sorted=False):
    def schedule(self, queue, running, current_time, accounts=None, sorted=False, urgent_queue=None):
        # Schedule urgent jobs first (always FCFS).
        if urgent_queue:
            urgent_queue.sort(key=lambda job: job.submit_time)
            for job in urgent_queue[:]:
                nodes_available = self.check_available_nodes(job)
                if nodes_available:
                    self.place_job_and_manage_queues(job, urgent_queue, running, current_time)

        # Sort the queue in place.
        if not sorted:
            queue[:] = self.sort_jobs(queue, accounts)
+9 −1
Original line number Diff line number Diff line
@@ -63,7 +63,15 @@ class Scheduler:
        else:
            raise ValueError(f"Policy not implemented: {self.policy}")

    def schedule(self, queue, running, current_time, accounts=None, sorted=False):
    def schedule(self, queue, running, current_time, accounts=None, sorted=False, urgent_queue=None):
        # Schedule urgent jobs first (always FCFS).
        if urgent_queue:
            urgent_queue.sort(key=lambda job: job.submit_time)
            for job in urgent_queue[:]:
                nodes_available = self.check_available_nodes(job)
                if nodes_available:
                    self.place_job_and_manage_queues(job, urgent_queue, running, current_time)

        # Sort the queue in place.
        if not sorted:
            queue[:] = self.sort_jobs(queue, accounts)
+9 −1
Original line number Diff line number Diff line
@@ -32,7 +32,15 @@ class Scheduler:
        else:
            raise ValueError(f"Policy not implemented: {self.policy}")

    def schedule(self, queue, running, current_time, accounts=None, sorted=False):
    def schedule(self, queue, running, current_time, accounts=None, sorted=False, urgent_queue=None):
        # Schedule urgent jobs first (always FCFS).
        if urgent_queue:
            urgent_queue.sort(key=lambda job: job.submit_time)
            for job in urgent_queue[:]:
                node_id = self.check_available_nodes(job)
                if node_id is not None:
                    self.place_job_and_manage_queues(job, urgent_queue, running, current_time, node_id)

        # Sort the queue in place.
        if not sorted:
            queue[:] = self.sort_jobs(queue, accounts)