Commit cdff947d authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Add docstring to top of file and work on dataloader performance enhancements

parent 80b29996
Loading
Loading
Loading
Loading
+128 −62
Original line number Diff line number Diff line
"""
Main reference to Philly traces:

    Jeon, Myeongjae, et al. "Analysis of Large-Scale Multi-Tenant GPU clusters for DNN training workloads." 
    2019 USENIX Annual Technical Conference (USENIX ATC 19). 2019.
    https://www.usenix.org/system/files/atc19-jeon.pdf

The repository is available here:

    https://github.com/msr-fiddle/philly-traces

The data portion of the repo can be downloaded using one of the following methods:

    git clone https://github.com/msr-fiddle/philly-traces.git
    cd philly-traces
    git lfs pull

    wget https://github.com/msr-fiddle/philly-traces/raw/master/trace-data.tar.gz

    curl -L -o trace-data.tar.gz https://github.com/msr-fiddle/philly-traces/raw/master/trace-data.tar.gz

Once the file is downloaded, assuming it's in the /opt/data/philly/trace-data directory:

    /opt/data/philly/trace-data/trace-data.tar.gz

    cd /opt/data/philly/trace-data

    run `tar xvfz trace-data.tar.gz` which will unpack the following files:

        cluster_cpu_util     1.5G
        cluster_gpu_util     2.8G
        cluster_mem_util     2.2G
        cluster_job_log      37M
        cluster_machine_list 8K

    then run the following:

        python /path/to/raps/scripts/parse_philly_traces.py cluster_cpu_util
        python /path/to/raps/scripts/parse_philly_traces.py cluster_gpu_util

    this will parse these two files into two directories, cpu_by_day and gpu_by_day,
    creating one file for each day and adding the lines for that day into the files.

    sanity checks:

        wc -l cluster_cpu_util
         45028261 cluster_cpu_util
        wc -l cpu_by_day/*.csv
         45350898 total

        wc -l cluster_gpu_util
         44750641 cluster_gpu_util
        wc -l gpu_by_day/*.csv
         44750640 total

Running a replay simulation:

    python main.py run-parts -x philly -f /opt/data/philly/trace-data --start 2017-10-03T00:00 --end 2017-10-04T00:00

Once the dataloader has been run at least once, it will dump npz files into a directory, so
they can be replayed again without having to go through the expensive extraction process, using e.g.:

    python main.py run-parts -x philly -f raps-output-5efefa3

Note: it is possible to run simulations for a user-defined length of time between 10/3/2017 and 12/15/2017.

"""
import os
import glob
import json
import csv
import pandas as pd
import warnings

from datetime import datetime, timezone, timedelta
from tqdm import tqdm
@@ -45,7 +111,7 @@ def parse_timestamp(val):

def load_traces_by_day(trace_dir, start_dt, end_dt, colname):
    """Load CPU or GPU traces between start_dt and end_dt."""
    frames = []
    traces = {}
    current = start_dt.date()

    while current <= end_dt.date():
@@ -64,15 +130,15 @@ def load_traces_by_day(trace_dir, start_dt, end_dt, colname):
            # Convert util column to numeric (NA/invalid → NaN)
            df[colname] = pd.to_numeric(df[colname], errors="coerce")

            frames.append(df)
            traces[current] = df
        else:
            print(f"⚠ No trace file for {current}")
        current += timedelta(days=1)

    if not frames:
        return pd.DataFrame(columns=["time", "machineId", colname])
    if not traces:
        return {}

    return pd.concat(frames, ignore_index=True)
    return traces

def parse_date(s):
    if not s or s == "None":
