Commit 2c8cdc9a authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Add initial config and dataloader for aurora

parent f87cb7a5
Loading
Loading
Loading
Loading

config/aurora.yaml

0 → 100644
+49 −0
Original line number Diff line number Diff line
# Aurora machine description consumed by the simulator (see the Aurora dataloader
# for the matching job trace). NOTE(review): units below are inferred from key
# names, not stated in this file — confirm against the config loader.
system:
  num_cdus: 56                  # cooling distribution units
  racks_per_cdu: 3
  nodes_per_rack: 64
  chassis_per_rack: 8
  nodes_per_blade: 1
  switches_per_chassis: 4
  nics_per_node: 4
  rectifiers_per_chassis: 4
  nodes_per_rectifier: 4
  missing_racks: [166, 167]     # rack indices absent from the physical layout
  down_nodes: []                # node indices taken out of service
  cpus_per_node: 2
  gpus_per_node: 6
  cpu_peak_flops: 3.33E12       # peak FLOPS per CPU (presumably; verify per-socket vs per-node)
  gpu_peak_flops: 52.43E12      # peak FLOPS per GPU
  cpu_fp_ratio: 0.511           # NOTE(review): looks like achieved/peak efficiency ratio — confirm
  gpu_fp_ratio: 0.511
power:
  # power_* values appear to be watts; *_idle/*_max bound per-device draw.
  power_gpu_idle: 88
  power_gpu_max: 600
  power_cpu_idle: 90
  power_cpu_max: 350
  power_mem: 74.26
  power_nic: 20
  power_nvme: 30
  power_switch: 250
  power_cdu: 8473.47
  power_update_freq: 15         # presumably seconds between power-model updates
  rectifier_peak_threshold: 13670
  sivoc_loss_constant: 13       # SIVOC = shelf-level voltage converter? TODO confirm
  sivoc_efficiency: 0.98
  rectifier_loss_constant: 17
  rectifier_efficiency: 0.96
  power_cost: 0.094             # presumably $/kWh — verify currency/unit
scheduler:
  # Time-like values appear to be seconds unless noted; mtbf unit is unclear.
  job_arrival_time: 100
  mtbf: 11                      # NOTE(review): unit unknown (hours? days?) — confirm
  trace_quanta: 15
  min_wall_time: 60
  max_wall_time: 43200
  ui_update_freq: 900
  max_nodes_per_job: 3000
  job_end_probs:                # terminal-state distribution; values sum to 1.0
    COMPLETED: 0.63
    FAILED: 0.13
    CANCELLED: 0.12
    TIMEOUT: 0.11
    NODE_FAIL: 0.01
+94 −0
Original line number Diff line number Diff line
import pandas as pd
from pathlib import Path
from raps.telemetry import Job, job_dict
from raps.utils import WorkloadData
from datetime import datetime, timezone

""" Download DIM_JOB_COMPOSITE dataset from https://reports.alcf.anl.gov/data/aurora.html """

