Commit 5e4cea85 authored by Maiterth, Matthias

Merge branch 'fastsim-parallel-integration' into 'develop'

Fastsim parallel integration

See merge request !108
parents 73f82cdc dad7a59d

config/kestrel.yaml

0 → 100644
+53 −0
system:
  num_cdus: 6
  racks_per_cdu: 6
  nodes_per_rack: 80
  rectifiers_per_rack: 6
  chassis_per_rack: 1
  nodes_per_blade: 1
  switches_per_chassis: 5
  nics_per_node: 2
  rectifiers_per_chassis: 5
  nodes_per_rectifier: 4
  missing_racks: []
  down_nodes: []
  cpus_per_node: 1
  gpus_per_node: 4
  cpu_peak_flops: 396800000000.0
  gpu_peak_flops: 7800000000000.0
  cpu_fp_ratio: 0.69
  gpu_fp_ratio: 0.69

power:
  power_gpu_idle: 75
  power_gpu_max: 300
  power_cpu_idle: 100
  power_cpu_max: 800
  power_mem: 74.26
  power_nic: 21
  power_nvme: 45
  power_switch: 250
  power_cdu: 0
  power_update_freq: 20
  rectifier_peak_threshold: 13670
  sivoc_loss_constant: 0
  sivoc_efficiency: 1
  rectifier_loss_constant: 0
  rectifier_efficiency: 1
  power_cost: 0.094

scheduler:
  seed: 42
  job_arrival_time: 20
  mtbf: 11
  trace_quanta: 20
  min_wall_time: 3600
  max_wall_time: 43200
  ui_update_freq: 3600
  max_nodes_per_job: 3000
  job_end_probs:
    COMPLETED: 0.63
    FAILED: 0.13
    CANCELLED: 0.12
    TIMEOUT: 0.11
    NODE_FAIL: 0.01
 No newline at end of file
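
For orientation: the Kestrel loader added below indexes the config with flat uppercase keys (e.g. config['POWER_CPU_IDLE'], config['CPUS_PER_NODE']), while this YAML nests lowercase keys under system/power/scheduler. A minimal sketch of how the mapping might work, assuming sections are flattened and keys uppercased; the real conversion presumably lives in SystemConfig.get_legacy(), which is not shown in this diff:

# Minimal sketch (not part of this MR): load kestrel.yaml and uppercase the
# keys to match lookups like config['POWER_CPU_IDLE'] in the loader below.
# Flattening the system/power/scheduler sections is an assumption here.
import yaml

def load_legacy_config(path="config/kestrel.yaml"):
    with open(path) as f:
        raw = yaml.safe_load(f)
    config = {}
    for section in ('system', 'power', 'scheduler'):
        for key, value in raw.get(section, {}).items():
            config[key.upper()] = value
    return config

config = load_legacy_config()
assert config['POWER_CPU_IDLE'] == 100 and config['CPUS_PER_NODE'] == 1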
+170 −0
"""
    Load data for NREL's Kestrel cluster.
"""
import uuid
import pandas as pd
from tqdm import tqdm

from ..job import job_dict, Job
from ..utils import power_to_utilization, next_arrival


def load_data(jobs_path, **kwargs):
    """
    Reads job and job profile data from parquet files and parses them.

    Parameters
    ----------
    jobs_path : str
        The path to the jobs parquet file.

    Returns
    -------
    list
        The list of parsed jobs.
    """
    jobs_df = pd.read_parquet(jobs_path, engine='pyarrow')
    return load_data_from_df(jobs_df, **kwargs)


def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
    """
    Parses job and job profile data from a pandas DataFrame.

    Requires the following fields in the DataFrame:
    - start_time (timestamp): Time execution begins (actual or expected) 	
    - job_id (int): Job ID
    - node_power_consumption (List[int]): Power consumption of the job, recorded at Node level
    - nodes_required (int): Number of nodes allocated to the job
    - cpu_power_consumption (List[int]): Power consumption of the job, recorded at CPU level (don't have this)
    - mem_power_consumption (List[int]): Power consumption of the job, recorded at Memory level (don't have this)
    - priority (int): Relative priority of the job, 0=held, 1=required nodes DOWN/DRAINED
    - job_state (string): State of the job, see enum job_states for possible values
    - wall_time (int): Actual runtime of job, in seconds
    - nodes (string): List of nodes allocated to job

    Returns
    -------
    list
        The list of parsed jobs.
    """
    config = kwargs.get('config')
    min_time = kwargs.get('min_time', None)
    reschedule = kwargs.get('reschedule')
    fastforward = kwargs.get('fastforward')
    validate = kwargs.get('validate')
    jid = kwargs.get('jid', '*')

    if fastforward: print(f"fast-forwarding {fastforward} seconds")

    # Sort jobs dataframe based on values in submit_time column, adjust indices after sorting
    jobs_df = jobs_df.sort_values(by='submit_time')
    # Keep only jobs that start or end within the telemetry window
    window_start = pd.to_datetime('2024-09-01T00:00:00')
    window_end = pd.to_datetime('2024-09-16T00:00:00')
    jobs_df = jobs_df[(jobs_df.start_time.between(window_start, window_end, inclusive='right') |
                       jobs_df.end_time.between(window_start, window_end, inclusive='right'))].copy()
    jobs_df = jobs_df.reset_index(drop=True)

    telemetry_start_timestamp = jobs_df['start_time'].min()
    telemetry_end_timestamp = jobs_df['end_time'].max()
    telemetry_start = 0
    telemetry_end = int((telemetry_end_timestamp - telemetry_start_timestamp).total_seconds())

    # Take earliest time as baseline reference
    # We can use the start time of the first job.
    if min_time:
        time_zero = min_time
    else:
        time_zero = jobs_df['submit_time'].min()

    num_jobs = len(jobs_df)
    print("time_zero:", time_zero, "num_jobs", num_jobs)

    jobs = []

    # Map dataframe to job state. Add results to jobs list
    # Iterate over num_jobs - 1 jobs: the next row's submit_time (jidx + 1) is
    # needed to compute the arrival offset, so the final job is skipped.
    for jidx in tqdm(range(num_jobs - 1), total=num_jobs - 1, desc="Processing Kestrel Jobs"):

        job_id = jobs_df.loc[jidx, 'job_id']
        account = jobs_df.loc[jidx, 'account']

        if jid != '*':
            if int(jid) == int(job_id):
                print(f'Extracting {job_id} profile')
            else:
                continue
        nodes_required = jobs_df.loc[jidx, 'nodes_required']

        name = str(uuid.uuid4())[:6]
            
        if validate:
            cpu_power = jobs_df.loc[jidx, 'power_per_node']
            cpu_trace = cpu_power
            gpu_trace = 0

        else:
            cpu_power = jobs_df.loc[jidx, 'power_per_node']
            cpu_power_array = [600] if (pd.isna(cpu_power) or cpu_power == 0) else cpu_power.tolist()
            cpu_min_power = nodes_required * config['POWER_CPU_IDLE'] * config['CPUS_PER_NODE']
            cpu_max_power = nodes_required * config['POWER_CPU_MAX'] * config['CPUS_PER_NODE']
            cpu_util = power_to_utilization(cpu_power_array, cpu_min_power, cpu_max_power)
            cpu_trace = cpu_util * config['CPUS_PER_NODE']
            gpu_trace = 0
                
        # Priority sorting doesn't seem to be implemented at the moment
        priority = 0
            
        wall_time = jobs_df.loc[jidx, 'wall_time']
        end_state = jobs_df.loc[jidx, 'job_state']
        time_submit = jobs_df.loc[jidx+1, 'submit_time']
        diff = time_submit - time_zero

        if jid == '*': 
            time_offset = max(diff.total_seconds(), 0)
        else:
            # When extracting out a single job, run one iteration past the end of the job
            time_offset = config['UI_UPDATE_FREQ']

        if fastforward: time_offset -= fastforward

        if reschedule: # Let the scheduler reschedule the jobs
            scheduled_nodes = None
            time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME'])
        else: # Prescribed replay
            # NOTE: currently identical to the reschedule branch; nodes are
            # reassigned and arrival times resampled in both cases.
            scheduled_nodes = None
            time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME'])

        trace_quanta = config['TRACE_QUANTA']
            
        if cpu_trace.size > 0 and time_offset >= 0:
            job_info = job_dict(nodes_required = nodes_required, 
                                name = name, 
                                account = account, 
                                cpu_trace = cpu_trace, 
                                gpu_trace = gpu_trace, 
                                ntx_trace = [], 
                                nrx_trace = [],
                                end_state = end_state, 
                                scheduled_nodes = scheduled_nodes,
                                id = job_id, 
                                priority = priority, 
                                submit_time = time_offset, 
                                time_limit = wall_time,
                                trace_quanta=trace_quanta)
            jobs.append(Job(job_info))

    return jobs, telemetry_start, telemetry_end


def node_index_to_name(index: int, config: dict):
    """ Converts an index value back to an name string based on system configuration. """
    return f"node{index:04d}"


def cdu_index_to_name(index: int, config: dict):
    return f"cdu{index:02d}"


def cdu_pos(index: int, config: dict) -> tuple[int, int]:
    """ Return (row, col) tuple for a cdu index """
    return (0, index) # TODO
 No newline at end of file
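
A minimal usage sketch (not part of this MR) for load_data_from_df, using the validate path so the raw power trace is used as-is. The toy rows must fall inside the loader's hardcoded September 2024 window, and only the first of the two rows becomes a Job; the last row contributes just its submit_time. Column values and config keys are illustrative assumptions:

# Minimal sketch: drive load_data_from_df with a toy two-row DataFrame.
import numpy as np
import pandas as pd

t0 = pd.Timestamp('2024-09-01T01:00:00')
jobs_df = pd.DataFrame({
    'job_id': [1, 2],
    'account': ['acct_a', 'acct_b'],
    'nodes_required': [2, 4],
    'submit_time': [t0, t0 + pd.Timedelta(minutes=5)],
    'start_time': [t0 + pd.Timedelta(minutes=1), t0 + pd.Timedelta(minutes=10)],
    'end_time': [t0 + pd.Timedelta(hours=1), t0 + pd.Timedelta(hours=2)],
    'wall_time': [3600, 7200],
    'job_state': ['COMPLETED', 'COMPLETED'],
    'power_per_node': [np.array([600.0, 650.0]), np.array([700.0, 710.0])],
})

config = {'JOB_ARRIVAL_TIME': 20, 'TRACE_QUANTA': 20}  # illustrative values
jobs, telemetry_start, telemetry_end = load_data_from_df(
    jobs_df, config=config, validate=True, jid='*')
# Only the first row becomes a Job; the second supplies the next submit_time.
assert len(jobs) == 1 and telemetry_start == 0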
+19 −9
@@ -42,6 +42,7 @@ from raps.weather import Weather
from raps.sim_config import SimConfig
from raps.system_config import SystemConfig

from bisect import bisect_right

@dataclasses.dataclass
class TickData:
@@ -413,7 +414,6 @@ class Engine:
            self.power_manager.set_idle(job.scheduled_nodes)
            job.current_state = JobState.COMPLETED
            job.end_time = self.current_timestep

            self.running.remove(job)
            self.jobs_completed += 1
            job_stats = job.statistics()
@@ -424,7 +424,7 @@ class Engine:
            self.resource_manager.free_nodes_from_job(job)

        killed_jobs = [job for job in self.running if
                       job.start_time + job.time_limit <= self.current_timestep]
                       job.end_time is not None and job.start_time + job.time_limit <= self.current_timestep]

        for job in killed_jobs:
            self.power_manager.set_idle(job.scheduled_nodes)
@@ -489,7 +489,8 @@ class Engine:
                          actively_considered_jobs: List,
                          all_jobs: List,
                          replay: bool,
                          autoshutdown: bool):
                          autoshutdown: bool,
                          cursor: int):
        # 1 update running time of all running jobs
        # 2 update the current_timestep of the engine (this serves as reference for most computations)
        # 3 Check if simulation should shutdown
@@ -504,7 +505,7 @@ class Engine:
           len(self.queue) == 0 and \
           len(self.running) == 0 and \
           not replay and \
           len(all_jobs) == 0 and \
           len(all_jobs) == cursor and \
           len(actively_considered_jobs) == 0:
            if self.debug:
                print(f"Simulaiton completed early: {self.config['system_name']} - "
@@ -555,10 +556,10 @@ class Engine:
            job.running_time = self.current_timestep - job.start_time

            if job.current_state != JobState.RUNNING:
                raise ValueError(f"Job is in running list, but state is not RUNNING: job.state == {job.currentstate}")
                raise ValueError(f"Job {job.id} is in running list, but state is not RUNNING: job.state == {job.current_state}")
            else:  # if job.state == JobState.RUNNING:
                # Error checks
                if job.running_time > job.time_limit:
                if job.running_time > job.time_limit and job.end_time is not None:
                    raise Exception(f"Job exceded time limit! "
                                    f"{job.running_time} > {job.time_limit}"
                                    f"\n{job}"
@@ -754,6 +755,9 @@ class Engine:

        # Process jobs in batches for better performance of timestep loop
        all_jobs = jobs.copy()
        submit_times = [j.submit_time for j in all_jobs]
        cursor = 0

        jobs = []
        # Batch Jobs into 6h windows based on submit_time or twice the time_delta if larger
        batch_window = max(60 * 60 * 6, 2 * time_delta)  # at least 6h
@@ -772,8 +776,13 @@ class Engine:

            if (self.current_timestep % batch_window == 0) or (self.current_timestep == timestep_start):
                # Add jobs that are within the batching window and remove them from all jobs
                jobs += [job for job in all_jobs if job.submit_time <= self.current_timestep + batch_window]
                all_jobs[:] = [job for job in all_jobs if job.submit_time > self.current_timestep + batch_window]
                # jobs += [job for job in all_jobs if job.submit_time <= self.current_timestep + batch_window]
                # all_jobs[:] = [job for job in all_jobs if job.submit_time > self.current_timestep + batch_window]
                cutoff = self.current_timestep + batch_window
                r = bisect_right(submit_times, cutoff, lo=cursor)
                if r > cursor:
                    jobs.extend(all_jobs[cursor:r])
                    cursor = r

            # 1. Prepare Timestep:
            completed_jobs, killed_jobs, newly_downed_nodes, need_reschedule = \
@@ -812,7 +821,8 @@ class Engine:
            simulation_done = self.complete_timestep(actively_considered_jobs=jobs,
                                                     all_jobs=all_jobs,
                                                     replay=replay,
                                                     autoshutdown=autoshutdown)
                                                     autoshutdown=autoshutdown,
                                                     cursor=cursor)
            if simulation_done:
                break
            yield tick_data
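
The batching change above replaces two O(n) list-comprehension scans of all_jobs per window with a single bisect_right over the submit_times list (which is ascending because jobs arrive sorted by submit_time) plus a monotone cursor, so each job is admitted exactly once. A standalone sketch of the same pattern, with toy data:

# Standalone sketch of the cursor/bisect batching pattern used above.
# submit_times must be sorted ascending for bisect_right to be correct.
from bisect import bisect_right

submit_times = [0, 10, 25, 40, 90]          # toy submit times, seconds
all_jobs = [f"job{i}" for i in range(5)]    # parallel to submit_times
cursor = 0
jobs = []

for now in (0, 30, 60, 90):                 # stands in for the timestep loop
    cutoff = now + 30                       # stands in for batch_window
    r = bisect_right(submit_times, cutoff, lo=cursor)
    if r > cursor:                          # admit newly eligible jobs once
        jobs.extend(all_jobs[cursor:r])
        cursor = r

assert jobs == all_jobs and cursor == len(all_jobs)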
+2 −1
@@ -65,7 +65,8 @@ class ExclusiveNodeResourceManager:
                if n not in self.available_nodes:
                    self.available_nodes.append(n)
                else:
                    raise KeyError(f"node was free but already in available nodes: {n.id}")
                    raise KeyError((f"Atempting to free node {n} after completion of job {job.id}. " +
                                     "Node is already free (in available nodes)!"))
            self.available_nodes = sorted(self.available_nodes)

    def update_system_utilization(self, current_time, running_jobs):
+163 −0
import sys

import zmq

from ..policy import PolicyType, BackfillType
from raps.telemetry import Telemetry
from ..job import JobState
from raps.sim_config import args
from raps.system_config import get_system_config

# Run with this command:
# python main.py --system kestrel -f ../data/fastsim_jobs_output.parquet --scheduler fastsim --policy priority --start 2024-09-01T00:00 --end 2024-09-15T00:00

class Scheduler():
    """
    FastSim-backed scheduler (strict lockstep via ZeroMQ).

    Protocol (server side is FastSim --serve):
      - INIT                       -> { init_time }
      - GET { t }                  -> { t, running_ids }  (server acks t after reply)
      - END (on shutdown)          -> { ok: true }

    Semantics at engine second t:
      - R_t := authoritative running IDs from FastSim for t
      - started  = R_t - prev_R
        -> stamp start_time=t (once), assign nodes once, mark RUNNING
      - finished = prev_R - R_t
        -> stamp end_time=t (engine will finalize next tick in prepare_timestep)

      running list for this tick = R_t ∪ finished  (so those finishing at t remain
      visible for one more scheduler call; engine completes them on the next second).
    """

    def __init__(self, config, resource_manager, **kwargs):
        self.config = config
        self.policy = PolicyType(kwargs.get('policy'))
        self.bfpolicy = BackfillType(kwargs.get('backfill'))
        self.debug = bool(kwargs.get('debug', False))

        # ---- ZeroMQ client ----
        self.endpoint = kwargs.get('plugin_endpoint', 'ipc:///tmp/fastsim.sock')
        self._ctx = zmq.Context.instance()
        self._sock = self._ctx.socket(zmq.REQ)
        self._sock.setsockopt(zmq.LINGER, 0)
        self._sock.connect(self.endpoint)

        # INIT handshake: fetch FastSim's init_time (ISO string).
        self.init_time_iso = self._rpc('INIT').get('init_time')

        self.resource_manager = resource_manager

        # Job metadata: id -> Job
        self.jobids_to_jobs = {}
        self.allocated_jobs = set()   # job_ids we have assigned nodes for
        self.prev_running_ids = set() # R_{t-1}

        # Build the Job objects from RAPS Telemetry (needed so ExaDigiT subsystems have objects)
        args_dict = vars(args)
        config = get_system_config(args.system).get_legacy()
        args_dict['config'] = config
        td = Telemetry(**args_dict)

        print("...Now loading jobs to FastSim scheduler.")
        jobs, _, _ = td.load_data(args.replay)
        for job in jobs:
            self.jobids_to_jobs[job.id] = job

        if self.debug:
            print(f"[RAPS-FastSim] Connected to {self.endpoint}; init_time={self.init_time_iso}", file=sys.stderr)

    def _rpc(self, op, **payload):
        """Send a JSON request and return the JSON reply (dict)."""
        try:
            msg = {'op': op}
            msg.update(payload)
            self._sock.send_json(msg)
            rep = self._sock.recv_json()
        except Exception as e:
            raise RuntimeError(f"[RAPS-FastSim] RPC {op} failed: {e}") from e
        if isinstance(rep, dict) and 'error' in rep:
            raise RuntimeError(f"[RAPS-FastSim] RPC {op} error: {rep['error']}")
        return rep

    def _fastsim_running_ids(self, t: int):
        """Blocking call: get authoritative running job IDs for second t."""
        rep = self._rpc('GET', t=int(t))
        rids = rep.get('running_ids', [])
        return set(rids)

    def schedule(self, queue=None, running=None, current_time=None, accounts=None, sorted=False):
        """
        Called by Engine when RAPS detects an event.
        """
        running = running if running is not None else []

        t = int(current_time)

        # Get authoritative running set for second t (blocks until available)
        R_t = self._fastsim_running_ids(t)

        # Diff vs previous second
        started_ids  = R_t - self.prev_running_ids
        finished_ids = self.prev_running_ids - R_t  # these end at t; engine finalizes next tick

        # Handle starts: stamp start_time, assign nodes, mark RUNNING
        for jid in started_ids:
            job = self.jobids_to_jobs.get(jid)
            if job is None:
                if self.debug:
                    print(f"[RAPS-FastSim][WARN] Unknown job id from FastSim: {jid}", file=sys.stderr)
                continue

            # Assign nodes exactly once
            if jid not in self.allocated_jobs:
                self.resource_manager.assign_nodes_to_job(job, t, self.policy)
                self.allocated_jobs.add(jid)

            # FastSim is authoritative
            job.start_time = t
            # IMPORTANT: prevent premature completion by RM’s default behavior
            job.end_time = None # Prevents RAPS from removing job
            job.state = JobState.RUNNING

        # Handle finishes: stamp end_time=t (engine.prepare_timestep next tick completes)
        running.clear()
        for jid in finished_ids:
            job = self.jobids_to_jobs.get(jid)
            if job is not None:
                # overwrite any prior value; FastSim is the source of truth
                # job.end_time = t
                if job.start_time is not None:
                    observed = t - job.start_time
                    if (job.time_limit is None) or (job.time_limit < observed):
                        # This is necessary since RAPS handles finishing jobs, but schedule() is not
                        # called at every tick, so a job may have finished in FastSim during a tick
                        # where no call was made.
                        # TODO: Deal with this, because it messes up the end time of some jobs.
                        # print(f"Extending {job.id} runtime {job.time_limit} to match observed {observed} at finish.")
                        job.time_limit = observed
                # print((f"Job {job.id} is finished, start time: {job.start_time}, wall time: {job.time_limit},"  
                #        f"end time: {job.end_time}, at time {t}. With nodes {job.scheduled_nodes}."))
                job.end_time = t
                job.time_limit = t - job.start_time
                running.append(job)

        # Running list reflects exactly FastSim’s R_t
        for jid in R_t:
            job = self.jobids_to_jobs.get(jid)
            if job is not None:
                # defensively ensure state isn’t stuck at COMPLETED
                if job.state != JobState.RUNNING:
                    job.state = JobState.RUNNING
                running.append(job)

        # Update prev
        self.prev_running_ids = R_t

    def end_sim(self):
        # Ask server to stop
        try:
            self._rpc('END')
        except Exception:
            pass
 No newline at end of file
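
For reference, a minimal sketch of what the server side of this lockstep protocol (FastSim --serve) could look like. The endpoint and message shapes follow the Scheduler docstring above, but this is an illustrative stub, not FastSim itself; running_for_t() is a hypothetical hook into the simulator state:

# Illustrative stub of the FastSim --serve side of the lockstep protocol.
import zmq

def serve(endpoint='ipc:///tmp/fastsim.sock', init_time='2024-09-01T00:00:00'):
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.REP)
    sock.bind(endpoint)
    while True:
        msg = sock.recv_json()
        op = msg.get('op')
        if op == 'INIT':
            sock.send_json({'init_time': init_time})
        elif op == 'GET':
            t = int(msg['t'])
            # running_for_t(t) is hypothetical: it would block until second t
            # is simulated and return the authoritative running job IDs.
            sock.send_json({'t': t, 'running_ids': sorted(running_for_t(t))})
        elif op == 'END':
            sock.send_json({'ok': True})
            break
        else:
            sock.send_json({'error': f'unknown op: {op}'})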