Commit c91a8190 authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Initial config and dataloader for philly traces (dataloader not yet working)

parent 82f348a2
Loading
Loading
Loading
Loading
+8 −0
Original line number Diff line number Diff line
@@ -77,6 +77,14 @@ For MIT Supercloud
    # Reinforcement learning test case
    raps train-rl --system mit_supercloud/part-cpu -f /opt/data/mit_supercloud/202201

For Microsoft Azure 2017 Philly Traces

    # Synthetic
    python main.py run-parts -x philly -w multitenant

    # Telemetry replay
    python main.py run-parts -x philly -f /opt/data/philly/trace-data

For Lumi

    # Synthetic test for Lumi:
+51 −0
Original line number Diff line number Diff line
# RAPS system description for the Microsoft Azure 2017 Philly traces.
# Field meanings below are inferred from the key names and sibling configs --
# confirm against the RAPS configuration schema before relying on them.
system:
  num_cdus: 1
  racks_per_cdu: 1
  # NOTE(review): 321 here vs. 231 in the companion Philly config added in the
  # same commit -- one of the two looks like a digit transposition; verify.
  nodes_per_rack: 321
  chassis_per_rack: 3
  nodes_per_blade: 2
  switches_per_chassis: 4
  nics_per_node: 4
  rectifiers_per_chassis: 4
  nodes_per_rectifier: 4
  missing_racks: []
  down_nodes: []
  cpus_per_node: 2
  cores_per_cpu: 20
  gpus_per_node: 2
  cpu_peak_flops: 1248000000000.0  # 1.248e12 -- presumably FLOP/s per CPU; confirm
  gpu_peak_flops: 7800000000000.0  # 7.8e12 -- presumably FLOP/s per GPU; confirm
  cpu_fp_ratio: 0.667
  gpu_fp_ratio: 0.667
power:
  # Per-component power parameters; presumably watts -- confirm units.
  power_gpu_idle: 75
  power_gpu_max: 300
  power_cpu_idle: 90
  power_cpu_max: 280
  power_mem: 74.26
  power_nvme: 30
  power_nic: 20
  power_cdu: 8473.47
  power_switch: 250
  power_update_freq: 15
  rectifier_peak_threshold: 13670
  sivoc_loss_constant: 13
  sivoc_efficiency: 0.98
  rectifier_loss_constant: 17
  rectifier_efficiency: 0.96
  power_cost: 0.094  # presumably $/kWh -- confirm
scheduler:
  multitenant: true  # Philly is a shared, multi-tenant GPU cluster
  job_arrival_time: 900
  mtbf: 11  # mean time between failures; units not stated here -- confirm
  trace_quanta: 20
  min_wall_time: 3600  # presumably seconds (1 h)
  max_wall_time: 43200  # presumably seconds (12 h)
  ui_update_freq: 900
  max_nodes_per_job: 192
  # Terminal-state probabilities for synthetic jobs (values sum to 1.0).
  job_end_probs:
    COMPLETED: 0.63
    FAILED: 0.13
    CANCELLED: 0.12
    TIMEOUT: 0.11
    NODE_FAIL: 0.01
+51 −0
Original line number Diff line number Diff line
# RAPS system description for the Microsoft Azure 2017 Philly traces
# (companion to the other Philly config in this commit; differs in
# nodes_per_rack and gpus_per_node). Field meanings are inferred from the
# key names -- confirm against the RAPS configuration schema.
system:
  num_cdus: 1
  racks_per_cdu: 1
  # NOTE(review): 231 here vs. 321 in the sibling Philly config from the same
  # commit -- one of the two looks like a digit transposition; verify.
  nodes_per_rack: 231
  chassis_per_rack: 3
  nodes_per_blade: 2
  switches_per_chassis: 4
  nics_per_node: 4
  rectifiers_per_chassis: 4
  nodes_per_rectifier: 4
  missing_racks: []
  down_nodes: []
  cpus_per_node: 2
  cores_per_cpu: 20
  gpus_per_node: 8  # 8-GPU nodes here vs. 2 in the sibling config -- confirm
  cpu_peak_flops: 1248000000000.0  # 1.248e12 -- presumably FLOP/s per CPU; confirm
  gpu_peak_flops: 7800000000000.0  # 7.8e12 -- presumably FLOP/s per GPU; confirm
  cpu_fp_ratio: 0.667
  gpu_fp_ratio: 0.667
power:
  # Per-component power parameters; presumably watts -- confirm units.
  power_gpu_idle: 75
  power_gpu_max: 300
  power_cpu_idle: 90
  power_cpu_max: 280
  power_mem: 74.26
  power_nvme: 30
  power_nic: 20
  power_cdu: 8473.47
  power_switch: 250
  power_update_freq: 15
  rectifier_peak_threshold: 13670
  sivoc_loss_constant: 13
  sivoc_efficiency: 0.98
  rectifier_loss_constant: 17
  rectifier_efficiency: 0.96
  power_cost: 0.094  # presumably $/kWh -- confirm
scheduler:
  multitenant: true  # Philly is a shared, multi-tenant GPU cluster
  job_arrival_time: 900
  mtbf: 11  # mean time between failures; units not stated here -- confirm
  trace_quanta: 20
  min_wall_time: 3600  # presumably seconds (1 h)
  max_wall_time: 43200  # presumably seconds (12 h)
  ui_update_freq: 900
  max_nodes_per_job: 192
  # Terminal-state probabilities for synthetic jobs (values sum to 1.0).
  job_end_probs:
    COMPLETED: 0.63
    FAILED: 0.13
    CANCELLED: 0.12
    TIMEOUT: 0.11
    NODE_FAIL: 0.01
