Commit 63ffbc7f authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Refactor to improve pylint score. Move to using absolute times for jobs.

parent f397377b
Loading
Loading
Loading
Loading
+75 −64
Original line number Diff line number Diff line
"""
This is the dataloader for the Philly traces which is documented in this paper:

    Jeon, Myeongjae, et al. "Analysis of Large-Scale Multi-Tenant GPU clusters for DNN training workloads." 
    2019 USENIX Annual Technical Conference (USENIX ATC 19). 2019.
    https://www.usenix.org/system/files/atc19-jeon.pdf
    Jeon, Myeongjae, et al. "Analysis of Large-Scale Multi-Tenant GPU clusters
    for DNN training workloads." 2019 USENIX Annual Technical Conference
    (USENIX ATC 19). 2019. https://www.usenix.org/system/files/atc19-jeon.pdf

Note on hardware specs:

@@ -28,9 +28,10 @@ The data portion of the repo can be downloaded using one of the following method

    wget https://github.com/msr-fiddle/philly-traces/raw/master/trace-data.tar.gz

    curl -L -o trace-data.tar.gz https://github.com/msr-fiddle/philly-traces/raw/master/trace-data.tar.gz
    curl -L -o trace-data.tar.gz \
            https://github.com/msr-fiddle/philly-traces/raw/master/trace-data.tar.gz

Once the file is downloaded, assuming it's in the /opt/data/philly/trace-data directory:
After the file is downloaded, assuming it's in the /opt/data/philly/trace-data directory:

    /opt/data/philly/trace-data/trace-data.tar.gz

@@ -66,32 +67,38 @@ Once the file is downloaded, assuming its in /opt/data/philly/trace-data directo

Running a replay simulation:

    python main.py run-parts -x philly -f /opt/data/philly/trace-data --start 2017-10-03T00:00 --end 2017-10-04T00:00
    python main.py run-parts -x philly -f /opt/data/philly/trace-data \
            --start 2017-10-03T00:00 --end 2017-10-04T00:00

Once the dataloader has been run at least once, it will dump npz files into a directory, so 
they can be replayed again without having to go through the expensive extraction process, using e.g.:
Once the dataloader has been run at least once, it will dump npz files into a directory,
so they can be replayed again without having to go through the expensive extraction process,
using e.g.:

    python main.py run-parts -x philly -f raps-output-5efefa3

Note: it is possible to run simulations for a user-defined length of time between 10/3/2017 and 12/15/2017.
Note: it is possible to run simulations for a user-defined length of time between
10/3/2017 and 12/15/2017.

"""
import os
import glob
import json

import csv
import pandas as pd
import json
import os
from datetime import datetime, timedelta, timezone

from datetime import datetime, timezone, timedelta
import pandas as pd
from tqdm import tqdm
from raps.job import job_dict, Job

from raps.job import Job, job_dict
from raps.utils import WorkloadData

# Timestamp layout used by the Philly trace CSV/JSON files (naive, no tz field).
DATE_FORMAT_STR = "%Y-%m-%d %H:%M:%S"
# Default one-day replay window; matches the example commands in the module docstring.
DEFAULT_START = "2017-10-03T00:00"
DEFAULT_END = "2017-10-04T00:00"


def to_epoch(ts_str):
    """Convert a timestamp string or int/float into epoch seconds."""
    if ts_str is None:
        return None
    if isinstance(ts_str, (int, float)):
@@ -102,6 +109,7 @@ def to_epoch(ts_str):
        dt = datetime.strptime(ts_str, DATE_FORMAT_STR)
    return int(dt.timestamp())


