Commit ebe06dfb authored by Brewer, Wes
Browse files

feat: enforce reserved_nodes capacity limits on named queues



Schedulers now check that total nodes used by running jobs from a named
queue do not exceed its reserved_nodes before scheduling additional jobs.
The metascheduler also respects these limits at dispatch time, preventing
oversized jobs from being sent to a site's urgent queue.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
parent 1c476f3e
Loading
Loading
Loading
Loading
+5 −0
Original line number Diff line number Diff line
@@ -47,3 +47,8 @@ scheduler:
    CANCELLED: 0.12
    TIMEOUT: 0.11
    NODE_FAIL: 0.01
  queues:
    default: {}
    urgent:
      reserved_nodes: 64
      max_runtime: 172800  # 2 days in seconds
+5 −0
Original line number Diff line number Diff line
@@ -49,3 +49,8 @@ scheduler:
    CANCELLED: 0.12
    TIMEOUT: 0.11
    NODE_FAIL: 0.01
  queues:
    default: {}
    urgent:
      reserved_nodes: 64
      max_runtime: 172800  # 2 days in seconds
+5 −2
Original line number Diff line number Diff line
@@ -268,12 +268,14 @@ class Engine:
        )
        # Initialize running and queue, etc.
        self.running = []
        self.queue = []
        # Build named queues from system config (every configured queue gets its own list)
        self.queue_configs = system_config.scheduler.queues
        self.named_queues: dict[str, list[Job]] = {
            name: [] for name in self.queue_configs
        }
        # self.queue is the "default" named queue — same list object, so schedulers
        # that operate on self.queue automatically work with named_queues["default"].
        self.queue = self.named_queues["default"]
        self.accounts = accounts
        self.telemetry = telemetry
        self.job_history_dict = []
@@ -816,7 +818,8 @@ class Engine:
                                        self.current_timestep,
                                        accounts=self.accounts,
                                        sorted=(not has_new_additions),
                                        named_queues=self.named_queues)
                                        named_queues=self.named_queues,
                                        queue_configs=self.queue_configs)

            if self.debug and self.current_timestep % self.config['UI_UPDATE_FREQ'] == 0:
                print(".", end="", flush=True)
+16 −4
Original line number Diff line number Diff line
@@ -57,6 +57,8 @@ def _inject_job(engine, job_dict: dict):
    cpu_trace = (cpu_util * np.ones(num_samples)).tolist()
    gpu_trace = (gpu_util * np.ones(num_samples)).tolist()

    queue_name = job_dict.get('queue', 'urgent')

    # Convert FedJob fields to RAPS Job
    raps_job = Job(make_job_dict(
        nodes_required=job_dict['nodes_required'],
@@ -71,12 +73,13 @@ def _inject_job(engine, job_dict: dict):
        ntx_trace=None,
        nrx_trace=None,
        trace_quanta=trace_quanta,
        queue=queue_name,
    ))

    # Add directly to queue for immediate scheduling consideration
    # Route to the appropriate named queue for immediate scheduling.
    # (Adding to engine.jobs wouldn't work because the simulation loop
    # copies jobs at startup and doesn't see later additions)
    engine.queue.append(raps_job)
    engine._route_job_to_queue(raps_job)

    # Trigger the scheduler to consider the new job immediately
    engine.scheduler.schedule(
@@ -84,7 +87,9 @@ def _inject_job(engine, job_dict: dict):
        engine.running,
        engine.current_timestep,
        accounts=engine.accounts,
        sorted=False
        sorted=False,
        named_queues=engine.named_queues,
        queue_configs=engine.queue_configs,
    )


@@ -113,6 +118,7 @@ def _poll_site_metrics(engine, tick: int, tick_data=None) -> Optional[Dict[str,
            'run_time': getattr(job, 'current_run_time', 0),
            'state': job.current_state.value if hasattr(job.current_state, 'value') else str(job.current_state),
            'account': getattr(job, 'account', ''),
            'queue': getattr(job, 'queue', 'default'),
        })

    # Identify overdue running jobs (run_time > expected_run_time)
@@ -140,6 +146,7 @@ def _poll_site_metrics(engine, tick: int, tick_data=None) -> Optional[Dict[str,
            'wall_time': getattr(job, 'time_limit', getattr(job, 'wall_time', 0)),
            'state': 'PD',
            'account': getattr(job, 'account', ''),
            'queue': getattr(job, 'queue', 'default'),
        })

    # Extract richer metrics from tick_data if available
@@ -196,7 +203,12 @@ def site_worker_main(site_name: str,
    # Create the simulation generator
    sim_gen = engine.run_simulation()

    status_out_q.put({"site": site_name, "event": "READY"})
    # Report queue configs so the metascheduler can enforce capacity limits
    queue_configs = {
        name: {"reserved_nodes": qc.reserved_nodes, "max_runtime": qc.max_runtime}
        for name, qc in engine.queue_configs.items()
    }
    status_out_q.put({"site": site_name, "event": "READY", "queue_configs": queue_configs})

    tick = 0

+16 −1
Original line number Diff line number Diff line
@@ -32,14 +32,29 @@ class Scheduler:
        else:
            raise ValueError(f"Policy not implemented: {self.policy}")

    def schedule(self, queue, running, current_time, accounts=None, sorted=False, named_queues=None):
    def schedule(self, queue, running, current_time, accounts=None, sorted=False,
                 named_queues=None, queue_configs=None):
        # Schedule named queues first (non-default queues, always FCFS).
        if named_queues:
            for name, named_q in named_queues.items():
                if name == "default" or not named_q:
                    continue
                named_q.sort(key=lambda job: job.submit_time)

                # Determine reserved_nodes cap for this queue
                reserved = 0
                if queue_configs and name in queue_configs:
                    reserved = getattr(queue_configs[name], 'reserved_nodes', 0)

                for job in named_q[:]:
                    # Enforce reserved_nodes capacity: count nodes already
                    # consumed by running jobs from this queue.
                    if reserved > 0:
                        used = sum(j.nodes_required for j in running
                                   if getattr(j, 'queue', 'default') == name)
                        if used + job.nodes_required > reserved:
                            continue  # Queue full — job stays pending

                    nodes_available = self.check_available_nodes(job)
                    if nodes_available:
                        self.place_job_and_manage_queues(job, named_q, running, current_time)
Loading