Verified commit 34b3e6f7, authored by Wes Brewer and committed by Jesse Hines
Browse files

Fix issues with the mit_supercloud dataloader; add experiment files for telemetry and synthetic workloads

Fix some issues with experiments/mit-replay-24hrs.yaml
parent 9f4d5b5a
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
# python main.py run-multi-part experiments/mit-replay-24hrs.yaml
partitions: ["mit_supercloud/part-cpu", "mit_supercloud/part-gpu"]
replay:
  - /opt/data/mit_supercloud/202201
start: 2021-05-21T00:00
end: 2021-05-22T00:00
+3 −0
Original line number Diff line number Diff line
system: mit_supercloud
# python main.py run-multi-part experiments/mit-synthetic.yaml
partitions: ["mit_supercloud/part-cpu", "mit_supercloud/part-gpu"]
replay:
  - /opt/data/mit_supercloud
start: 2021-05-21T13:00
end: 2021-05-21T14:00
workload: multitenant
+45 −13
Original line number Diff line number Diff line
@@ -116,9 +116,9 @@ import re
from tqdm import tqdm
from typing import Dict, Union, Optional
from collections import Counter
from datetime import datetime, timezone

from raps.job import job_dict, Job
from raps.utils import summarize_ranges, WorkloadData
from raps.utils import summarize_ranges, next_arrival
from .utils import proc_cpu_series, proc_gpu_series, to_epoch
from .utils import DEFAULT_START, DEFAULT_END

@@ -209,6 +209,8 @@ def load_data(local_dataset_path, **kwargs):
       jobs_list, sim_start_time, sim_end_time
    """
    debug = kwargs.get("debug")
    config = kwargs.get("config")
    arrival = kwargs.get("arrival")
    NL_PATH = os.path.dirname(__file__)

    skip_counts = Counter()
@@ -300,6 +302,17 @@ def load_data(local_dataset_path, **kwargs):
    cpu_only = (part == "part-cpu")
    mixed = (part == "part-gpu")

    # handle single-partition configs (e.g., mit_supercloud.yaml)
    if not cpu_only and not mixed:
        gpus_per_node = config.get("GPUS_PER_NODE")

        if gpus_per_node == 0:
            cpu_only = True
            part = "part-cpu"
        else:
            mixed = True
            part = "part-gpu"

    # create nodelist mapping
    if cpu_only:
        with open(os.path.join(NL_PATH, "cpu_nodes.txt")) as f:
@@ -516,7 +529,6 @@ def load_data(local_dataset_path, **kwargs):
    jobs_list = []

    # Get CPUS_PER_NODE and GPUS_PER_NODE from config
    config = kwargs.get('config', {})
    cpus_per_node = config.get('CPUS_PER_NODE')
    cores_per_cpu = config.get('CORES_PER_CPU')
    # gpus_per_node = config.get('GPUS_PER_NODE')  # Unused
@@ -573,7 +585,21 @@ def load_data(local_dataset_path, **kwargs):
        cpu_peak = cpu_cores_req / cores_per_cpu / cpus_per_node  # Is this per CPU?
        cpu_tr = [min(x/cores_per_cpu/cpus_per_node, cpu_peak) for x in cpu_tr]

        submit_time = rec.get("time_submit", t0) - start_ts
        if arrival == "poisson":
            job_arrival_time = config.get("JOB_ARRIVAL_TIME")
            submit_time = next_arrival(1 / job_arrival_time)
            start_time = submit_time
            end_time = None
            scheduled_nodes = None
            telemetry_start = 0
            telemetry_end = 86640
        else:  # replay
            start_time = t0 - start_ts
            end_time = t1 - start_ts
            submit_time = rec.get("time_submit") - start_ts
            scheduled_nodes = rec.get("scheduled_nodes")
            telemetry_start = int(sl.time_start.min())
            telemetry_end = int(sl.time_end.max())

        current_job_dict = job_dict(
            nodes_required=nr,
@@ -587,12 +613,12 @@ def load_data(local_dataset_path, **kwargs):
            nrx_trace=[],
            end_state=rec.get("state_end", "unknown"),
            id=jid,
            scheduled_nodes=rec.get("scheduled_nodes"),
            scheduled_nodes=scheduled_nodes,
            priority=rec.get("priority", 0),
            submit_time=submit_time,
            time_limit=rec.get("time_limit", 0),
            start_time=t0 - start_ts,
            end_time=t1 - start_ts,
            time_limit=rec.get("timelimit") * 60,
            start_time=start_time,
            end_time=end_time,
            expected_run_time=max(0, t1-t0),
            trace_time=len(cpu_tr)*quanta,
            trace_start_time=0,
@@ -602,12 +628,18 @@ def load_data(local_dataset_path, **kwargs):
        job = Job(current_job_dict)
        jobs_list.append(job)

    # Calculate min_overall_utime and max_overall_utime
    # min_overall_utime = int(sl.time_submit.min())
    # max_overall_utime = int(sl.time_submit.max())

    # args_namespace = SimpleNamespace(
    #    fastforward=min_overall_utime,
    #    system='mit_supercloud',
    #    time=max_overall_utime
    # )

    print("\nSkipped jobs summary:")
    for reason, count in skip_counts.items():
        print(f"- {reason}: {count}")

    return WorkloadData(
        jobs=jobs_list,
        telemetry_start=0, telemetry_end=int(end_ts - start_ts),
        start_date=datetime.fromtimestamp(start_ts, timezone.utc),
    )
    return jobs_list, telemetry_start, telemetry_end  # min_overall_utime, max_overall_utime, args_namespace