def parse_timestamp(val):
    """
    Convert Philly job log timestamps to datetime.
@@ -120,6 +128,7 @@ def parse_timestamp(val):
            return None
    return None


def load_traces_by_day(trace_dir, start_dt, end_dt, colname):
    """Load CPU or GPU traces between start_dt and end_dt."""
    traces = {}
@@ -136,7 +145,9 @@ def load_traces_by_day(trace_dir, start_dt, end_dt, colname):

            # Normalize time column (strip PST/PDT, parse datetime)
            df["time"] = df["time"].str.replace(" PST", "").str.replace(" PDT", "")
            df["time"] = pd.to_datetime(df["time"], errors="coerce", format=DATE_FORMAT_STR)
            df["time"] = pd.to_datetime(
                df["time"], errors="coerce", format=DATE_FORMAT_STR
            )

            # Convert util column to numeric (NA/invalid → NaN)
            df[colname] = pd.to_numeric(df[colname], errors="coerce")
@@ -151,13 +162,16 @@ def load_traces_by_day(trace_dir, start_dt, end_dt, colname):

    return traces


def parse_date(s):
    """Parse a Philly trace date string into a datetime object.

    Empty values and the literal string "None" map to None. Timezone
    labels ("PST"/"PDT") are stripped before parsing, since
    DATE_FORMAT_STR carries no timezone directive; the result is a
    naive datetime.
    """
    if not s:
        return None
    if s == "None":
        return None
    cleaned = s
    # Drop timezone suffixes the trace sometimes appends.
    for tz_label in (" PST", " PDT"):
        cleaned = cleaned.replace(tz_label, "")
    return datetime.strptime(cleaned, DATE_FORMAT_STR)


def load_data(files, **kwargs):
    """
    Load Philly trace into ExaDigiT Job objects.
@@ -178,7 +192,7 @@ def load_data(files, **kwargs):
    assert len(files) == 1, "Expecting a single directory path"
    trace_dir = files[0]
    gpu_trace_dir = os.path.join(files[0], "gpu_by_day")
    config = kwargs.get('config')
    config = kwargs.get("config")
    gpus_per_node = config.get("GPUS_PER_NODE")
    if gpus_per_node is None:
        raise ValueError("Must pass gpus_per_node (2 or 8)")