def load_data(local_dataset_path, **kwargs):
    """
    Aurora dataloader.

    Reads an ALCF DIM_JOB_COMPOSITE CSV in chunks (to bound memory) and
    converts each row into a RAPS ``Job``. All times are normalized so the
    earliest job start maps to t=0.

    Parameters
    ----------
    local_dataset_path : str | Path | list
        Path to the CSV file; if a list is given, only the first entry is used.
    **kwargs
        Accepted for dataloader-interface compatibility; unused here.

    Returns
    -------
    WorkloadData
        Parsed jobs, the telemetry window [0, max job end], and the UTC
        datetime corresponding to t=0.

    Raises
    ------
    FileNotFoundError
        If the CSV file does not exist.
    ValueError
        If the CSV is missing any required column.
    """
    if isinstance(local_dataset_path, list):
        filepath = Path(local_dataset_path[0])
    else:
        filepath = Path(local_dataset_path)

    if not filepath.is_file():
        raise FileNotFoundError(f"File not found: {filepath}")

    print(f"Reading data from {filepath}")

    jobs = []
    chunksize = 10000

    required_columns = [
        "COBALT_JOBID", "JOB_NAME", "QUEUED_TIMESTAMP", "START_TIMESTAMP", "END_TIMESTAMP",
        "NODES_REQUESTED", "NODES_USED", "CORES_REQUESTED", "CORES_USED",
        "WALLTIME_SECONDS", "RUNTIME_SECONDS", "USERNAME_GENID", "LOCATION"
    ]

    first_chunk = True
    for chunk in pd.read_csv(filepath, chunksize=chunksize, on_bad_lines='warn'):
        # Validate the schema once on the first chunk. Previously
        # required_columns was defined but never checked, so a wrong CSV
        # silently produced zero jobs.
        if first_chunk:
            missing = [c for c in required_columns if c not in chunk.columns]
            if missing:
                raise ValueError(f"Missing required columns: {missing}")
            first_chunk = False

        # Drop rows where essential timestamp data is missing
        chunk.dropna(subset=['QUEUED_TIMESTAMP', 'START_TIMESTAMP', 'END_TIMESTAMP'], inplace=True)

        for _, row in chunk.iterrows():
            try:
                # NOTE(review): to_datetime on naive timestamp strings yields a
                # naive datetime, so .timestamp() interprets it as local time —
                # confirm the dataset's timestamps are meant as UTC.
                submit_time = int(pd.to_datetime(row["QUEUED_TIMESTAMP"]).timestamp())
                start_time = int(pd.to_datetime(row["START_TIMESTAMP"]).timestamp())
                end_time = int(pd.to_datetime(row["END_TIMESTAMP"]).timestamp())

                # An empty LOCATION used to yield [''] (a one-element list with
                # an empty node name); map it to an empty list instead.
                location = str(row.get("LOCATION", "")).strip()
                scheduled_nodes = location.split(',') if location else []

                job = job_dict(
                    id=str(row.get("COBALT_JOBID", "N/A")),
                    name=row.get("JOB_NAME", "N/A"),
                    submit_time=submit_time,
                    start_time=start_time,
                    end_time=end_time,
                    time_limit=int(row.get("WALLTIME_SECONDS", 0)),
                    expected_run_time=int(row.get("RUNTIME_SECONDS", 0)),
                    nodes_required=int(row.get("NODES_REQUESTED", 0)),
                    cpu_cores_required=int(row.get("CORES_REQUESTED", 0)),
                    account=str(row.get("USERNAME_GENID", "N/A")),
                    scheduled_nodes=scheduled_nodes,
                    # The following are placeholders as they are not in the CSV
                    gpu_trace=0,
                    cpu_trace=0,
                    nrx_trace=[],
                    ntx_trace=[],
                    end_state="COMPLETED",
                    priority=0,
                    current_run_time=0,
                    trace_time=submit_time,
                    trace_start_time=start_time,
                    trace_end_time=end_time,
                    trace_quanta=1,
                )
                jobs.append(Job(job))
            except (ValueError, TypeError) as e:
                # Best-effort parse: skip malformed rows (e.g. NaN counts) but
                # keep loading the rest of the file.
                print(f"Skipping row due to parsing error: {e}. Row: {row}")
                continue

    if not jobs:
        return WorkloadData(jobs=[], telemetry_start=0, telemetry_end=0, start_date=datetime.now(timezone.utc))

    # Normalize times so first start = 0
    t0 = min((j.start_time for j in jobs), default=0)
    for j in jobs:
        j.submit_time -= t0
        j.start_time -= t0
        j.end_time -= t0
        j.trace_time -= t0
        j.trace_start_time -= t0
        j.trace_end_time -= t0

    telemetry_start = 0
    telemetry_end = max((j.end_time for j in jobs), default=0)
    start_date = datetime.fromtimestamp(t0, timezone.utc)

    return WorkloadData(
        jobs=jobs,
        telemetry_start=telemetry_start,
        telemetry_end=telemetry_end,
        start_date=start_date,
    )
+3 −3
Original line number Diff line number Diff line
@@ -298,7 +298,7 @@ class BasicWorkload:
            net_tx, net_rx = None, None

            # Max test
            cpu_util, gpu_util = 1, 4
            cpu_util, gpu_util = config['CPUS_PER_NODE'], config['GPUS_PER_NODE']
            cpu_trace, gpu_trace = self.compute_traces(cpu_util, gpu_util, 10800, config['TRACE_QUANTA'])

            job_time = len(gpu_trace) * config['TRACE_QUANTA']
@@ -330,7 +330,7 @@ class BasicWorkload:
            jobs.append(job)

            # OpenMxP run
            cpu_util, gpu_util = 0, 4
            cpu_util, gpu_util = 0, config['GPUS_PER_NODE']
            cpu_trace, gpu_trace = self.compute_traces(cpu_util, gpu_util, 3600, config['TRACE_QUANTA'])
            job_time = len(gpu_trace) * config['TRACE_QUANTA']

@@ -361,7 +361,7 @@ class BasicWorkload:
            jobs.append(job)

            # HPL run
            cpu_util, gpu_util = 0.33, 0.79 * 4  # based on 24-01-18 run
            cpu_util, gpu_util = 0.33, 0.79 * config['GPUS_PER_NODE']  # based on 24-01-18 run
            cpu_trace, gpu_trace = self.compute_traces(cpu_util, gpu_util, 3600, config['TRACE_QUANTA'])
            job_time = len(gpu_trace) * config['TRACE_QUANTA']
            job_info = job_dict(