Commit 1c476f3e authored by Brewer, Wes
Browse files

feat: replace urgent boolean with named queue system



Move from a single urgent queue (job.urgent bool) to configurable named
queues defined in system config (scheduler.queues). Each queue has its
own reserved_nodes and max_runtime constraints. Jobs specify their queue
by name (defaulting to "default"), and the engine routes them accordingly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
parent a280c411
Loading
Loading
Loading
Loading
+5 −0
Original line number Diff line number Diff line
@@ -48,6 +48,11 @@ scheduler:
    CANCELLED: 0.12
    TIMEOUT: 0.11
    NODE_FAIL: 0.01
  queues:
    default: {}
    urgent:
      reserved_nodes: 64
      max_runtime: 172800  # 2 days in seconds
uq:
  power_gpu_uncertainty: 0.05
  power_cpu_uncertainty: 0.05
+19 −17
Original line number Diff line number Diff line
@@ -68,7 +68,7 @@ class TickData:
    killed: list[Job]
    running: list[Job]
    queue: list[Job]
    urgent_queue: list[Job]
    named_queues: dict[str, list[Job]]
    down_nodes: list[int]
    power_df: Optional[pd.DataFrame]
    p_flops: Optional[float]
@@ -269,9 +269,11 @@ class Engine:
        # Initialize running and queue, etc.
        self.running = []
        self.queue = []
        self.urgent_queue = []
        self.urgent_reserved_nodes = sim_config.urgent_reserved_nodes
        self.urgent_max_runtime = sim_config.urgent_max_runtime_int
        # Build named queues from system config (every configured queue gets its own list)
        self.queue_configs = system_config.scheduler.queues
        self.named_queues: dict[str, list[Job]] = {
            name: [] for name in self.queue_configs
        }
        self.accounts = accounts
        self.telemetry = telemetry
        self.job_history_dict = []