@@ -186,23 +200,23 @@ def load_data(files, **kwargs):
    # --- 1. Machine list ---
    machine_file = os.path.join(trace_dir, "cluster_machine_list")
    machines = {}
    with open(machine_file) as f:
    with open(machine_file, encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:
            mid = row["machineId"]
            machines[mid] = {
                "num_gpus": int(row[" number of GPUs"]),
                "gpu_mem": row[" single GPU mem"].strip()
                "gpu_mem": row[" single GPU mem"].strip(),
            }

    partition_machines = {
        mid: info for mid, info in machines.items()
        if info["num_gpus"] == gpus_per_node
        mid: info for mid, info in machines.items() if info["num_gpus"] == gpus_per_node
    }

    # Build node → index mapping for this partition
    node_mapping = {mid: idx for idx, mid in enumerate(sorted(partition_machines.keys()))}
    max_nodes = len(node_mapping)
    node_mapping = {
        mid: idx for idx, mid in enumerate(sorted(partition_machines.keys()))
    }

    # Assign partition ID (e.g. 0 for 2-GPU, 1 for 8-GPU)
    partition_id = 0 if gpus_per_node == 2 else 1
@@ -213,7 +227,7 @@ def load_data(files, **kwargs):

    # --- 4. Job log ---
    job_file = os.path.join(trace_dir, "cluster_job_log")
    with open(job_file) as f:
    with open(job_file, encoding="utf-8") as f:
        job_log = json.load(f)

    # --- First pass: filter jobs by date range ---
@@ -237,7 +251,9 @@ def load_data(files, **kwargs):
            attempts = raw.get("attempts", [])
            if attempts and "detail" in attempts[0]:
                # Count GPUs from the first detail
                gpus = sum(len(detail.get("gpus", [])) for detail in attempts[0]["detail"])
                gpus = sum(
                    len(detail.get("gpus", [])) for detail in attempts[0]["detail"]
                )
                if gpus > 0 and (gpus % gpus_per_node == 0):
                    filtered_log.append(raw)
        job_log = filtered_log
@@ -337,9 +353,9 @@ def load_data(files, **kwargs):
        else:
            job_cpu_df = pd.concat(cpu_dfs, ignore_index=True)
            mask = (
                (job_cpu_df["machineId"].isin(machine_ids)) &
                (job_cpu_df["time"] >= start) &
                (job_cpu_df["time"] <= end)
                (job_cpu_df["machineId"].isin(machine_ids))
                & (job_cpu_df["time"] >= start)
                & (job_cpu_df["time"] <= end)
            )
            job_cpu = job_cpu_df.loc[mask].copy()

@@ -361,29 +377,33 @@ def load_data(files, **kwargs):
        else:
            job_gpu_df = pd.concat(gpu_dfs, ignore_index=True)
            mask = (
                (job_gpu_df["machineId"].isin(machine_ids)) &
                (job_gpu_df["time"] >= start) &
                (job_gpu_df["time"] <= end)
                (job_gpu_df["machineId"].isin(machine_ids))
                & (job_gpu_df["time"] >= start)
                & (job_gpu_df["time"] <= end)
            )
            job_gpu = job_gpu_df.loc[mask].copy()

            if len(machine_ids) > 1:
                job_gpu = job_gpu.groupby("time")["gpu_util"].mean().reset_index()

            job_gpu_trace = (job_gpu["gpu_util"].to_numpy() * 0.01 * gpus_per_node).tolist()

            job_gpu_trace = (
                job_gpu["gpu_util"].to_numpy() * 0.01 * gpus_per_node
            ).tolist()

        if machine_ids:
            # Shift times relative to start_ts
            submit_time = submitted.timestamp() - start_ts if submitted else None
            start_time = start.timestamp() - start_ts if start else None
            end_time = end.timestamp() - start_ts if end else None
            submit_time = submitted.timestamp()
            start_time = start.timestamp()
            end_time = end.timestamp()

            if not submit_time or not start_time or not end_time:
                tqdm.write(f"skipped {jobid} b/c missing submit_time, start_time, or end_time")
                tqdm.write(
                    f"skipped {jobid} b/c missing submit_time, start_time, or end_time"
                )
                continue

            scheduled_nodes = [node_mapping[mid] for mid in machine_ids if mid in node_mapping]
            scheduled_nodes = [
                node_mapping[mid] for mid in machine_ids if mid in node_mapping
            ]

            if submit_time and start_time and end_time:

@@ -391,35 +411,25 @@ def load_data(files, **kwargs):
                    id=jobid,
                    name=f"philly-{jobid}",
                    account=user if user else "unknown",

                    nodes_required=len(machine_ids),
                    partition=partition_id,
                    priority=0,

                    cpu_cores_required=1,
                    gpu_units_required=gpus_per_node,

                    end_state=status,
                    scheduled_nodes=scheduled_nodes,

                    cpu_trace=job_cpu_trace,
                    gpu_trace=job_gpu_trace,
                    ntx_trace=None,
                    nrx_trace=None,

                    ntx_trace=[],
                    nrx_trace=[],
                    submit_time=submit_time,
                    start_time=start_time,
                    end_time=end_time,
                    #time_limit=end_time - start_time,
                    time_limit=end_time,
                    expected_run_time=wall_time if wall_time else 0,
                    current_run_time=0,
                    trace_time=None,
                    trace_start_time=0, #None,
                    trace_start_time=start_time,  # None,
                    trace_end_time=end_time,  # None,
                    trace_quanta=60,
                    trace_missing_values=False,
                    downscale=1
                    trace_missing_values=False
                )
                if job_cpu_trace and job_gpu_trace:
                    jobs_list.append(Job(job))
@@ -427,10 +437,11 @@ def load_data(files, **kwargs):
                    tqdm.write(f"skipping {job['id']} b/c either no cpu or gpu trace")

            if debug:
                tqdm.write(f"abs start time: {start_ts} rel job start: {job['start_time']} rel job end: {job['end_time']}")
                tqdm.write(f"{job['id']} start: {job['start_time']} end: {job['end_time']}")

    return WorkloadData(
        jobs=jobs_list,
        telemetry_start=0, telemetry_end=int(end_ts - start_ts),
        telemetry_start=start_ts,
        telemetry_end=end_ts,
        start_date=datetime.fromtimestamp(start_ts, timezone.utc),
    )