Commit 84846573 authored by Brewer, Wes
Browse files

Fix injected job traces and reset FLOPS per tick

parent d6803184
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -10,6 +10,8 @@ class FLOPSManager():
        self.flop_state = np.zeros(self.config['SC_SHAPE'])

    def update_flop_state(self, scheduled_nodes, cpu_util, gpu_util):
        # Reset per-tick FLOP state so completed jobs don't accumulate forever.
        self.flop_state.fill(0)
        if len(scheduled_nodes) == 0:
            return
        cpu_util = np.asarray(cpu_util)
+23 −4
Original line number Diff line number Diff line
@@ -47,12 +47,15 @@ def _inject_job(engine, job_dict: dict):
    import numpy as np

    wall_time = job_dict['wall_time_s']
    trace_quanta = 15  # 15-second samples (matches RAPS convention)
    trace_quanta = engine.config.get('TRACE_QUANTA', 15)
    num_samples = max(1, wall_time // trace_quanta)

    # Generate random realistic traces
    cpu_trace = np.random.uniform(30, 90, num_samples).tolist()
    gpu_trace = np.random.uniform(40, 95, num_samples).tolist()
    # Generate traces consistent with internal random workloads: one
    # utilization level per job, drawn uniformly from [0, CPUS_PER_NODE) /
    # [0, GPUS_PER_NODE) and held constant for every sample.
    cpu_util = np.random.random() * engine.config.get('CPUS_PER_NODE', 1)
    gpu_util = np.random.random() * engine.config.get('GPUS_PER_NODE', 1)
    cpu_trace = (cpu_util * np.ones(num_samples)).tolist()
    gpu_trace = (gpu_util * np.ones(num_samples)).tolist()

    # Convert FedJob fields to RAPS Job
    raps_job = Job(make_job_dict(
@@ -112,6 +115,21 @@ def _poll_site_metrics(engine, tick: int, tick_data=None) -> Optional[Dict[str,
            'account': getattr(job, 'account', ''),
        })

    # Identify overdue running jobs (current_run_time > expected_run_time)
    overdue_jobs = []
    for job in engine.running:
        expected = getattr(job, 'expected_run_time', None)
        if expected is None:
            continue
        if job.current_run_time > expected:
            overdue_jobs.append({
                'id': job.id,
                'name': str(job.name),
                'nodes_required': job.nodes_required,
                'run_time': getattr(job, 'current_run_time', 0),
                'expected_run_time': expected,
            })

    # Build top queued jobs list (up to 5)
    top_queued_jobs = []
    for job in engine.queue[:5]:
@@ -151,6 +169,7 @@ def _poll_site_metrics(engine, tick: int, tick_data=None) -> Optional[Dict[str,
        'g_flops_w': g_flops_w,
        'top_running_jobs': top_running_jobs,
        'top_queued_jobs': top_queued_jobs,
        'overdue_jobs': overdue_jobs[:5],
        'sim_time': _get_sim_time(engine),
        'tick': tick,
    }