@@ -363,10 +365,12 @@ class Engine:
                             or job.start_time >= self.current_timestep]
        # Convert them to Job instances and build list of eligible jobs.
        for job in eligible_jobs:
            if job.urgent:
                self.urgent_queue.append(job)
            else:
                self.queue.append(job)
            self._route_job_to_queue(job)

    def _route_job_to_queue(self, job):
        """Route a job to its named queue, falling back to 'default'."""
        queue_name = job.queue if job.queue in self.named_queues else "default"
        self.named_queues[queue_name].append(job)

    def add_eligible_jobs_to_queue(self, jobs_to_submit: List):
        """
@@ -382,12 +386,9 @@ class Engine:
        eligible_jobs = [job for job in jobs_to_submit if job.submit_time <= self.current_timestep]
        # Remove those jobs from jobs_to_submit:
        jobs_to_submit[:] = [job for job in jobs_to_submit if job.submit_time > self.current_timestep]
        # Convert them to Job instances, routing urgent jobs to the urgent queue.
        # Route jobs to their named queues.
        for job in eligible_jobs:
            if job.urgent:
                self.urgent_queue.append(job)
            else:
                self.queue.append(job)
            self._route_job_to_queue(job)
        if eligible_jobs != []:
            return True
        else:
@@ -512,9 +513,10 @@ class Engine:
                job.current_run_time = self.current_timestep - job.start_time

        # Stop the simulation if no more jobs are running or in the queue or in the job list.
        all_named_queues_empty = all(len(q) == 0 for q in self.named_queues.values())
        if autoshutdown and \
           len(self.queue) == 0 and \
           len(self.urgent_queue) == 0 and \
           all_named_queues_empty and \
           len(self.running) == 0 and \
           not replay and \
           len(all_jobs) == cursor and \
@@ -808,13 +810,13 @@ class Engine:
            has_new_additions = self.add_eligible_jobs_to_queue(jobs)
            need_reschedule = need_reschedule or has_new_additions

            # 3. Schedule jobs that are now in the queue (urgent first, then normal).
            # 3. Schedule jobs that are now in the queue (named queues first, then default).
            if need_reschedule:
                self.scheduler.schedule(self.queue, self.running,
                                        self.current_timestep,
                                        accounts=self.accounts,
                                        sorted=(not has_new_additions),
                                        urgent_queue=self.urgent_queue)
                                        named_queues=self.named_queues)

            if self.debug and self.current_timestep % self.config['UI_UPDATE_FREQ'] == 0:
                print(".", end="", flush=True)
@@ -832,7 +834,7 @@ class Engine:
                killed=killed_jobs,
                running=self.running,
                queue=self.queue,
                urgent_queue=self.urgent_queue,
                named_queues=self.named_queues,
                down_nodes=self.down_nodes,
                power_df=tick_return.power_df,
                p_flops=tick_return.p_flops,
+3 −3
Original line number Diff line number Diff line
@@ -58,7 +58,7 @@ def job_dict(*,
             trace_quanta: int | None = None,
             trace_missing_values: bool | None = False,
             downscale: int = 1,
             urgent: bool = False
             queue: str = "default"
             ):
    """ Return job info dictionary """
    return {
@@ -96,7 +96,7 @@ def job_dict(*,
        'trace_missing_values': trace_missing_values,
        'dilated': False,
        'downscale': downscale,
        'urgent': urgent
        'queue': queue
    }


@@ -161,7 +161,7 @@ class Job:
        # # current_time unused!
        # Initializations:
        self.power = 0
        self.urgent = False
        self.queue = "default"
        self.scheduled_nodes = []  # Explicit list of requested nodes
        self.nodes_required = 0  # If scheduled_nodes is set this can be derived.
        self.cpu_cores_required = 0
+11 −8
Original line number Diff line number Diff line
@@ -32,14 +32,17 @@ class Scheduler:
        else:
            raise ValueError(f"Policy not implemented: {self.policy}")

    def schedule(self, queue, running, current_time, accounts=None, sorted=False, urgent_queue=None):
        # Schedule urgent jobs first (always FCFS).
        if urgent_queue:
            urgent_queue.sort(key=lambda job: job.submit_time)
            for job in urgent_queue[:]:
    def schedule(self, queue, running, current_time, accounts=None, sorted=False, named_queues=None):
        # Schedule named queues first (non-default queues, always FCFS).
        if named_queues:
            for name, named_q in named_queues.items():
                if name == "default" or not named_q:
                    continue
                named_q.sort(key=lambda job: job.submit_time)
                for job in named_q[:]:
                    nodes_available = self.check_available_nodes(job)
                    if nodes_available:
                    self.place_job_and_manage_queues(job, urgent_queue, running, current_time)
                        self.place_job_and_manage_queues(job, named_q, running, current_time)

        # Sort the queue in place.
        if not sorted:
+11 −8
Original line number Diff line number Diff line
@@ -63,14 +63,17 @@ class Scheduler:
        else:
            raise ValueError(f"Policy not implemented: {self.policy}")

    def schedule(self, queue, running, current_time, accounts=None, sorted=False, urgent_queue=None):
        # Schedule urgent jobs first (always FCFS).
        if urgent_queue:
            urgent_queue.sort(key=lambda job: job.submit_time)
            for job in urgent_queue[:]:
    def schedule(self, queue, running, current_time, accounts=None, sorted=False, named_queues=None):
        # Schedule named queues first (non-default queues, always FCFS).
        if named_queues:
            for name, named_q in named_queues.items():
                if name == "default" or not named_q:
                    continue
                named_q.sort(key=lambda job: job.submit_time)
                for job in named_q[:]:
                    nodes_available = self.check_available_nodes(job)
                    if nodes_available:
                    self.place_job_and_manage_queues(job, urgent_queue, running, current_time)
                        self.place_job_and_manage_queues(job, named_q, running, current_time)

        # Sort the queue in place.
        if not sorted:
Loading