+116 −0
Original line number Diff line number Diff line
import os
import json
import csv
import datetime
import pandas as pd
import warnings
from raps.job import Job

DATE_FORMAT_STR = "%Y-%m-%d %H:%M:%S"

def parse_date(s):
    """Parse a Philly-trace timestamp string into a ``datetime``.

    Returns ``None`` for missing values (a falsy input or the literal
    string ``"None"``). Trailing ``" PST"``/``" PDT"`` labels are removed
    before parsing, since ``DATE_FORMAT_STR`` has no timezone field.
    """
    if not s or s == "None":
        return None
    cleaned = s
    for tz_label in (" PST", " PDT"):
        cleaned = cleaned.replace(tz_label, "")
    return datetime.datetime.strptime(cleaned, DATE_FORMAT_STR)

def load_data(files, **kwargs):
    """
    Load a Microsoft Philly trace into ExaDigiT Job objects.

    Args:
        files (list[str]): A list with one directory path
            (e.g., ['/opt/data/philly/trace-data']) containing the
            published Philly trace files: cluster_machine_list,
            cluster_cpu_util, cluster_gpu_util, cluster_job_log.
        **kwargs: Unused; accepted for dataloader-interface compatibility.

    Returns:
        list[Job]: one Job per entry in cluster_job_log.

    Raises:
        AssertionError: if `files` does not contain exactly one path.
        FileNotFoundError, KeyError, json.JSONDecodeError: if trace files
            are missing or do not match the expected layout.
    """
    assert len(files) == 1, "Expecting a single directory path"
    trace_dir = files[0]

    # --- 1. Machine list ---
    # The trace's CSV header uses space-padded column names, so the keys
    # " number of GPUs" / " single GPU mem" include the leading space
    # deliberately -- confirm against the actual file header.
    machine_file = os.path.join(trace_dir, "cluster_machine_list")
    machines = {}
    with open(machine_file) as f:
        reader = csv.DictReader(f)
        for row in reader:
            mid = row["machineId"]
            machines[mid] = {
                "num_gpus": int(row[" number of GPUs"]),
                "gpu_mem": row[" single GPU mem"].strip()
            }

    # --- 2. CPU util ---
    # Expected columns: time, machine_id, cpu_util (note snake_case here
    # vs. camelCase "machineId" in the GPU file -- verify against data).
    cpu_file = os.path.join(trace_dir, "cluster_cpu_util")
    cpu_util = pd.read_csv(cpu_file)

    # --- 3. GPU util ---
    # BUG FIX: the original used on_bad_lines="skip", which drops bad rows
    # *silently*, so catch_warnings(record=True) never captured anything and
    # the summary warning could not fire. "warn" makes pandas emit a
    # ParserWarning per bad row, which we collect and roll up below.
    gpu_file = os.path.join(trace_dir, "cluster_gpu_util")
    with warnings.catch_warnings(record=True) as wlist:
        warnings.simplefilter("always")  # record every ParserWarning, not just the first
        gpu_util = pd.read_csv(
            gpu_file,
            engine="python",
            on_bad_lines="warn"
        )
    # BUG FIX: the summary warning must be issued *after* leaving the
    # catch_warnings context; inside it, the warning itself would be
    # captured into wlist and never reach the user.
    if wlist:
        warnings.warn(
            f"cluster_gpu_util: skipped {len(wlist)} malformed lines while reading {gpu_file}",
            UserWarning
        )

    # --- 4. Job log ---
    job_file = os.path.join(trace_dir, "cluster_job_log")
    with open(job_file) as f:
        job_log = json.load(f)

    return [_job_from_record(raw, cpu_util, gpu_util) for raw in job_log]


def _job_from_record(raw, cpu_util, gpu_util):
    """Build one Job from a raw cluster_job_log entry.

    Derives start/end from the first/last attempt, computes wall time when
    both endpoints parsed, and slices the cluster-wide CPU/GPU utilization
    frames down to the machines the job ran on.
    """
    jobid = raw.get("jobid")
    user = raw.get("user")
    status = raw.get("status")
    submitted = parse_date(raw.get("submitted_time"))

    attempts = raw.get("attempts", [])
    start, end = None, None
    if attempts:
        start = parse_date(attempts[0].get("start_time"))
        end = parse_date(attempts[-1].get("end_time"))

    # Wall time only when both endpoints are present; trace entries may
    # lack either (e.g. never-started or still-running jobs).
    wall_time = (end - start).total_seconds() if start and end else None

    # Machines this job ran on, taken from the *first* attempt only
    # (matches original behavior; later attempts may use different
    # machines -- TODO confirm whether attempts should be aggregated).
    machine_ids = []
    gpus = 0
    if attempts and "detail" in attempts[0]:
        for detail in attempts[0]["detail"]:
            machine_ids.append(detail["ip"])
            # NOTE(review): gpus is counted but not yet passed to Job --
            # presumably intended for a gpus_required-style field.
            gpus += len(detail.get("gpus", []))

    # Per-job utilization slices; column names differ between the two
    # files ("machine_id" vs "machineId") -- presumably matching the raw
    # trace headers; verify against the data.
    job_cpu = cpu_util[cpu_util["machine_id"].isin(machine_ids)]
    job_gpu = gpu_util[gpu_util["machineId"].isin(machine_ids)]

    return Job(
        job_id=jobid,
        name=f"philly-{jobid}",
        user=user,
        nodes_required=len(machine_ids) if machine_ids else None,
        wall_time=wall_time,
        start_time=start,
        end_time=end,
        queue_time=submitted,
        scheduled_nodes=machine_ids,
        cpu_trace=job_cpu if not job_cpu.empty else None,
        gpu_trace=job_gpu if not job_gpu.empty else None,
        priority=None,
        end_state=status
    )