Commit f848a543 authored by Hines, Jesse
Browse files

Smarter job parsing

parent 0c577025
Loading
Loading
Loading
Loading
+45 −36
Original line number Diff line number Diff line
@@ -5,7 +5,7 @@ import orjson
from loguru import logger
from raps import Engine
from raps.job import Job as RapsJob
from raps.stats import get_engine_stats, get_job_stats
from raps.stats import get_engine_stats
from ..models.sim import ServerSimConfig
from ..models.output import (
    JobStateEnum, SchedulerSimJob, SchedulerSimJobPowerHistory, SchedulerSimSystem, CoolingSimCDU,
@@ -25,7 +25,7 @@ class SimTickOutput(NamedTuple):
    power_history: list[SchedulerSimJobPowerHistory]


def get_job_state_hash(job: RapsJob):
def get_job_hash(job: RapsJob):
    """ Return string that can be used to check if any meaningful state changed """
    return orjson.dumps([
        str(job.id),
@@ -78,9 +78,32 @@ def run_simulation(sim_config: ServerSimConfig):
        row, col = engine.telemetry.cdu_pos(cdu_index)
        return cdu_name, row, col
    
    def parse_job(job: RapsJob, timestamp: datetime):
        """
        Build the SchedulerSimJob snapshot of `job` as of `timestamp`.

        `time_end` is only reported when the job has a start time and its end time is
        not after `timestamp`; in every other case it is None.
        """
        # end_time is set to the planned end as soon as the job is scheduled, so it only
        # counts as a real end time once the job has started and that moment has passed.
        planned_end = offset_to_time(job.end_time)
        has_ended = (
            planned_end is not None
            and job.start_time is not None
            and planned_end <= timestamp
        )

        # NOTE: job.power is intentionally not emitted yet. It is unclear whether the
        # attribute is the total energy or just the current wattage.
        return SchedulerSimJob.model_validate(dict(
            job_id=str(job.id),
            name=job.name,
            node_count=job.nodes_required,
            time_snapshot=timestamp,
            time_submission=offset_to_time(job.submit_time),
            time_limit=job.time_limit,
            time_start=offset_to_time(job.start_time),
            time_end=planned_end if has_ended else None,
            state_current=JobStateEnum(job.current_state.name),
            nodes=parse_nodes(tuple(job.scheduled_nodes)) if job.scheduled_nodes else None,
        ))

    job_hashes: dict[int, bytes] = {}
    # Keep record of how many power history steps we've emitted for each job
    power_history_counts: dict[int, int] = {}
    prev_job_hashes: set[str] = set()
    job_power_history_counts: dict[int, int] = {}

    for tick in engine.run_simulation():
        timestamp: datetime = offset_to_time(tick.current_timestep)
@@ -102,7 +125,6 @@ def run_simulation(sim_config: ServerSimConfig):
                "down_nodes": down_nodes,
                "num_samples": engine_stats['num_samples'],

                # Don't call get_job_stats as it is slow
                "jobs_completed": engine.jobs_completed,
                "jobs_running": len(tick.running),
                "jobs_pending": len(tick.queue),
@@ -125,44 +147,31 @@ def run_simulation(sim_config: ServerSimConfig):
        scheduler_sim_jobs: list[SchedulerSimJob] = []
        power_history: list[SchedulerSimJobPowerHistory] = []

        curr_job_hashes = set()
        tick_jobs = itertools.chain(tick.queue, tick.running, tick.completed, tick.killed)
        for job in tick_jobs:
            job_state_hash = get_job_state_hash(job)
            # Output jobs if something other than time_snapshot changed
            if is_last_tick or job_state_hash not in prev_job_hashes:
                time_end = offset_to_time(job.end_time)
                # end_time is set to its planned end once it's scheduled. Set it to None for unfinished jobs here
                if time_end is not None and (job.start_time is None or time_end > timestamp):
                    time_end = None

                parsed_job = SchedulerSimJob.model_validate({
                    "job_id": str(job.id),
                    "name": job.name,
                    "node_count": job.nodes_required,
                    "time_snapshot": timestamp,
                    "time_submission": offset_to_time(job.submit_time),
                    "time_limit": job.time_limit,
                    "time_start": offset_to_time(job.start_time),
                    "time_end": time_end,
                    "state_current": JobStateEnum(job.current_state.name),
                    "nodes": parse_nodes(tuple(job.scheduled_nodes)) if job.scheduled_nodes else None,
                    # How does the new job.power attribute work? Is it total_energy?
                    # Or just the current wattage?
                    # power = job.power,
                })
                scheduler_sim_jobs.append(parsed_job)
            curr_job_hashes.add(job_state_hash)
        prev_job_hashes = curr_job_hashes
        # Only output running jobs when the state changes
        for job in tick.queue:
            # Just use a constant as hash for queued jobs to avoid computing the hash repeatedly for
            # them. This assumes queued jobs don't change any meaningful state until they run
            job_hash = b"queued"
            if is_last_tick or job_hashes.get(job.id) != job_hash:
                scheduler_sim_jobs.append(parse_job(job, timestamp))
                job_hashes[job.id] = job_hash
        for job in tick.running:
            job_hash = get_job_hash(job)
            if is_last_tick or job_hashes.get(job.id) != job_hash:
                scheduler_sim_jobs.append(parse_job(job, timestamp))
                job_hashes[job.id] = job_hash
        for job in itertools.chain(tick.completed, tick.killed):
            scheduler_sim_jobs.append(parse_job(job, timestamp))
            job_hashes.pop(job.id, None)

        for job in itertools.chain(tick.running, tick.completed, tick.killed):
            if power_history_counts.get(job.id, 0) < len(job.power_history):
            if job_power_history_counts.get(job.id, 0) < len(job.power_history):
                power_history.append(SchedulerSimJobPowerHistory(
                    timestamp = timestamp,
                    job_id = str(job.id),
                    power = job.power_history[-1],
                ))
                power_history_counts[job.id] = len(job.power_history)
                job_power_history_counts[job.id] = len(job.power_history)

        cooling_sim_cdus: list[CoolingSimCDU] = []
        cooling_sim_cep: list[CoolingSimCEP] = []