@@ -91,13 +157,16 @@ def load_data(files, **kwargs):
    Returns:
        list[Job]
    """
    debug = kwargs.get("debug")
    print("started reading of philly traces... please be patient...", flush=True)

    # extract --start from kwargs
    start_ts = to_epoch(kwargs.get("start", DEFAULT_START))
    end_ts = to_epoch(kwargs.get("end", DEFAULT_END))

    assert len(files) == 1, "Expecting a single directory path"
    trace_dir = files[0]
    gpu_trace_dir = os.path.join(files[0], "dist", "gpu_by_day")
    gpu_trace_dir = os.path.join(files[0], "gpu_by_day")
    config = kwargs.get('config')
    gpus_per_node = config.get("GPUS_PER_NODE")
    if gpus_per_node is None:
@@ -127,19 +196,10 @@ def load_data(files, **kwargs):
    # Assign partition ID (e.g. 0 for 2-GPU, 1 for 8-GPU)
    partition_id = 0 if gpus_per_node == 2 else 1

    # --- 2. CPU util ---
    cpu_file = os.path.join(trace_dir, "cluster_cpu_util")
    cpu_util = pd.read_csv(cpu_file)
    cpu_util["time"] = cpu_util["time"].str.replace(" PST","").str.replace(" PDT","")
    cpu_util["time"] = cpu_util["time"].apply(parse_date)

    # --- 3. GPU util ---
    start_dt = datetime.fromtimestamp(start_ts)
    end_dt   = datetime.fromtimestamp(end_ts)

    cpu_trace_dir = os.path.join(trace_dir, "dist", "cpu_by_day")
    gpu_trace_dir = os.path.join(trace_dir, "dist", "gpu_by_day")

    # --- 4. Job log ---
    job_file = os.path.join(trace_dir, "cluster_job_log")
    with open(job_file) as f:
@@ -174,14 +234,6 @@ def load_data(files, **kwargs):
    # --- First pass: find earliest submit time ---
    start_ts = None

    ### debug
    print("num jobs found", len(job_log))
    for job in job_log[:100]:
        print(f"Job {job['jobid']}:")
        for attempt in job["attempts"]:
            print("  Start:", attempt["start_time"])
    ### end debug

    for raw in job_log:
        submitted = raw.get("submitted_time")
        if submitted is None or submitted == "None":
@@ -196,12 +248,15 @@ def load_data(files, **kwargs):
        if start_ts is None or t < start_ts:
            start_ts = t

        # debug
        print(f"Job {job['jobid']}: submit_time {submitted}, start_ts: {start_ts}")

    if start_ts is None:
        raise ValueError("No valid submitted_time found in Philly traces")

    # --- Pre-load all traces for the given date range ---
    cpu_trace_dir = os.path.join(trace_dir, "cpu_by_day")
    gpu_trace_dir = os.path.join(trace_dir, "gpu_by_day")
    all_cpu_traces = load_traces_by_day(cpu_trace_dir, start_dt, end_dt, "cpu_util")
    all_gpu_traces = load_traces_by_day(gpu_trace_dir, start_dt, end_dt, "gpu_util")

    # --- Second pass: build jobs ---
    jobs_list = []
    for raw in tqdm(job_log, desc="Building Job objects"):
@@ -252,42 +307,55 @@ def load_data(files, **kwargs):
        # --- absolute datetimes (used for filtering traces) ---
        submitted_dt = parse_timestamp(raw.get("submitted_time"))

        # Clamp to global CLI window - this should be fixed later to include the actual
        # trace start and end times (trace_start_time? and trace_end_time?) 
        #job_start = max(start, start_dt) if start else start_dt
        #job_end   = min(end, end_dt) if end else end_dt
        job_start = start
        job_end = end

        # CPU utilization traces
        cpu_trace = load_traces_by_day(cpu_trace_dir, job_start, job_end, "cpu_util")
        if not job_start or not job_end:
            continue

        # --- CPU utilization traces ---
        cpu_dfs = []
        current_date = job_start.date()
        while current_date <= job_end.date():
            if current_date in all_cpu_traces:
                cpu_dfs.append(all_cpu_traces[current_date])
            current_date += timedelta(days=1)

        if not cpu_dfs:
            job_cpu_trace = []
        else:
            job_cpu_df = pd.concat(cpu_dfs, ignore_index=True)
            mask = (
            (cpu_trace["machineId"].isin(machine_ids)) &
            (cpu_trace["time"] >= start) &
            (cpu_trace["time"] <= end)
                (job_cpu_df["machineId"].isin(machine_ids)) &
                (job_cpu_df["time"] >= start) &
                (job_cpu_df["time"] <= end)
            )
        job_cpu = cpu_trace.loc[mask].copy()
            job_cpu = job_cpu_df.loc[mask].copy()

        # Aggregate across machines if >1 machine
            if len(machine_ids) > 1:
                job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index()

        # Convert from percentage to fraction
            job_cpu_trace = (job_cpu["cpu_util"].to_numpy() * 0.01).tolist()

        # Extract GPU utilization traces
        gpu_trace = load_traces_by_day(gpu_trace_dir, job_start, job_end, "gpu_util")
        # --- GPU utilization traces ---
        gpu_dfs = []
        current_date = job_start.date()
        while current_date <= job_end.date():
            if current_date in all_gpu_traces:
                gpu_dfs.append(all_gpu_traces[current_date])
            current_date += timedelta(days=1)

        if not gpu_dfs:
            job_gpu_trace = []
        else:
            job_gpu_df = pd.concat(gpu_dfs, ignore_index=True)
            mask = (
            (gpu_trace["machineId"].isin(machine_ids)) &
            (gpu_trace["time"] >= start) &
            (gpu_trace["time"] <= end)
                (job_gpu_df["machineId"].isin(machine_ids)) &
                (job_gpu_df["time"] >= start) &
                (job_gpu_df["time"] <= end)
            )
        # Convert traces from percent to fraction of gpus_per_node, e.g., 8 gpus at 100% is 8, at 50% is 4, etc.
        job_gpu = gpu_trace.loc[mask].copy()
            job_gpu = job_gpu_df.loc[mask].copy()

        # Aggregate across machines if >1 machine
            if len(machine_ids) > 1:
                job_gpu = job_gpu.groupby("time")["gpu_util"].mean().reset_index()

@@ -301,10 +369,7 @@ def load_data(files, **kwargs):
            end_time = end.timestamp() - start_ts if end else None

            if not submit_time or not start_time or not end_time:
                warnings.warn(
                    f"skipped {jobid} b/c missing submit_time, start_time, or end_time",
                    UserWarning
                )
                tqdm.write(f"skipped {jobid} b/c missing submit_time, start_time, or end_time")
                continue
       
            scheduled_nodes = [node_mapping[mid] for mid in machine_ids if mid in node_mapping]
@@ -350,6 +415,7 @@ def load_data(files, **kwargs):
                else:
                    tqdm.write(f"skipping {job['id']} b/c either no cpu or gpu trace")

            if debug:
                tqdm.write(f"abs start time: {start_ts} rel job start: {job['start_time']} rel job end: {job['end_time']}")

    return WorkloadData(