From c91a819000611981d90b8dd50d1b5e9c2af4e9e2 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Wed, 17 Sep 2025 23:20:07 -0400 Subject: [PATCH 01/26] Initial config and dataloader for philly traces (dataloader not yet working) --- README.md | 8 +++ config/philly/2-gpu.yaml | 51 ++++++++++++++++ config/philly/8-gpu.yaml | 51 ++++++++++++++++ raps/dataloaders/philly.py | 116 +++++++++++++++++++++++++++++++++++++ 4 files changed, 226 insertions(+) create mode 100644 config/philly/2-gpu.yaml create mode 100644 config/philly/8-gpu.yaml create mode 100644 raps/dataloaders/philly.py diff --git a/README.md b/README.md index 9c708af..92c68e0 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,14 @@ For MIT Supercloud # Reinforcement learning test case raps train-rl --system mit_supercloud/part-cpu -f /opt/data/mit_supercloud/202201 +Microsoft Azure - 2017 Philly Traces + + # Synthetic + python main.py run-parts -x philly -w multitenant + + # Telemetry replay + python main.py run-parts -x philly -f /opt/data/philly/trace-data + For Lumi # Synthetic test for Lumi: diff --git a/config/philly/2-gpu.yaml b/config/philly/2-gpu.yaml new file mode 100644 index 0000000..0622605 --- /dev/null +++ b/config/philly/2-gpu.yaml @@ -0,0 +1,51 @@ +system: + num_cdus: 1 + racks_per_cdu: 1 + nodes_per_rack: 321 + chassis_per_rack: 3 + nodes_per_blade: 2 + switches_per_chassis: 4 + nics_per_node: 4 + rectifiers_per_chassis: 4 + nodes_per_rectifier: 4 + missing_racks: [] + down_nodes: [] + cpus_per_node: 2 + cores_per_cpu: 20 + gpus_per_node: 2 + cpu_peak_flops: 1248000000000.0 + gpu_peak_flops: 7800000000000.0 + cpu_fp_ratio: 0.667 + gpu_fp_ratio: 0.667 +power: + power_gpu_idle: 75 + power_gpu_max: 300 + power_cpu_idle: 90 + power_cpu_max: 280 + power_mem: 74.26 + power_nvme: 30 + power_nic: 20 + power_cdu: 8473.47 + power_switch: 250 + power_update_freq: 15 + rectifier_peak_threshold: 13670 + sivoc_loss_constant: 13 + sivoc_efficiency: 0.98 + rectifier_loss_constant: 17 + rectifier_efficiency: 0.96 + power_cost: 0.094 +scheduler: + multitenant: true + job_arrival_time: 900 + mtbf: 11 + trace_quanta: 20 + min_wall_time: 3600 + max_wall_time: 43200 + ui_update_freq: 900 + max_nodes_per_job: 192 + job_end_probs: + COMPLETED: 0.63 + FAILED: 0.13 + CANCELLED: 0.12 + TIMEOUT: 0.11 + NODE_FAIL: 0.01 diff --git a/config/philly/8-gpu.yaml b/config/philly/8-gpu.yaml new file mode 100644 index 0000000..aae80ee --- /dev/null +++ b/config/philly/8-gpu.yaml @@ -0,0 +1,51 @@ +system: + num_cdus: 1 + racks_per_cdu: 1 + nodes_per_rack: 231 + chassis_per_rack: 3 + nodes_per_blade: 2 + switches_per_chassis: 4 + nics_per_node: 4 + rectifiers_per_chassis: 4 + nodes_per_rectifier: 4 + missing_racks: [] + down_nodes: [] + cpus_per_node: 2 + cores_per_cpu: 20 + gpus_per_node: 8 + cpu_peak_flops: 1248000000000.0 + gpu_peak_flops: 7800000000000.0 + cpu_fp_ratio: 0.667 + gpu_fp_ratio: 0.667 +power: + power_gpu_idle: 75 + power_gpu_max: 300 + power_cpu_idle: 90 + power_cpu_max: 280 + power_mem: 74.26 + power_nvme: 30 + power_nic: 20 + power_cdu: 8473.47 + power_switch: 250 + power_update_freq: 15 + rectifier_peak_threshold: 13670 + sivoc_loss_constant: 13 + sivoc_efficiency: 0.98 + rectifier_loss_constant: 17 + rectifier_efficiency: 0.96 + power_cost: 0.094 +scheduler: + multitenant: true + job_arrival_time: 900 + mtbf: 11 + trace_quanta: 20 + min_wall_time: 3600 + max_wall_time: 43200 + ui_update_freq: 900 + max_nodes_per_job: 192 + job_end_probs: + COMPLETED: 0.63 + FAILED: 0.13 + CANCELLED: 0.12 + TIMEOUT: 0.11 + NODE_FAIL: 0.01 diff --git 
a/raps/dataloaders/philly.py b/raps/dataloaders/philly.py new file mode 100644 index 0000000..08c02e1 --- /dev/null +++ b/raps/dataloaders/philly.py @@ -0,0 +1,116 @@ +import os +import json +import csv +import datetime +import pandas as pd +import warnings +from raps.job import Job + +DATE_FORMAT_STR = "%Y-%m-%d %H:%M:%S" + +def parse_date(s): + if not s or s == "None": + return None + # strip possible timezone labels like "PST"/"PDT" + s = s.replace(" PST", "").replace(" PDT", "") + return datetime.datetime.strptime(s, DATE_FORMAT_STR) + +def load_data(files, **kwargs): + """ + Load Philly trace into ExaDigiT Job objects. + + Args: + files (list[str]): A list with one directory path (e.g., ['/opt/data/philly/trace-data']). + + Returns: + list[Job] + """ + assert len(files) == 1, "Expecting a single directory path" + trace_dir = files[0] + + # --- 1. Machine list --- + machine_file = os.path.join(trace_dir, "cluster_machine_list") + machines = {} + with open(machine_file) as f: + reader = csv.DictReader(f) + for row in reader: + mid = row["machineId"] + machines[mid] = { + "num_gpus": int(row[" number of GPUs"]), + "gpu_mem": row[" single GPU mem"].strip() + } + + # --- 2. CPU util --- + cpu_file = os.path.join(trace_dir, "cluster_cpu_util") + cpu_util = pd.read_csv(cpu_file) + # cpu_util has columns: time, machine_id, cpu_util + + # --- 3. GPU util --- + gpu_file = os.path.join(trace_dir, "cluster_gpu_util") + + with warnings.catch_warnings(record=True) as wlist: + gpu_util = pd.read_csv( + gpu_file, + engine="python", + on_bad_lines="skip" + ) + + if wlist: + warnings.warn( + f"cluster_gpu_util: skipped {len(wlist)} malformed lines while reading {gpu_file}", + UserWarning + ) + + # --- 4. Job log --- + job_file = os.path.join(trace_dir, "cluster_job_log") + with open(job_file) as f: + job_log = json.load(f) + + jobs = [] + for raw in job_log: + jobid = raw.get("jobid") + user = raw.get("user") + status = raw.get("status") + submitted = parse_date(raw.get("submitted_time")) + + attempts = raw.get("attempts", []) + start, end = None, None + if attempts: + start = parse_date(attempts[0].get("start_time")) + end = parse_date(attempts[-1].get("end_time")) + + wall_time = None + if start and end: + wall_time = (end - start).total_seconds() + + # Which machines did this job run on? 
+ machine_ids = [] + gpus = 0 + if attempts and "detail" in attempts[0]: + for detail in attempts[0]["detail"]: + mid = detail["ip"] + machine_ids.append(mid) + gpus += len(detail.get("gpus", [])) + + # Collect utilization traces for each machine this job touched + job_cpu = cpu_util[cpu_util["machine_id"].isin(machine_ids)] + job_gpu = gpu_util[gpu_util["machineId"].isin(machine_ids)] + + job = Job( + job_id=jobid, + name=f"philly-{jobid}", + user=user, + nodes_required=len(machine_ids) if machine_ids else None, + wall_time=wall_time, + start_time=start, + end_time=end, + queue_time=submitted, + scheduled_nodes=machine_ids, + cpu_trace=job_cpu if not job_cpu.empty else None, + gpu_trace=job_gpu if not job_gpu.empty else None, + priority=None, + end_state=status + ) + jobs.append(job) + + return jobs -- GitLab From f5b9fc4124c8eb29390ee55ccd26789a9469a092 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Thu, 18 Sep 2025 12:09:31 -0400 Subject: [PATCH 02/26] Add in all the args for job_dict call --- raps/dataloaders/philly.py | 64 ++++++++++++++++++++++++++++---------- 1 file changed, 47 insertions(+), 17 deletions(-) diff --git a/raps/dataloaders/philly.py b/raps/dataloaders/philly.py index 08c02e1..1601eee 100644 --- a/raps/dataloaders/philly.py +++ b/raps/dataloaders/philly.py @@ -4,7 +4,7 @@ import csv import datetime import pandas as pd import warnings -from raps.job import Job +from raps.job import job_dict, Job DATE_FORMAT_STR = "%Y-%m-%d %H:%M:%S" @@ -96,21 +96,51 @@ def load_data(files, **kwargs): job_cpu = cpu_util[cpu_util["machine_id"].isin(machine_ids)] job_gpu = gpu_util[gpu_util["machineId"].isin(machine_ids)] - job = Job( - job_id=jobid, - name=f"philly-{jobid}", - user=user, - nodes_required=len(machine_ids) if machine_ids else None, - wall_time=wall_time, - start_time=start, - end_time=end, - queue_time=submitted, - scheduled_nodes=machine_ids, - cpu_trace=job_cpu if not job_cpu.empty else None, - gpu_trace=job_gpu if not job_gpu.empty else None, - priority=None, - end_state=status - ) - jobs.append(job) + print("***", len(machine_ids), machine_ids) + + if machine_ids: + + job = job_dict( + # Core identity + id=jobid, + name=f"philly-{jobid}", + account=user if user else "unknown", # Philly log has user + + # Partition & priority + nodes_required=len(machine_ids) if machine_ids else 0, + partition=0, + priority=0, + + # Resource requests + cpu_cores_required=0, # Philly logs don’t track cores + gpu_units_required=gpus, # we can count GPUs from attempts + allocated_cpu_cores=0, + allocated_gpu_units=gpus, + + # State + end_state=status, + + # Traces + cpu_trace=job_cpu if not job_cpu.empty else None, + gpu_trace=job_gpu if not job_gpu.empty else None, + ntx_trace=None, + nrx_trace=None, + + # Timing + submit_time=submitted.timestamp() if submitted else 0, + start_time=start.timestamp() if start else 0, + end_time=end.timestamp() if end else 0, + time_limit=0, + expected_run_time=wall_time if wall_time else 0, + current_run_time=0, + trace_time=None, + trace_start_time=None, + trace_end_time=None, + trace_quanta=None, + trace_missing_values=False, + downscale=1 + ) + print(job) + jobs.append(Job(job)) return jobs -- GitLab From 55d02cebaa08179aa73a7ff4554b04e10c6b6c5d Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Thu, 18 Sep 2025 12:28:54 -0400 Subject: [PATCH 03/26] Shift jobs relative to earliest submitted time --- raps/dataloaders/philly.py | 144 +++++++++++++++++++++++-------------- 1 file changed, 92 insertions(+), 52 deletions(-) diff --git 
a/raps/dataloaders/philly.py b/raps/dataloaders/philly.py index 1601eee..399c546 100644 --- a/raps/dataloaders/philly.py +++ b/raps/dataloaders/philly.py @@ -4,6 +4,7 @@ import csv import datetime import pandas as pd import warnings +from tqdm import tqdm from raps.job import job_dict, Job DATE_FORMAT_STR = "%Y-%m-%d %H:%M:%S" @@ -66,81 +67,120 @@ def load_data(files, **kwargs): with open(job_file) as f: job_log = json.load(f) - jobs = [] + # --- First pass: find earliest submit time --- + earliest_submit = None for raw in job_log: + submitted = raw.get("submitted_time") + if submitted is None or submitted == "None": + continue + + # Philly uses either string dates or epoch ints + if isinstance(submitted, (int, float)): + t = int(submitted) + else: + t = parse_date(submitted).timestamp() + + if earliest_submit is None or t < earliest_submit: + earliest_submit = t + + if earliest_submit is None: + raise ValueError("No valid submitted_time found in Philly traces") + + # --- Second pass: build jobs --- + jobs = [] + for raw in tqdm(job_log, desc="Building Job objects"): jobid = raw.get("jobid") user = raw.get("user") status = raw.get("status") - submitted = parse_date(raw.get("submitted_time")) + + # Submitted time + submitted = raw.get("submitted_time") + if isinstance(submitted, (int, float)): + submitted = datetime.datetime.fromtimestamp(int(submitted)) + else: + submitted = parse_date(submitted) attempts = raw.get("attempts", []) start, end = None, None if attempts: - start = parse_date(attempts[0].get("start_time")) - end = parse_date(attempts[-1].get("end_time")) + st = attempts[0].get("start_time") + et = attempts[-1].get("end_time") + + if isinstance(st, (int, float)): + start = datetime.datetime.fromtimestamp(int(st)) + elif st: + start = parse_date(st) + + if isinstance(et, (int, float)): + end = datetime.datetime.fromtimestamp(int(et)) + elif et: + end = parse_date(et) wall_time = None if start and end: wall_time = (end - start).total_seconds() # Which machines did this job run on? 
- machine_ids = [] - gpus = 0 + machine_ids, gpus = [], 0 if attempts and "detail" in attempts[0]: for detail in attempts[0]["detail"]: mid = detail["ip"] machine_ids.append(mid) gpus += len(detail.get("gpus", [])) - # Collect utilization traces for each machine this job touched + # Collect utilization traces job_cpu = cpu_util[cpu_util["machine_id"].isin(machine_ids)] job_gpu = gpu_util[gpu_util["machineId"].isin(machine_ids)] - print("***", len(machine_ids), machine_ids) - if machine_ids: - - job = job_dict( - # Core identity - id=jobid, - name=f"philly-{jobid}", - account=user if user else "unknown", # Philly log has user - - # Partition & priority - nodes_required=len(machine_ids) if machine_ids else 0, - partition=0, - priority=0, - - # Resource requests - cpu_cores_required=0, # Philly logs don’t track cores - gpu_units_required=gpus, # we can count GPUs from attempts - allocated_cpu_cores=0, - allocated_gpu_units=gpus, - - # State - end_state=status, - - # Traces - cpu_trace=job_cpu if not job_cpu.empty else None, - gpu_trace=job_gpu if not job_gpu.empty else None, - ntx_trace=None, - nrx_trace=None, - - # Timing - submit_time=submitted.timestamp() if submitted else 0, - start_time=start.timestamp() if start else 0, - end_time=end.timestamp() if end else 0, - time_limit=0, - expected_run_time=wall_time if wall_time else 0, - current_run_time=0, - trace_time=None, - trace_start_time=None, - trace_end_time=None, - trace_quanta=None, - trace_missing_values=False, - downscale=1 - ) - print(job) - jobs.append(Job(job)) + # Shift times relative to earliest_submit + submit_time = submitted.timestamp() - earliest_submit if submitted else None + start_time = start.timestamp() - earliest_submit if start else None + end_time = end.timestamp() - earliest_submit if end else None + + if not submit_time or not start_time or not end_time: + warnings.warn( + f"skipped {jobid} b/c missing submit_time, start_time, or end_time", + UserWarning + ) + + + if submit_time and start_time and end_time: + + job = job_dict( + id=jobid, + name=f"philly-{jobid}", + account=user if user else "unknown", + + nodes_required=len(machine_ids), + partition=0, + priority=0, + + cpu_cores_required=0, + gpu_units_required=gpus, + allocated_cpu_cores=0, + allocated_gpu_units=gpus, + + end_state=status, + + cpu_trace=job_cpu if not job_cpu.empty else None, + gpu_trace=job_gpu if not job_gpu.empty else None, + ntx_trace=None, + nrx_trace=None, + + submit_time=submit_time, + start_time=start_time, + end_time=end_time, + time_limit=0, + expected_run_time=wall_time if wall_time else 0, + current_run_time=0, + trace_time=None, + trace_start_time=None, + trace_end_time=None, + trace_quanta=None, + trace_missing_values=False, + downscale=1 + ) + jobs.append(Job(job)) return jobs -- GitLab From a3a9a9d843d469c0ec4b58826d2edd3d10ca96d8 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Thu, 18 Sep 2025 16:16:15 -0400 Subject: [PATCH 04/26] Work on getting cpu and gpu traces working correctly... 
but not yet --- raps/dataloaders/philly.py | 107 +++++++++++++++++++++++++++++-------- 1 file changed, 85 insertions(+), 22 deletions(-) diff --git a/raps/dataloaders/philly.py b/raps/dataloaders/philly.py index 399c546..0e9eaa8 100644 --- a/raps/dataloaders/philly.py +++ b/raps/dataloaders/philly.py @@ -1,11 +1,13 @@ import os import json import csv -import datetime import pandas as pd import warnings + +from datetime import datetime, timezone from tqdm import tqdm from raps.job import job_dict, Job +from raps.utils import WorkloadData DATE_FORMAT_STR = "%Y-%m-%d %H:%M:%S" @@ -14,7 +16,7 @@ def parse_date(s): return None # strip possible timezone labels like "PST"/"PDT" s = s.replace(" PST", "").replace(" PDT", "") - return datetime.datetime.strptime(s, DATE_FORMAT_STR) + return datetime.strptime(s, DATE_FORMAT_STR) def load_data(files, **kwargs): """ @@ -41,10 +43,17 @@ def load_data(files, **kwargs): "gpu_mem": row[" single GPU mem"].strip() } + # build node → index mapping + node_mapping = {mid: idx for idx, mid in enumerate(sorted(machines.keys()))} + # --- 2. CPU util --- cpu_file = os.path.join(trace_dir, "cluster_cpu_util") cpu_util = pd.read_csv(cpu_file) # cpu_util has columns: time, machine_id, cpu_util + cpu_util["time"] = cpu_util["time"].str.replace(" PST","").str.replace(" PDT","") + #cpu_util["time"] = pd.to_datetime(cpu_util["time"], format="%Y-%m-%d %H:%M:%S") + # now cpu_util has: time (datetime), machine_id, cpu_util + cpu_util["time"] = cpu_util["time"].apply(parse_date) # --- 3. GPU util --- gpu_file = os.path.join(trace_dir, "cluster_gpu_util") @@ -55,20 +64,33 @@ def load_data(files, **kwargs): engine="python", on_bad_lines="skip" ) - if wlist: warnings.warn( f"cluster_gpu_util: skipped {len(wlist)} malformed lines while reading {gpu_file}", UserWarning ) + # Convert time to datetime + gpu_util["time"] = pd.to_datetime(gpu_util["time"], errors="coerce").dt.tz_localize(None) + + # Identify GPU columns + gpu_cols = [c for c in gpu_util.columns if c.startswith("gpu")] + + # Collapse per row: sum all GPU utilizations and divide by 100 + gpu_util["gpu_util"] = gpu_util[gpu_cols].sum(axis=1) / 100.0 + + # Keep only collapsed util plus metadata + gpu_util = gpu_util[["time", "machineId", "gpu_util"]] + + print("Sample GPU util after preprocess:", gpu_util.head()) + # --- 4. 
Job log --- job_file = os.path.join(trace_dir, "cluster_job_log") with open(job_file) as f: job_log = json.load(f) # --- First pass: find earliest submit time --- - earliest_submit = None + start_ts = None for raw in job_log: submitted = raw.get("submitted_time") if submitted is None or submitted == "None": @@ -80,14 +102,15 @@ def load_data(files, **kwargs): else: t = parse_date(submitted).timestamp() - if earliest_submit is None or t < earliest_submit: - earliest_submit = t + if start_ts is None or t < start_ts: + start_ts = t - if earliest_submit is None: + if start_ts is None: raise ValueError("No valid submitted_time found in Philly traces") + # --- Second pass: build jobs --- - jobs = [] + jobs_list = [] for raw in tqdm(job_log, desc="Building Job objects"): jobid = raw.get("jobid") user = raw.get("user") @@ -96,7 +119,7 @@ def load_data(files, **kwargs): # Submitted time submitted = raw.get("submitted_time") if isinstance(submitted, (int, float)): - submitted = datetime.datetime.fromtimestamp(int(submitted)) + submitted = datetime.fromtimestamp(int(submitted)) else: submitted = parse_date(submitted) @@ -107,12 +130,12 @@ def load_data(files, **kwargs): et = attempts[-1].get("end_time") if isinstance(st, (int, float)): - start = datetime.datetime.fromtimestamp(int(st)) + start = datetime.fromtimestamp(int(st)) elif st: start = parse_date(st) if isinstance(et, (int, float)): - end = datetime.datetime.fromtimestamp(int(et)) + end = datetime.fromtimestamp(int(et)) elif et: end = parse_date(et) @@ -128,15 +151,44 @@ def load_data(files, **kwargs): machine_ids.append(mid) gpus += len(detail.get("gpus", [])) - # Collect utilization traces - job_cpu = cpu_util[cpu_util["machine_id"].isin(machine_ids)] - job_gpu = gpu_util[gpu_util["machineId"].isin(machine_ids)] + # CPU utilization traces + if machine_ids and start and end: + mask = ( + cpu_util["machine_id"].isin(machine_ids) & + (cpu_util["time"] >= start) & + (cpu_util["time"] <= end) + ) + job_cpu = cpu_util.loc[mask].copy() + + # Aggregate across machines if >1 machine + if len(machine_ids) > 1: + job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index() + + print("Job", jobid) + print("machine_ids from job:", machine_ids[:5]) + print("gpu_util machineId sample:", gpu_util["machineId"].unique()[:5]) + print("start, end:", start, end) + print("gpu_util time range:", gpu_util["time"].min(), gpu_util["time"].max()) + + # GPU utilization traces + job_gpu = None + if machine_ids and start and end: + mask = ( + gpu_util["machineId"].isin(machine_ids) & + (gpu_util["time"] >= start) & + (gpu_util["time"] <= end) + ) + job_gpu = gpu_util.loc[mask].copy() + + # Aggregate across machines if >1 machine + if len(machine_ids) > 1: + job_gpu = job_gpu.groupby("time")["gpu_util"].sum().reset_index() if machine_ids: - # Shift times relative to earliest_submit - submit_time = submitted.timestamp() - earliest_submit if submitted else None - start_time = start.timestamp() - earliest_submit if start else None - end_time = end.timestamp() - earliest_submit if end else None + # Shift times relative to start_ts + submit_time = submitted.timestamp() - start_ts if submitted else None + start_time = start.timestamp() - start_ts if start else None + end_time = end.timestamp() - start_ts if end else None if not submit_time or not start_time or not end_time: warnings.warn( @@ -144,6 +196,7 @@ def load_data(files, **kwargs): UserWarning ) + scheduled_nodes = [node_mapping[mid] for mid in machine_ids if mid in node_mapping] if submit_time and start_time and 
end_time: @@ -162,9 +215,10 @@ def load_data(files, **kwargs): allocated_gpu_units=gpus, end_state=status, + scheduled_nodes=scheduled_nodes, - cpu_trace=job_cpu if not job_cpu.empty else None, - gpu_trace=job_gpu if not job_gpu.empty else None, + cpu_trace=job_cpu, + gpu_trace=job_gpu, ntx_trace=None, nrx_trace=None, @@ -181,6 +235,15 @@ def load_data(files, **kwargs): trace_missing_values=False, downscale=1 ) - jobs.append(Job(job)) + jobs_list.append(Job(job)) + + print(job) + + # Find max end timestamp across jobs + end_ts = max(j.end_time for j in jobs_list if j.end_time is not None) - return jobs + return WorkloadData( + jobs=jobs_list, + telemetry_start=0, telemetry_end=int(end_ts - start_ts), + start_date=datetime.fromtimestamp(start_ts, timezone.utc), + ) -- GitLab From de2aba18e8ebbc755d192bc93cc56df27adc11d1 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Thu, 18 Sep 2025 23:32:14 -0400 Subject: [PATCH 05/26] Fix some issues so that both partitions work --- raps/dataloaders/philly.py | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/raps/dataloaders/philly.py b/raps/dataloaders/philly.py index 0e9eaa8..789633f 100644 --- a/raps/dataloaders/philly.py +++ b/raps/dataloaders/philly.py @@ -30,6 +30,10 @@ def load_data(files, **kwargs): """ assert len(files) == 1, "Expecting a single directory path" trace_dir = files[0] + config = kwargs.get('config') + gpus_per_node = config.get("GPUS_PER_NODE") + if gpus_per_node is None: + raise ValueError("Must pass gpus_per_node (2 or 8)") # --- 1. Machine list --- machine_file = os.path.join(trace_dir, "cluster_machine_list") @@ -43,8 +47,17 @@ def load_data(files, **kwargs): "gpu_mem": row[" single GPU mem"].strip() } - # build node → index mapping - node_mapping = {mid: idx for idx, mid in enumerate(sorted(machines.keys()))} + partition_machines = { + mid: info for mid, info in machines.items() + if info["num_gpus"] == gpus_per_node + } + + # Build node → index mapping for this partition + node_mapping = {mid: idx for idx, mid in enumerate(sorted(partition_machines.keys()))} + max_nodes = len(node_mapping) + + # Assign partition ID (e.g. 0 for 2-GPU, 1 for 8-GPU) + partition_id = 0 if gpus_per_node == 2 else 1 # --- 2. CPU util --- cpu_file = os.path.join(trace_dir, "cluster_cpu_util") @@ -82,7 +95,7 @@ def load_data(files, **kwargs): # Keep only collapsed util plus metadata gpu_util = gpu_util[["time", "machineId", "gpu_util"]] - print("Sample GPU util after preprocess:", gpu_util.head()) + #print("Sample GPU util after preprocess:", gpu_util.head()) # --- 4. 
Job log --- job_file = os.path.join(trace_dir, "cluster_job_log") @@ -108,10 +121,9 @@ def load_data(files, **kwargs): if start_ts is None: raise ValueError("No valid submitted_time found in Philly traces") - # --- Second pass: build jobs --- jobs_list = [] - for raw in tqdm(job_log, desc="Building Job objects"): + for raw in tqdm(job_log[:1000], desc="Building Job objects"): jobid = raw.get("jobid") user = raw.get("user") status = raw.get("status") @@ -164,11 +176,11 @@ def load_data(files, **kwargs): if len(machine_ids) > 1: job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index() - print("Job", jobid) - print("machine_ids from job:", machine_ids[:5]) - print("gpu_util machineId sample:", gpu_util["machineId"].unique()[:5]) - print("start, end:", start, end) - print("gpu_util time range:", gpu_util["time"].min(), gpu_util["time"].max()) + #print("Job", jobid) + #print("machine_ids from job:", machine_ids[:5]) + #print("gpu_util machineId sample:", gpu_util["machineId"].unique()[:5]) + #print("start, end:", start, end) + #print("gpu_util time range:", gpu_util["time"].min(), gpu_util["time"].max()) # GPU utilization traces job_gpu = None @@ -206,7 +218,7 @@ def load_data(files, **kwargs): account=user if user else "unknown", nodes_required=len(machine_ids), - partition=0, + partition=partition_id, priority=0, cpu_cores_required=0, -- GitLab From 90fe862830dfbff162b33a37698866e3335ab9eb Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Mon, 29 Sep 2025 01:49:16 +0300 Subject: [PATCH 06/26] Work towards support for reading philly traces that are already parsed into daily files --- raps/dataloaders/philly.py | 168 ++++++++++++++++++++++++++----------- 1 file changed, 120 insertions(+), 48 deletions(-) diff --git a/raps/dataloaders/philly.py b/raps/dataloaders/philly.py index 789633f..aa1a8ae 100644 --- a/raps/dataloaders/philly.py +++ b/raps/dataloaders/philly.py @@ -1,15 +1,98 @@ import os +import glob import json import csv import pandas as pd import warnings -from datetime import datetime, timezone +from datetime import datetime, timezone, timedelta from tqdm import tqdm from raps.job import job_dict, Job from raps.utils import WorkloadData DATE_FORMAT_STR = "%Y-%m-%d %H:%M:%S" +DEFAULT_START = "2017-10-03T00:00" +DEFAULT_END = "2017-10-04T00:00" + +def to_epoch(ts_str): + if ts_str is None: + return None + if isinstance(ts_str, (int, float)): + return int(ts_str) + if "T" in ts_str: + dt = datetime.fromisoformat(ts_str) + else: + dt = datetime.strptime(ts_str, DATE_FORMAT_STR) + return int(dt.timestamp()) + +def parse_timestamp(val): + """ + Convert Philly job log timestamps to datetime. + Handles integers (epoch) and strings with PST/PDT. + Returns datetime or None. + """ + if val is None or val == "None": + return None + if isinstance(val, (int, float)): + return datetime.fromtimestamp(int(val), tz=timezone.utc).replace(tzinfo=None) + if isinstance(val, str): + val = val.replace(" PST", "").replace(" PDT", "") + try: + return datetime.strptime(val, DATE_FORMAT_STR).replace(tzinfo=None) + except ValueError: + return None + return None + +def load_gpu_traces_by_dayXX(gpu_trace_dir, machine_ids, job_start_dt, job_end_dt): + """ + Load GPU utilization for specific machines and time range, + using preprocessed per-day CSVs (gpu_by_day/). 
+ """ + dfs = [] + current = job_start_dt.date() + while current <= job_end_dt.date(): + day_file = os.path.join(gpu_trace_dir, f"{current}.csv") + if os.path.exists(day_file): + df = pd.read_csv( + day_file, + names=["time", "machineId", "gpu_util"], + parse_dates=["time"], + on_bad_lines="skip" + ) + df = df[df["machineId"].isin(machine_ids)] + df = df[(df["time"] >= job_start_dt) & (df["time"] <= job_end_dt)] + if not df.empty: + dfs.append(df) + current += timedelta(days=1) + + if dfs: + return pd.concat(dfs, ignore_index=True) + return pd.DataFrame(columns=["time", "machineId", "gpu_util"]) + + +def load_gpu_traces_by_day(trace_dir, start_dt, end_dt): + """Load GPU traces only for the days between start_dt and end_dt.""" + gpu_dir = os.path.join(trace_dir, "dist/gpu_by_day") + frames = [] + + current = start_dt.date() + while current <= end_dt.date(): + daily_file = os.path.join(gpu_dir, f"{current}.csv") + if os.path.exists(daily_file): + df = pd.read_csv( + daily_file, + names=["time", "machineId", "gpu_util"], # no header in daily CSVs + parse_dates=["time"] + ) + frames.append(df) + else: + print(f"⚠ No trace file for {current}") + current += timedelta(days=1) + + if not frames: + return pd.DataFrame(columns=["time", "machineId", "gpu_util"]) + + return pd.concat(frames, ignore_index=True) def parse_date(s): if not s or s == "None": @@ -28,8 +111,12 @@ def load_data(files, **kwargs): Returns: list[Job] """ + # extract --start from kwargs + start_ts = to_epoch(kwargs.get("start", DEFAULT_START)) + end_ts = to_epoch(kwargs.get("end", DEFAULT_END)) assert len(files) == 1, "Expecting a single directory path" trace_dir = files[0] + gpu_trace_dir = os.path.join(files[0], "dist", "gpu_by_day") config = kwargs.get('config') gpus_per_node = config.get("GPUS_PER_NODE") if gpus_per_node is None: @@ -62,46 +149,36 @@ def load_data(files, **kwargs): # --- 2. CPU util --- cpu_file = os.path.join(trace_dir, "cluster_cpu_util") cpu_util = pd.read_csv(cpu_file) - # cpu_util has columns: time, machine_id, cpu_util cpu_util["time"] = cpu_util["time"].str.replace(" PST","").str.replace(" PDT","") - #cpu_util["time"] = pd.to_datetime(cpu_util["time"], format="%Y-%m-%d %H:%M:%S") - # now cpu_util has: time (datetime), machine_id, cpu_util cpu_util["time"] = cpu_util["time"].apply(parse_date) # --- 3. GPU util --- - gpu_file = os.path.join(trace_dir, "cluster_gpu_util") + start_dt = datetime.fromtimestamp(start_ts, tz=timezone.utc) + end_dt = datetime.fromtimestamp(end_ts, tz=timezone.utc) - with warnings.catch_warnings(record=True) as wlist: - gpu_util = pd.read_csv( - gpu_file, - engine="python", - on_bad_lines="skip" - ) - if wlist: - warnings.warn( - f"cluster_gpu_util: skipped {len(wlist)} malformed lines while reading {gpu_file}", - UserWarning - ) - - # Convert time to datetime - gpu_util["time"] = pd.to_datetime(gpu_util["time"], errors="coerce").dt.tz_localize(None) - - # Identify GPU columns - gpu_cols = [c for c in gpu_util.columns if c.startswith("gpu")] + gpu_trace_dir = os.path.join(trace_dir, "dist", "gpu_by_day") - # Collapse per row: sum all GPU utilizations and divide by 100 - gpu_util["gpu_util"] = gpu_util[gpu_cols].sum(axis=1) / 100.0 - - # Keep only collapsed util plus metadata - gpu_util = gpu_util[["time", "machineId", "gpu_util"]] - - #print("Sample GPU util after preprocess:", gpu_util.head()) + # For each job: + gpu_trace = load_gpu_traces_by_day(gpu_trace_dir, start_dt, end_dt) + job_gpu = load_gpu_traces_by_day(gpu_trace_dir, start_dt, end_dt) # --- 4. 
Job log --- job_file = os.path.join(trace_dir, "cluster_job_log") with open(job_file) as f: job_log = json.load(f) + # Filter job_log to only jobs matching the partition's gpus_per_node + if gpus_per_node is not None: + filtered_log = [] + for raw in job_log: + attempts = raw.get("attempts", []) + if attempts and "detail" in attempts[0]: + # Count GPUs from the first detail + gpus = sum(len(detail.get("gpus", [])) for detail in attempts[0]["detail"]) + if gpus > 0 and (gpus % gpus_per_node == 0): + filtered_log.append(raw) + job_log = filtered_log + # --- First pass: find earliest submit time --- start_ts = None for raw in job_log: @@ -176,25 +253,19 @@ def load_data(files, **kwargs): if len(machine_ids) > 1: job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index() - #print("Job", jobid) - #print("machine_ids from job:", machine_ids[:5]) - #print("gpu_util machineId sample:", gpu_util["machineId"].unique()[:5]) - #print("start, end:", start, end) - #print("gpu_util time range:", gpu_util["time"].min(), gpu_util["time"].max()) + # --- absolute datetimes (used for filtering traces) --- + submitted_dt = parse_timestamp(raw.get("submitted_time")) - # GPU utilization traces - job_gpu = None - if machine_ids and start and end: - mask = ( - gpu_util["machineId"].isin(machine_ids) & - (gpu_util["time"] >= start) & - (gpu_util["time"] <= end) - ) - job_gpu = gpu_util.loc[mask].copy() + mask = ( + (gpu_trace["machineId"].isin(machine_ids)) & + (gpu_trace["time"] >= start_dt) & + (gpu_trace["time"] <= end_dt) + ) + job_gpu = gpu_trace.loc[mask].copy() - # Aggregate across machines if >1 machine - if len(machine_ids) > 1: - job_gpu = job_gpu.groupby("time")["gpu_util"].sum().reset_index() + print(f" job_gpu shape after filtering: {job_gpu.shape}") + if job_gpu.empty: + print(" ⚠ No GPU rows matched this job") if machine_ids: # Shift times relative to start_ts @@ -229,7 +300,7 @@ def load_data(files, **kwargs): end_state=status, scheduled_nodes=scheduled_nodes, - cpu_trace=job_cpu, + cpu_trace=0, gpu_trace=job_gpu, ntx_trace=None, nrx_trace=None, @@ -252,7 +323,8 @@ def load_data(files, **kwargs): print(job) # Find max end timestamp across jobs - end_ts = max(j.end_time for j in jobs_list if j.end_time is not None) + #end_ts = max(j.end_time for j in jobs_list if j.end_time is not None) + end_ts = 3600 return WorkloadData( jobs=jobs_list, -- GitLab From abb0ed312cc2ca8fe0fcb6bf4f54ce6c632cb158 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Mon, 29 Sep 2025 02:53:16 +0300 Subject: [PATCH 07/26] Get gpu trace working but exit after first job - still lots of work to do --- README.md | 2 +- raps/dataloaders/philly.py | 86 +++++++++++++++----------------------- 2 files changed, 34 insertions(+), 54 deletions(-) diff --git a/README.md b/README.md index 92c68e0..3684594 100644 --- a/README.md +++ b/README.md @@ -83,7 +83,7 @@ Microsoft Azure - 2017 Philly Traces python main.py run-parts -x philly -w multitenant # Telemetry replay - python main.py run-parts -x philly -f /opt/data/philly/trace-data + python main.py run-parts -x philly -f /opt/data/philly/trace-data --start 2017-10-03T00:00 --end 2017-10-04T00:00 For Lumi diff --git a/raps/dataloaders/philly.py b/raps/dataloaders/philly.py index aa1a8ae..8747e16 100644 --- a/raps/dataloaders/philly.py +++ b/raps/dataloaders/philly.py @@ -43,41 +43,14 @@ def parse_timestamp(val): return None return None -def load_gpu_traces_by_dayXX(gpu_trace_dir, machine_ids, job_start_dt, job_end_dt): - """ - Load GPU utilization for specific machines and time range, 
- using preprocessed per-day CSVs (gpu_by_day/). - """ - dfs = [] - current = job_start_dt.date() - while current <= job_end_dt.date(): - day_file = os.path.join(gpu_trace_dir, f"{current}.csv") - if os.path.exists(day_file): - df = pd.read_csv( - day_file, - names=["time", "machineId", "gpu_util"], - parse_dates=["time"], - on_bad_lines="skip" - ) - df = df[df["machineId"].isin(machine_ids)] - df = df[(df["time"] >= job_start_dt) & (df["time"] <= job_end_dt)] - if not df.empty: - dfs.append(df) - current += timedelta(days=1) - - if dfs: - return pd.concat(dfs, ignore_index=True) - return pd.DataFrame(columns=["time", "machineId", "gpu_util"]) - - def load_gpu_traces_by_day(trace_dir, start_dt, end_dt): """Load GPU traces only for the days between start_dt and end_dt.""" - gpu_dir = os.path.join(trace_dir, "dist/gpu_by_day") frames = [] current = start_dt.date() + while current <= end_dt.date(): - daily_file = os.path.join(gpu_dir, f"{current}.csv") + daily_file = os.path.join(trace_dir, f"{current}.csv") if os.path.exists(daily_file): df = pd.read_csv( daily_file, @@ -114,6 +87,7 @@ def load_data(files, **kwargs): # extract --start from kwargs start_ts = to_epoch(kwargs.get("start", DEFAULT_START)) end_ts = to_epoch(kwargs.get("end", DEFAULT_END)) + assert len(files) == 1, "Expecting a single directory path" trace_dir = files[0] gpu_trace_dir = os.path.join(files[0], "dist", "gpu_by_day") @@ -153,15 +127,12 @@ def load_data(files, **kwargs): cpu_util["time"] = cpu_util["time"].apply(parse_date) # --- 3. GPU util --- - start_dt = datetime.fromtimestamp(start_ts, tz=timezone.utc) - end_dt = datetime.fromtimestamp(end_ts, tz=timezone.utc) + PDT = timezone(timedelta(hours=-7)) + start_dt = datetime.fromtimestamp(start_ts, tz=PDT) + end_dt = datetime.fromtimestamp(end_ts, tz=PDT) gpu_trace_dir = os.path.join(trace_dir, "dist", "gpu_by_day") - # For each job: - gpu_trace = load_gpu_traces_by_day(gpu_trace_dir, start_dt, end_dt) - job_gpu = load_gpu_traces_by_day(gpu_trace_dir, start_dt, end_dt) - # --- 4. 
Job log --- job_file = os.path.join(trace_dir, "cluster_job_log") with open(job_file) as f: @@ -241,31 +212,38 @@ def load_data(files, **kwargs): gpus += len(detail.get("gpus", [])) # CPU utilization traces - if machine_ids and start and end: - mask = ( - cpu_util["machine_id"].isin(machine_ids) & - (cpu_util["time"] >= start) & - (cpu_util["time"] <= end) - ) - job_cpu = cpu_util.loc[mask].copy() - - # Aggregate across machines if >1 machine - if len(machine_ids) > 1: - job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index() + #if machine_ids and start and end: + # mask = ( + # cpu_util["machine_id"].isin(machine_ids) & + # (cpu_util["time"] >= start) & + # (cpu_util["time"] <= end) + # ) + # job_cpu = cpu_util.loc[mask].copy() + # + # # Aggregate across machines if >1 machine + # if len(machine_ids) > 1: + # job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index() # --- absolute datetimes (used for filtering traces) --- submitted_dt = parse_timestamp(raw.get("submitted_time")) + print("***", machine_ids, start, end) + + gpu_trace = load_gpu_traces_by_day(gpu_trace_dir, start, end) + mask = ( (gpu_trace["machineId"].isin(machine_ids)) & - (gpu_trace["time"] >= start_dt) & - (gpu_trace["time"] <= end_dt) + (gpu_trace["time"] >= start) & + (gpu_trace["time"] <= end) ) - job_gpu = gpu_trace.loc[mask].copy() + #job_gpu = gpu_trace.loc[mask].copy() + #job_gpu_series = job_gpu["gpu_util"].tolist() + #job_gpu_series = (job_gpu["gpu_util"].to_numpy() * 0.01).tolist() + job_gpu_trace = (gpu_trace.loc[mask, "gpu_util"].to_numpy() * 0.01).tolist() - print(f" job_gpu shape after filtering: {job_gpu.shape}") - if job_gpu.empty: - print(" ⚠ No GPU rows matched this job") + #print(f" job_gpu shape after filtering: {job_gpu_trace.shape}") + #if job_gpu_trace.empty: + # print(" ⚠ No GPU rows matched this job") if machine_ids: # Shift times relative to start_ts @@ -301,7 +279,7 @@ def load_data(files, **kwargs): scheduled_nodes=scheduled_nodes, cpu_trace=0, - gpu_trace=job_gpu, + gpu_trace=job_gpu_trace, ntx_trace=None, nrx_trace=None, @@ -321,6 +299,8 @@ def load_data(files, **kwargs): jobs_list.append(Job(job)) print(job) + + exit() # Find max end timestamp across jobs #end_ts = max(j.end_time for j in jobs_list if j.end_time is not None) -- GitLab From 89a03fe92fd80c59f54de480ca249cb0e2334024 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Mon, 29 Sep 2025 02:57:55 +0300 Subject: [PATCH 08/26] Remove the job_log[:1000] slice to use full job_log --- raps/dataloaders/philly.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raps/dataloaders/philly.py b/raps/dataloaders/philly.py index 8747e16..8d6f1fe 100644 --- a/raps/dataloaders/philly.py +++ b/raps/dataloaders/philly.py @@ -171,7 +171,7 @@ def load_data(files, **kwargs): # --- Second pass: build jobs --- jobs_list = [] - for raw in tqdm(job_log[:1000], desc="Building Job objects"): + for raw in tqdm(job_log, desc="Building Job objects"): jobid = raw.get("jobid") user = raw.get("user") status = raw.get("status") -- GitLab From 273cd768c0f274002f0bec1c0e389f2b019993e7 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Tue, 30 Sep 2025 00:40:51 +0300 Subject: [PATCH 09/26] Get gpu utilization fully working correctly --- raps/dataloaders/philly.py | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/raps/dataloaders/philly.py b/raps/dataloaders/philly.py index 8d6f1fe..473f426 100644 --- a/raps/dataloaders/philly.py +++ b/raps/dataloaders/philly.py @@ -210,6 +210,9 @@ def 
load_data(files, **kwargs): mid = detail["ip"] machine_ids.append(mid) gpus += len(detail.get("gpus", [])) + + num_nodes = len(machine_ids) + gpus_per_node = gpus // num_nodes # CPU utilization traces #if machine_ids and start and end: @@ -227,8 +230,6 @@ def load_data(files, **kwargs): # --- absolute datetimes (used for filtering traces) --- submitted_dt = parse_timestamp(raw.get("submitted_time")) - print("***", machine_ids, start, end) - gpu_trace = load_gpu_traces_by_day(gpu_trace_dir, start, end) mask = ( @@ -236,14 +237,8 @@ def load_data(files, **kwargs): (gpu_trace["time"] >= start) & (gpu_trace["time"] <= end) ) - #job_gpu = gpu_trace.loc[mask].copy() - #job_gpu_series = job_gpu["gpu_util"].tolist() - #job_gpu_series = (job_gpu["gpu_util"].to_numpy() * 0.01).tolist() - job_gpu_trace = (gpu_trace.loc[mask, "gpu_util"].to_numpy() * 0.01).tolist() - - #print(f" job_gpu shape after filtering: {job_gpu_trace.shape}") - #if job_gpu_trace.empty: - # print(" ⚠ No GPU rows matched this job") + # Convert traces from percent to fraction of gpus_per_node, e.g., 8 gpus at 100% is 8, at 50% is 4, etc. + job_gpu_trace = (gpu_trace.loc[mask, "gpu_util"].to_numpy() * 0.01 * gpus_per_node).tolist() if machine_ids: # Shift times relative to start_ts @@ -271,9 +266,7 @@ def load_data(files, **kwargs): priority=0, cpu_cores_required=0, - gpu_units_required=gpus, - allocated_cpu_cores=0, - allocated_gpu_units=gpus, + gpu_units_required=gpus_per_node, end_state=status, scheduled_nodes=scheduled_nodes, -- GitLab From ad076e7a65147c334494ee5f8277c1269067dbc2 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Tue, 30 Sep 2025 01:52:56 +0300 Subject: [PATCH 10/26] Get cpu traces also working (currently just single job test case) --- raps/dataloaders/philly.py | 110 +++++++++++++++++++++++++++++-------- 1 file changed, 88 insertions(+), 22 deletions(-) diff --git a/raps/dataloaders/philly.py b/raps/dataloaders/philly.py index 473f426..d8b065e 100644 --- a/raps/dataloaders/philly.py +++ b/raps/dataloaders/philly.py @@ -43,10 +43,9 @@ def parse_timestamp(val): return None return None -def load_gpu_traces_by_day(trace_dir, start_dt, end_dt): - """Load GPU traces only for the days between start_dt and end_dt.""" +def load_traces_by_day(trace_dir, start_dt, end_dt, colname): + """Load CPU or GPU traces between start_dt and end_dt.""" frames = [] - current = start_dt.date() while current <= end_dt.date(): @@ -54,19 +53,81 @@ def load_gpu_traces_by_day(trace_dir, start_dt, end_dt): if os.path.exists(daily_file): df = pd.read_csv( daily_file, - names=["time", "machineId", "gpu_util"], # no header in daily CSVs - parse_dates=["time"] + names=["time", "machineId", colname], # no header in daily CSVs + dtype={"machineId": str, colname: str}, # avoid DtypeWarning ) + + # Normalize time column (strip PST/PDT, parse datetime) + df["time"] = df["time"].str.replace(" PST", "").str.replace(" PDT", "") + df["time"] = pd.to_datetime(df["time"], errors="coerce", format=DATE_FORMAT_STR) + + # Convert util column to numeric (NA/invalid → NaN) + df[colname] = pd.to_numeric(df[colname], errors="coerce") + frames.append(df) else: print(f"⚠ No trace file for {current}") current += timedelta(days=1) if not frames: - return pd.DataFrame(columns=["time", "machineId", "gpu_util"]) + return pd.DataFrame(columns=["time", "machineId", colname]) return pd.concat(frames, ignore_index=True) +#def load_traces_by_day(trace_dir, start_dt, end_dt, colname): +# """Load CPU or GPU traces between start_dt and end_dt. 
+# +# Args: +# trace_dir (str): Directory containing daily CSV files. +# start_dt (datetime): Start datetime. +# end_dt (datetime): End datetime. +# colname (str): Name of the utilization column (e.g., 'cpu_util' or 'gpu_util'). +# """ +# frames = [] +# current = start_dt.date() +# +# while current <= end_dt.date(): +# daily_file = os.path.join(trace_dir, f"{current}.csv") +# if os.path.exists(daily_file): +# df = pd.read_csv( +# daily_file, +# names=["time", "machineId", colname], # no header in daily CSVs +# parse_dates=["time"] +# ) +# frames.append(df) +# else: +# print(f"⚠ No trace file for {current}") +# current += timedelta(days=1) +# +# if not frames: +# return pd.DataFrame(columns=["time", "machineId", colname]) +# +# return pd.concat(frames, ignore_index=True) + +#def load_gpu_traces_by_day(trace_dir, start_dt, end_dt): +# """Load GPU traces only for the days between start_dt and end_dt.""" +# frames = [] +# +# current = start_dt.date() +# +# while current <= end_dt.date(): +# daily_file = os.path.join(trace_dir, f"{current}.csv") +# if os.path.exists(daily_file): +# df = pd.read_csv( +# daily_file, +# names=["time", "machineId", "gpu_util"], # no header in daily CSVs +# parse_dates=["time"] +# ) +# frames.append(df) +# else: +# print(f"⚠ No trace file for {current}") +# current += timedelta(days=1) +# +# if not frames: +# return pd.DataFrame(columns=["time", "machineId", "gpu_util"]) +# +# return pd.concat(frames, ignore_index=True) + def parse_date(s): if not s or s == "None": return None @@ -131,6 +192,7 @@ def load_data(files, **kwargs): start_dt = datetime.fromtimestamp(start_ts, tz=PDT) end_dt = datetime.fromtimestamp(end_ts, tz=PDT) + cpu_trace_dir = os.path.join(trace_dir, "dist", "cpu_by_day") gpu_trace_dir = os.path.join(trace_dir, "dist", "gpu_by_day") # --- 4. 
Job log --- @@ -214,23 +276,27 @@ def load_data(files, **kwargs): num_nodes = len(machine_ids) gpus_per_node = gpus // num_nodes - # CPU utilization traces - #if machine_ids and start and end: - # mask = ( - # cpu_util["machine_id"].isin(machine_ids) & - # (cpu_util["time"] >= start) & - # (cpu_util["time"] <= end) - # ) - # job_cpu = cpu_util.loc[mask].copy() - # - # # Aggregate across machines if >1 machine - # if len(machine_ids) > 1: - # job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index() - # --- absolute datetimes (used for filtering traces) --- submitted_dt = parse_timestamp(raw.get("submitted_time")) - gpu_trace = load_gpu_traces_by_day(gpu_trace_dir, start, end) + # CPU utilization traces + cpu_trace = load_traces_by_day(cpu_trace_dir, start, end, "cpu_util") + + mask = ( + (cpu_trace["machineId"].isin(machine_ids)) & + (cpu_trace["time"] >= start) & + (cpu_trace["time"] <= end) + ) + job_cpu = cpu_trace.loc[mask].copy() + + # Aggregate across machines if >1 machine + if len(machine_ids) > 1: + job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index() + + job_cpu_trace = (job_cpu["cpu_util"].to_numpy() * 0.01).tolist() + + # Extract GPU utilization traces + gpu_trace = load_traces_by_day(gpu_trace_dir, start, end, "gpu_util") mask = ( (gpu_trace["machineId"].isin(machine_ids)) & @@ -265,13 +331,13 @@ def load_data(files, **kwargs): partition=partition_id, priority=0, - cpu_cores_required=0, + cpu_cores_required=1, gpu_units_required=gpus_per_node, end_state=status, scheduled_nodes=scheduled_nodes, - cpu_trace=0, + cpu_trace=job_cpu_trace, gpu_trace=job_gpu_trace, ntx_trace=None, nrx_trace=None, -- GitLab From a83a5aaa287e327adcb47226da4775c41ad11f54 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Tue, 30 Sep 2025 02:22:18 +0300 Subject: [PATCH 11/26] A few more fixes and cleanups to try to get working for a single day --- raps/dataloaders/philly.py | 85 +++++++++++--------------------------- 1 file changed, 23 insertions(+), 62 deletions(-) diff --git a/raps/dataloaders/philly.py b/raps/dataloaders/philly.py index d8b065e..840f566 100644 --- a/raps/dataloaders/philly.py +++ b/raps/dataloaders/philly.py @@ -74,60 +74,6 @@ def load_traces_by_day(trace_dir, start_dt, end_dt, colname): return pd.concat(frames, ignore_index=True) -#def load_traces_by_day(trace_dir, start_dt, end_dt, colname): -# """Load CPU or GPU traces between start_dt and end_dt. -# -# Args: -# trace_dir (str): Directory containing daily CSV files. -# start_dt (datetime): Start datetime. -# end_dt (datetime): End datetime. -# colname (str): Name of the utilization column (e.g., 'cpu_util' or 'gpu_util'). 
-# """ -# frames = [] -# current = start_dt.date() -# -# while current <= end_dt.date(): -# daily_file = os.path.join(trace_dir, f"{current}.csv") -# if os.path.exists(daily_file): -# df = pd.read_csv( -# daily_file, -# names=["time", "machineId", colname], # no header in daily CSVs -# parse_dates=["time"] -# ) -# frames.append(df) -# else: -# print(f"⚠ No trace file for {current}") -# current += timedelta(days=1) -# -# if not frames: -# return pd.DataFrame(columns=["time", "machineId", colname]) -# -# return pd.concat(frames, ignore_index=True) - -#def load_gpu_traces_by_day(trace_dir, start_dt, end_dt): -# """Load GPU traces only for the days between start_dt and end_dt.""" -# frames = [] -# -# current = start_dt.date() -# -# while current <= end_dt.date(): -# daily_file = os.path.join(trace_dir, f"{current}.csv") -# if os.path.exists(daily_file): -# df = pd.read_csv( -# daily_file, -# names=["time", "machineId", "gpu_util"], # no header in daily CSVs -# parse_dates=["time"] -# ) -# frames.append(df) -# else: -# print(f"⚠ No trace file for {current}") -# current += timedelta(days=1) -# -# if not frames: -# return pd.DataFrame(columns=["time", "machineId", "gpu_util"]) -# -# return pd.concat(frames, ignore_index=True) - def parse_date(s): if not s or s == "None": return None @@ -188,9 +134,11 @@ def load_data(files, **kwargs): cpu_util["time"] = cpu_util["time"].apply(parse_date) # --- 3. GPU util --- - PDT = timezone(timedelta(hours=-7)) - start_dt = datetime.fromtimestamp(start_ts, tz=PDT) - end_dt = datetime.fromtimestamp(end_ts, tz=PDT) + #PDT = timezone(timedelta(hours=-7)) + #start_dt = datetime.fromtimestamp(start_ts, tz=PDT) + #end_dt = datetime.fromtimestamp(end_ts, tz=PDT) + start_dt = datetime.fromtimestamp(start_ts) # naive datetime + end_dt = datetime.fromtimestamp(end_ts) cpu_trace_dir = os.path.join(trace_dir, "dist", "cpu_by_day") gpu_trace_dir = os.path.join(trace_dir, "dist", "gpu_by_day") @@ -279,8 +227,15 @@ def load_data(files, **kwargs): # --- absolute datetimes (used for filtering traces) --- submitted_dt = parse_timestamp(raw.get("submitted_time")) + # Clamp to global CLI window - this should be fixed later to include the actual + # trace start and end times (trace_start_time? and trace_end_time?) + #job_start = max(start, start_dt) if start else start_dt + #job_end = min(end, end_dt) if end else end_dt + job_start = start + job_end = end + # CPU utilization traces - cpu_trace = load_traces_by_day(cpu_trace_dir, start, end, "cpu_util") + cpu_trace = load_traces_by_day(cpu_trace_dir, job_start, job_end, "cpu_util") mask = ( (cpu_trace["machineId"].isin(machine_ids)) & @@ -293,10 +248,11 @@ def load_data(files, **kwargs): if len(machine_ids) > 1: job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index() + # Convert from percentage to fraction job_cpu_trace = (job_cpu["cpu_util"].to_numpy() * 0.01).tolist() # Extract GPU utilization traces - gpu_trace = load_traces_by_day(gpu_trace_dir, start, end, "gpu_util") + gpu_trace = load_traces_by_day(gpu_trace_dir, job_start, job_end, "gpu_util") mask = ( (gpu_trace["machineId"].isin(machine_ids)) & @@ -304,7 +260,14 @@ def load_data(files, **kwargs): (gpu_trace["time"] <= end) ) # Convert traces from percent to fraction of gpus_per_node, e.g., 8 gpus at 100% is 8, at 50% is 4, etc. 
- job_gpu_trace = (gpu_trace.loc[mask, "gpu_util"].to_numpy() * 0.01 * gpus_per_node).tolist() + job_gpu = gpu_trace.loc[mask].copy() + + # Aggregate across machines if >1 machine + if len(machine_ids) > 1: + job_gpu = job_gpu.groupby("time")["gpu_util"].mean().reset_index() + + job_gpu_trace = (job_gpu["gpu_util"].to_numpy() * 0.01 * gpus_per_node).tolist() + if machine_ids: # Shift times relative to start_ts @@ -359,8 +322,6 @@ def load_data(files, **kwargs): print(job) - exit() - # Find max end timestamp across jobs #end_ts = max(j.end_time for j in jobs_list if j.end_time is not None) end_ts = 3600 -- GitLab From f709929b5a8ce9805b755d42cc7e4a385f4ac9b8 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Tue, 30 Sep 2025 02:25:15 +0300 Subject: [PATCH 12/26] Add scripts/parse_philly_traces.py --- scripts/parse_philly_traces.py | 63 ++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 scripts/parse_philly_traces.py diff --git a/scripts/parse_philly_traces.py b/scripts/parse_philly_traces.py new file mode 100644 index 0000000..5693e4e --- /dev/null +++ b/scripts/parse_philly_traces.py @@ -0,0 +1,63 @@ +import os +import sys +from datetime import datetime +from tqdm import tqdm + +if len(sys.argv) < 2: + print("Usage: python parse_philly_traces.py <input_file>") + sys.exit(1) + +input_file = sys.argv[1] + +with open(input_file) as f: + total_lines = sum(1 for _ in f) - 1 + +with open(input_file) as f: + header = f.readline().strip().split(",") + print("Header:", header) + + # detect file type from header + is_cpu = "cpu_util" in [h.lower() for h in header] + + # pick output dir name based on file type + output_dir = "cpu_by_day" if is_cpu else "gpu_by_day" + os.makedirs(output_dir, exist_ok=True) + + #for i, line in enumerate(f, 1): + for line in tqdm(f, total=total_lines, desc="Processing lines"): + parts = line.strip().split(",") + + if len(parts) < 3: + continue + + raw_time = parts[0].replace(" PST", "").replace(" PDT", "") + try: + ts = datetime.strptime(raw_time, "%Y-%m-%d %H:%M:%S") + except ValueError: + continue + + machine_id = parts[1] + + if is_cpu: + try: + value = float(parts[2]) + except ValueError: + value = 0.0 + label = "cpu_util" + else: + utils = [] + for v in parts[2:]: + try: + utils.append(float(v)) + except ValueError: + pass + value = sum(utils) / max(1, len([u for u in utils if u > 0])) + label = "gpu_util" + + day_str = ts.strftime("%Y-%m-%d") + out_path = os.path.join(output_dir, f"{day_str}.csv") + + with open(out_path, "a") as out: + if out.tell() == 0: # only write header if file is new + out.write(f"time,machine_id,{label}\n") + out.write(f"{ts},{machine_id},{value:.3f}\n") -- GitLab From 9512db297653fe800b1c1b4af79221569ec81de5 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Tue, 30 Sep 2025 23:41:37 +0300 Subject: [PATCH 13/26] Add configs for perlmutter and selene --- config/perlmutter.yaml | 51 ++++++++++++++++++++++++++++++++++++++++++ config/selene.yaml | 51 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 config/perlmutter.yaml create mode 100644 config/selene.yaml diff --git a/config/perlmutter.yaml b/config/perlmutter.yaml new file mode 100644 index 0000000..8863a36 --- /dev/null +++ b/config/perlmutter.yaml @@ -0,0 +1,51 @@ +system: + num_cdus: 36 + racks_per_cdu: 3 + nodes_per_rack: 128 + rectifiers_per_rack: 32 + chassis_per_rack: 8 + nodes_per_blade: 2 + switches_per_chassis: 4 + nics_per_node: 4 + rectifiers_per_chassis: 4 + nodes_per_rectifier: 4 + missing_racks: [] + down_nodes: [] + 
cpus_per_node: 1 + gpus_per_node: 4 + cpu_peak_flops: 3580000000000.0 + gpu_peak_flops: 9700000000000.0 + cpu_fp_ratio: 0.667 + gpu_fp_ratio: 0.667 +power: + power_gpu_idle: 88 + power_gpu_max: 300 + power_cpu_idle: 90 + power_cpu_max: 280 + power_mem: 74.26 + power_nic: 20 + power_nvme: 30 + power_switch: 250 + power_cdu: 8473.47 + power_update_freq: 15 + rectifier_peak_threshold: 13670 + sivoc_loss_constant: 13 + sivoc_efficiency: 0.98 + rectifier_loss_constant: 17 + rectifier_efficiency: 0.96 + power_cost: 0.094 +scheduler: + seed: 42 + job_arrival_time: 900 + mtbf: 11 + trace_quanta: 15 + min_wall_time: 3600 + max_wall_time: 43200 + ui_update_freq: 900 + max_nodes_per_job: 3000 + job_end_probs: + COMPLETED: 0.63 + FAILED: 0.13 + CANCELLED: 0.12 + TIMEOUT: 0.11 + NODE_FAIL: 0.01 diff --git a/config/selene.yaml b/config/selene.yaml new file mode 100644 index 0000000..8f42bf6 --- /dev/null +++ b/config/selene.yaml @@ -0,0 +1,51 @@ +system: + num_cdus: 20 + racks_per_cdu: 7 + nodes_per_rack: 4 + rectifiers_per_rack: 32 + chassis_per_rack: 4 + nodes_per_blade: 2 + switches_per_chassis: 4 + nics_per_node: 4 + rectifiers_per_chassis: 4 + nodes_per_rectifier: 4 + missing_racks: [] + down_nodes: [] + cpus_per_node: 2 + gpus_per_node: 8 + cpu_peak_flops: 3481000000000.0 + gpu_peak_flops: 624000000000000.0 # BF16 performance (with sparsity) + cpu_fp_ratio: 0.667 + gpu_fp_ratio: 0.667 +power: + power_gpu_idle: 88 + power_gpu_max: 400 + power_cpu_idle: 90 + power_cpu_max: 280 + power_mem: 74.26 + power_nic: 20 + power_nvme: 30 + power_switch: 250 + power_cdu: 8473.47 + power_update_freq: 15 + rectifier_peak_threshold: 13670 + sivoc_loss_constant: 13 + sivoc_efficiency: 0.98 + rectifier_loss_constant: 17 + rectifier_efficiency: 0.96 + power_cost: 0.094 +scheduler: + seed: 42 + job_arrival_time: 900 + mtbf: 11 + trace_quanta: 15 + min_wall_time: 3600 + max_wall_time: 43200 + ui_update_freq: 900 + max_nodes_per_job: 3000 + job_end_probs: + COMPLETED: 0.63 + FAILED: 0.13 + CANCELLED: 0.12 + TIMEOUT: 0.11 + NODE_FAIL: 0.01 -- GitLab From 26912590eae6f002c8e69bade78439d264d9226f Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Wed, 1 Oct 2025 17:58:54 +0300 Subject: [PATCH 14/26] Fix issue with end time of simulation for philly traces --- raps/dataloaders/philly.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/raps/dataloaders/philly.py b/raps/dataloaders/philly.py index 840f566..636d943 100644 --- a/raps/dataloaders/philly.py +++ b/raps/dataloaders/philly.py @@ -321,10 +321,15 @@ def load_data(files, **kwargs): jobs_list.append(Job(job)) print(job) - - # Find max end timestamp across jobs - #end_ts = max(j.end_time for j in jobs_list if j.end_time is not None) - end_ts = 3600 + +# if len(jobs_list) >= 5: +# break + + # Find max end timestamp across jobs, relative to first job + end_ts = max(j.end_time for j in jobs_list if j.end_time is not None) + + # Absolute end_ts + end_ts = start_ts + end_ts return WorkloadData( jobs=jobs_list, -- GitLab From 02506f01196d9e037dccda665c45f0b7f3a6a969 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Wed, 1 Oct 2025 19:43:02 +0300 Subject: [PATCH 15/26] Got philly working and scheduling on both partitions, but only by setting start_time to 0 --- raps/dataloaders/philly.py | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/raps/dataloaders/philly.py b/raps/dataloaders/philly.py index 636d943..247d967 100644 --- a/raps/dataloaders/philly.py +++ b/raps/dataloaders/philly.py @@ -274,6 +274,9 @@ def 
load_data(files, **kwargs): submit_time = submitted.timestamp() - start_ts if submitted else None start_time = start.timestamp() - start_ts if start else None end_time = end.timestamp() - start_ts if end else None + #submit_time = submitted.timestamp() + #start_time = start.timestamp() + #end_time = end.timestamp() if not submit_time or not start_time or not end_time: warnings.warn( @@ -305,31 +308,37 @@ def load_data(files, **kwargs): ntx_trace=None, nrx_trace=None, - submit_time=submit_time, - start_time=start_time, + submit_time=0, #submit_time, + start_time=0, #start_time, end_time=end_time, - time_limit=0, + time_limit=end_time, #0, expected_run_time=wall_time if wall_time else 0, current_run_time=0, trace_time=None, - trace_start_time=None, - trace_end_time=None, - trace_quanta=None, + trace_start_time=0, #None, + trace_end_time=end_time, #None, + trace_quanta=60, trace_missing_values=False, downscale=1 ) - jobs_list.append(Job(job)) + if job_cpu_trace and job_gpu_trace: + jobs_list.append(Job(job)) - print(job) + #print(job) + print(start_ts, job["start_time"], job["end_time"]) -# if len(jobs_list) >= 5: -# break + if len(jobs_list) >= 20: + break # Find max end timestamp across jobs, relative to first job - end_ts = max(j.end_time for j in jobs_list if j.end_time is not None) + #start_ts = min(j.start_time for j in jobs_list if j.start_time is not None) + #end_ts = max(j.end_time for j in jobs_list if j.end_time is not None) # Absolute end_ts - end_ts = start_ts + end_ts + #end_ts = start_ts + end_ts + end_ts = start_ts + 43200 + + print("***", start_ts, end_ts) return WorkloadData( jobs=jobs_list, -- GitLab From 80b299962648ea65569f5b0f0dddf059ea24620e Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Wed, 1 Oct 2025 20:58:26 +0300 Subject: [PATCH 16/26] Get it fully working now with proper start times (note manually implemented 2baf2b1b here) --- raps/dataloaders/philly.py | 62 ++++++++++++++++++++-------------- raps/engine.py | 69 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+), 25 deletions(-) diff --git a/raps/dataloaders/philly.py b/raps/dataloaders/philly.py index 247d967..a0cc2af 100644 --- a/raps/dataloaders/philly.py +++ b/raps/dataloaders/philly.py @@ -134,10 +134,7 @@ def load_data(files, **kwargs): cpu_util["time"] = cpu_util["time"].apply(parse_date) # --- 3. 
GPU util --- - #PDT = timezone(timedelta(hours=-7)) - #start_dt = datetime.fromtimestamp(start_ts, tz=PDT) - #end_dt = datetime.fromtimestamp(end_ts, tz=PDT) - start_dt = datetime.fromtimestamp(start_ts) # naive datetime + start_dt = datetime.fromtimestamp(start_ts) end_dt = datetime.fromtimestamp(end_ts) cpu_trace_dir = os.path.join(trace_dir, "dist", "cpu_by_day") @@ -148,6 +145,20 @@ def load_data(files, **kwargs): with open(job_file) as f: job_log = json.load(f) + # --- First pass: filter jobs by date range --- + filtered_log = [] + for raw in job_log: + submitted = raw.get("submitted_time") + if submitted is None or submitted == "None": + continue + if isinstance(submitted, (int, float)): + submitted_dt = datetime.fromtimestamp(int(submitted)) + else: + submitted_dt = parse_date(submitted) + if submitted_dt and start_dt <= submitted_dt <= end_dt: + filtered_log.append(raw) + job_log = filtered_log + # Filter job_log to only jobs matching the partition's gpus_per_node if gpus_per_node is not None: filtered_log = [] @@ -162,6 +173,15 @@ def load_data(files, **kwargs): # --- First pass: find earliest submit time --- start_ts = None + + ### debug + print("num jobs found", len(job_log)) + for job in job_log[:100]: + print(f"Job {job['jobid']}:") + for attempt in job["attempts"]: + print(" Start:", attempt["start_time"]) + ### end debug + for raw in job_log: submitted = raw.get("submitted_time") if submitted is None or submitted == "None": @@ -176,6 +196,9 @@ def load_data(files, **kwargs): if start_ts is None or t < start_ts: start_ts = t + # debug + print(f"Job {job['jobid']}: submit_time {submitted}, start_ts: {start_ts}") + if start_ts is None: raise ValueError("No valid submitted_time found in Philly traces") @@ -222,6 +245,8 @@ def load_data(files, **kwargs): gpus += len(detail.get("gpus", [])) num_nodes = len(machine_ids) + if num_nodes == 0: + continue gpus_per_node = gpus // num_nodes # --- absolute datetimes (used for filtering traces) --- @@ -274,15 +299,13 @@ def load_data(files, **kwargs): submit_time = submitted.timestamp() - start_ts if submitted else None start_time = start.timestamp() - start_ts if start else None end_time = end.timestamp() - start_ts if end else None - #submit_time = submitted.timestamp() - #start_time = start.timestamp() - #end_time = end.timestamp() if not submit_time or not start_time or not end_time: warnings.warn( f"skipped {jobid} b/c missing submit_time, start_time, or end_time", UserWarning ) + continue scheduled_nodes = [node_mapping[mid] for mid in machine_ids if mid in node_mapping] @@ -308,10 +331,11 @@ def load_data(files, **kwargs): ntx_trace=None, nrx_trace=None, - submit_time=0, #submit_time, - start_time=0, #start_time, + submit_time=submit_time, + start_time=start_time, end_time=end_time, - time_limit=end_time, #0, + #time_limit=end_time - start_time, + time_limit=end_time, expected_run_time=wall_time if wall_time else 0, current_run_time=0, trace_time=None, @@ -323,22 +347,10 @@ def load_data(files, **kwargs): ) if job_cpu_trace and job_gpu_trace: jobs_list.append(Job(job)) + else: + tqdm.write(f"skipping {job['id']} b/c either no cpu or gpu trace") - #print(job) - print(start_ts, job["start_time"], job["end_time"]) - - if len(jobs_list) >= 20: - break - - # Find max end timestamp across jobs, relative to first job - #start_ts = min(j.start_time for j in jobs_list if j.start_time is not None) - #end_ts = max(j.end_time for j in jobs_list if j.end_time is not None) - - # Absolute end_ts - #end_ts = start_ts + end_ts - end_ts = start_ts + 
43200 - - print("***", start_ts, end_ts) + tqdm.write(f"abs start time: {start_ts} rel job start: {job['start_time']} rel job end: {job['end_time']}") return WorkloadData( jobs=jobs_list, diff --git a/raps/engine.py b/raps/engine.py index 67cd999..6e3ad19 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -340,6 +340,75 @@ class Engine: start_date=self.start, ) + if sim_config.live and not sim_config.replay: + td = Telemetry(**sim_config_dict) + workload_data = td.load_from_live_system() + elif sim_config.replay: + # TODO: this will have issues if running separate systems or custom systems + partition_short = partition.split("/")[-1] if partition else None + td = Telemetry( + **sim_config_dict, + partition=partition, + ) + if partition: + snap_map = {p.stem: p for p in sim_config.replay[0].glob("*.npz")} + if len(snap_map) > 0: + if partition_short not in snap_map: + raise RuntimeError(f"Snapshot '{partition_short}.npz' not in {sim_config.replay[0]}") + replay_files = [snap_map[partition_short]] + else: + replay_files = sim_config.replay + else: + replay_files = sim_config.replay + + workload_data = td.load_from_files(replay_files) + else: # Synthetic jobs + wl = Workload(sim_config_args, system_config_dict) + workload_data = wl.generate_jobs() + td = Telemetry(**sim_config_dict) + + jobs = workload_data.jobs + + # TODO refactor how stat/end/fastforward/time work + if sim_config.fastforward is not None: + workload_data.telemetry_start = workload_data.telemetry_start + sim_config.fastforward + + if sim_config.time is not None: + workload_data.telemetry_end = workload_data.telemetry_start + sim_config.time + + if sim_config.time_delta is not None: + time_delta = sim_config.time_delta + else: + time_delta = 1 + + if sim_config.continuous_job_generation: + continuous_workload = wl + else: + continuous_workload = None + + accounts = None + if sim_config.accounts: + job_accounts = Accounts(jobs) + if sim_config.accounts_json: + loaded_accounts = Accounts.from_json_filename(sim_config.accounts_json) + accounts = Accounts.merge(loaded_accounts, job_accounts) + else: + accounts = job_accounts + + engine = Engine( + power_manager=power_manager, + flops_manager=flops_manager, + cooling_model=cooling_model, + continuous_workload=continuous_workload, + jobs=jobs, + accounts=accounts, + telemetry=td, + sim_config=sim_config, + system_config=system_config, + ) + + return engine, workload_data, time_delta + def add_running_jobs_to_queue(self, jobs_to_submit: List): """ Modifies jobs_to_submit and self.queue -- GitLab From cdff947d5fc3198a1dc00c887f38979475742b81 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Wed, 1 Oct 2025 22:40:32 +0300 Subject: [PATCH 17/26] Add docstring to top of file and work on dataloader performance enhancements --- raps/dataloaders/philly.py | 190 +++++++++++++++++++++++++------------ 1 file changed, 128 insertions(+), 62 deletions(-) diff --git a/raps/dataloaders/philly.py b/raps/dataloaders/philly.py index a0cc2af..990a72f 100644 --- a/raps/dataloaders/philly.py +++ b/raps/dataloaders/philly.py @@ -1,9 +1,75 @@ +""" +Main reference to Philly traces: + + Jeon, Myeongjae, et al. "Analysis of Large-Scale Multi-Tenant GPU clusters for DNN training workloads." + 2019 USENIX Annual Technical Conference (USENIX ATC 19). 2019. 
+ https://www.usenix.org/system/files/atc19-jeon.pdf + +The repository is available here: + + https://github.com/msr-fiddle/philly-traces + +The data portion of the repo can be downloaded using one of the following methods: + + git clone https://github.com/msr-fiddle/philly-traces.git + cd philly-traces + git lfs pull + + wget https://github.com/msr-fiddle/philly-traces/raw/master/trace-data.tar.gz + + curl -L -o trace-data.tar.gz https://github.com/msr-fiddle/philly-traces/raw/master/trace-data.tar.gz + +Once the file is downloaded, assuming its in /opt/data/philly/trace-data directory: + + /opt/data/philly/trace-data/trace-data.tar.gz + + cd /opt/data/philly/trace-data + + run `tar xvfz trace-data.tar.gz` which will unpack the following files: + + cluster_cpu_util 1.5G + cluster_gpu_util 2.8G + cluster_mem_util 2.2G + cluster_job_log 37M + cluster_machine_list 8K + + then run the following: + + python /path/to/raps/scripts/parse_philly_traces.py cluster_cpu_util + python /path/to/raps/scripts/parse_philly_traces.py cluster_gpu_util + + this will parse these two files into two directories, cpu_by_day and gpu_by_day, + creating one file for each day and adding the lines for that day into the files. + + sanity checks: + + wc -l cluster_cpu_util + 45028261 cluster_cpu_util + wc -l cpu_by_day/*.csv + 45350898 total + + wc -l cluster_gpu_util + 44750641 cluster_gpu_util + wc -l gpu_by_day/*.csv + 44750640 total + +Running a replay simulation: + + python main.py run-parts -x philly -f /opt/data/philly/trace-data --start 2017-10-03T00:00 --end 2017-10-04T00:00 + +One the dataloader has been run at least once, it will dump npz files into a directory, so +they can be replayed again without having to go through the expensive extractoin process, using e.g.: + + python main.py run-parts -x philly -f raps-output-5efefa3 + +Note: it is possible to run simulations for an user-defined length of time between 10/3/2017 to 12/15/2017. + +""" import os import glob import json import csv import pandas as pd -import warnings from datetime import datetime, timezone, timedelta from tqdm import tqdm @@ -45,7 +111,7 @@ def parse_timestamp(val): def load_traces_by_day(trace_dir, start_dt, end_dt, colname): """Load CPU or GPU traces between start_dt and end_dt.""" - frames = [] + traces = {} current = start_dt.date() while current <= end_dt.date(): @@ -64,15 +130,15 @@ def load_traces_by_day(trace_dir, start_dt, end_dt, colname): # Convert util column to numeric (NA/invalid → NaN) df[colname] = pd.to_numeric(df[colname], errors="coerce") - frames.append(df) + traces[current] = df else: print(f"⚠ No trace file for {current}") current += timedelta(days=1) - if not frames: - return pd.DataFrame(columns=["time", "machineId", colname]) + if not traces: + return {} - return pd.concat(frames, ignore_index=True) + return traces def parse_date(s): if not s or s == "None": @@ -91,13 +157,16 @@ def load_data(files, **kwargs): Returns: list[Job] """ + debug = kwargs.get("debug") + print("started reading of philly traces... 
please be patient...", flush=True) + # extract --start from kwargs start_ts = to_epoch(kwargs.get("start", DEFAULT_START)) end_ts = to_epoch(kwargs.get("end", DEFAULT_END)) assert len(files) == 1, "Expecting a single directory path" trace_dir = files[0] - gpu_trace_dir = os.path.join(files[0], "dist", "gpu_by_day") + gpu_trace_dir = os.path.join(files[0], "gpu_by_day") config = kwargs.get('config') gpus_per_node = config.get("GPUS_PER_NODE") if gpus_per_node is None: @@ -127,19 +196,10 @@ def load_data(files, **kwargs): # Assign partition ID (e.g. 0 for 2-GPU, 1 for 8-GPU) partition_id = 0 if gpus_per_node == 2 else 1 - # --- 2. CPU util --- - cpu_file = os.path.join(trace_dir, "cluster_cpu_util") - cpu_util = pd.read_csv(cpu_file) - cpu_util["time"] = cpu_util["time"].str.replace(" PST","").str.replace(" PDT","") - cpu_util["time"] = cpu_util["time"].apply(parse_date) - # --- 3. GPU util --- start_dt = datetime.fromtimestamp(start_ts) end_dt = datetime.fromtimestamp(end_ts) - cpu_trace_dir = os.path.join(trace_dir, "dist", "cpu_by_day") - gpu_trace_dir = os.path.join(trace_dir, "dist", "gpu_by_day") - # --- 4. Job log --- job_file = os.path.join(trace_dir, "cluster_job_log") with open(job_file) as f: @@ -174,14 +234,6 @@ def load_data(files, **kwargs): # --- First pass: find earliest submit time --- start_ts = None - ### debug - print("num jobs found", len(job_log)) - for job in job_log[:100]: - print(f"Job {job['jobid']}:") - for attempt in job["attempts"]: - print(" Start:", attempt["start_time"]) - ### end debug - for raw in job_log: submitted = raw.get("submitted_time") if submitted is None or submitted == "None": @@ -196,12 +248,15 @@ def load_data(files, **kwargs): if start_ts is None or t < start_ts: start_ts = t - # debug - print(f"Job {job['jobid']}: submit_time {submitted}, start_ts: {start_ts}") - if start_ts is None: raise ValueError("No valid submitted_time found in Philly traces") + # --- Pre-load all traces for the given date range --- + cpu_trace_dir = os.path.join(trace_dir, "cpu_by_day") + gpu_trace_dir = os.path.join(trace_dir, "gpu_by_day") + all_cpu_traces = load_traces_by_day(cpu_trace_dir, start_dt, end_dt, "cpu_util") + all_gpu_traces = load_traces_by_day(gpu_trace_dir, start_dt, end_dt, "gpu_util") + # --- Second pass: build jobs --- jobs_list = [] for raw in tqdm(job_log, desc="Building Job objects"): @@ -252,46 +307,59 @@ def load_data(files, **kwargs): # --- absolute datetimes (used for filtering traces) --- submitted_dt = parse_timestamp(raw.get("submitted_time")) - # Clamp to global CLI window - this should be fixed later to include the actual - # trace start and end times (trace_start_time? and trace_end_time?) 
- #job_start = max(start, start_dt) if start else start_dt - #job_end = min(end, end_dt) if end else end_dt job_start = start job_end = end - # CPU utilization traces - cpu_trace = load_traces_by_day(cpu_trace_dir, job_start, job_end, "cpu_util") + if not job_start or not job_end: + continue - mask = ( - (cpu_trace["machineId"].isin(machine_ids)) & - (cpu_trace["time"] >= start) & - (cpu_trace["time"] <= end) - ) - job_cpu = cpu_trace.loc[mask].copy() + # --- CPU utilization traces --- + cpu_dfs = [] + current_date = job_start.date() + while current_date <= job_end.date(): + if current_date in all_cpu_traces: + cpu_dfs.append(all_cpu_traces[current_date]) + current_date += timedelta(days=1) - # Aggregate across machines if >1 machine - if len(machine_ids) > 1: - job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index() + if not cpu_dfs: + job_cpu_trace = [] + else: + job_cpu_df = pd.concat(cpu_dfs, ignore_index=True) + mask = ( + (job_cpu_df["machineId"].isin(machine_ids)) & + (job_cpu_df["time"] >= start) & + (job_cpu_df["time"] <= end) + ) + job_cpu = job_cpu_df.loc[mask].copy() - # Convert from percentage to fraction - job_cpu_trace = (job_cpu["cpu_util"].to_numpy() * 0.01).tolist() + if len(machine_ids) > 1: + job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index() - # Extract GPU utilization traces - gpu_trace = load_traces_by_day(gpu_trace_dir, job_start, job_end, "gpu_util") + job_cpu_trace = (job_cpu["cpu_util"].to_numpy() * 0.01).tolist() - mask = ( - (gpu_trace["machineId"].isin(machine_ids)) & - (gpu_trace["time"] >= start) & - (gpu_trace["time"] <= end) - ) - # Convert traces from percent to fraction of gpus_per_node, e.g., 8 gpus at 100% is 8, at 50% is 4, etc. - job_gpu = gpu_trace.loc[mask].copy() + # --- GPU utilization traces --- + gpu_dfs = [] + current_date = job_start.date() + while current_date <= job_end.date(): + if current_date in all_gpu_traces: + gpu_dfs.append(all_gpu_traces[current_date]) + current_date += timedelta(days=1) - # Aggregate across machines if >1 machine - if len(machine_ids) > 1: - job_gpu = job_gpu.groupby("time")["gpu_util"].mean().reset_index() + if not gpu_dfs: + job_gpu_trace = [] + else: + job_gpu_df = pd.concat(gpu_dfs, ignore_index=True) + mask = ( + (job_gpu_df["machineId"].isin(machine_ids)) & + (job_gpu_df["time"] >= start) & + (job_gpu_df["time"] <= end) + ) + job_gpu = job_gpu_df.loc[mask].copy() - job_gpu_trace = (job_gpu["gpu_util"].to_numpy() * 0.01 * gpus_per_node).tolist() + if len(machine_ids) > 1: + job_gpu = job_gpu.groupby("time")["gpu_util"].mean().reset_index() + + job_gpu_trace = (job_gpu["gpu_util"].to_numpy() * 0.01 * gpus_per_node).tolist() if machine_ids: @@ -301,10 +369,7 @@ def load_data(files, **kwargs): end_time = end.timestamp() - start_ts if end else None if not submit_time or not start_time or not end_time: - warnings.warn( - f"skipped {jobid} b/c missing submit_time, start_time, or end_time", - UserWarning - ) + tqdm.write(f"skipped {jobid} b/c missing submit_time, start_time, or end_time") continue scheduled_nodes = [node_mapping[mid] for mid in machine_ids if mid in node_mapping] @@ -350,7 +415,8 @@ def load_data(files, **kwargs): else: tqdm.write(f"skipping {job['id']} b/c either no cpu or gpu trace") - tqdm.write(f"abs start time: {start_ts} rel job start: {job['start_time']} rel job end: {job['end_time']}") + if debug: + tqdm.write(f"abs start time: {start_ts} rel job start: {job['start_time']} rel job end: {job['end_time']}") return WorkloadData( jobs=jobs_list, -- GitLab From 
f397377b4df255dbf04739297e80c95500144d89 Mon Sep 17 00:00:00 2001
From: Wes Brewer
Date: Wed, 1 Oct 2025 23:31:44 +0300
Subject: [PATCH 18/26] Refine and document the hardware TDP and performance
 specs that were used for philly

---
 config/philly/2-gpu.yaml   | 10 +++++-----
 config/philly/8-gpu.yaml   | 10 +++++-----
 raps/dataloaders/philly.py | 13 ++++++++++++-
 3 files changed, 22 insertions(+), 11 deletions(-)

diff --git a/config/philly/2-gpu.yaml b/config/philly/2-gpu.yaml
index 0622605..d7201b2 100644
--- a/config/philly/2-gpu.yaml
+++ b/config/philly/2-gpu.yaml
@@ -13,15 +13,15 @@ system:
   cpus_per_node: 2
   cores_per_cpu: 20
   gpus_per_node: 2
-  cpu_peak_flops: 1248000000000.0
-  gpu_peak_flops: 7800000000000.0
+  cpu_peak_flops: 1248000000000.0 # assume Xeon E5-2690v4 CPU 64-bit
+  gpu_peak_flops: 9300000000000.0 # assume 12G P100 32-bit
   cpu_fp_ratio: 0.667
   gpu_fp_ratio: 0.667
 power:
-  power_gpu_idle: 75
-  power_gpu_max: 300
+  power_gpu_idle: 30
+  power_gpu_max: 250
   power_cpu_idle: 90
-  power_cpu_max: 280
+  power_cpu_max: 270
   power_mem: 74.26
   power_nvme: 30
   power_nic: 20
diff --git a/config/philly/8-gpu.yaml b/config/philly/8-gpu.yaml
index aae80ee..1e92282 100644
--- a/config/philly/8-gpu.yaml
+++ b/config/philly/8-gpu.yaml
@@ -13,15 +13,15 @@ system:
   cpus_per_node: 2
   cores_per_cpu: 20
   gpus_per_node: 8
-  cpu_peak_flops: 1248000000000.0
-  gpu_peak_flops: 7800000000000.0
+  cpu_peak_flops: 1248000000000.0 # assume Xeon E5-2690v4 CPU 64-bit
+  gpu_peak_flops: 12000000000000.0 # assume 24G P40 32-bit
   cpu_fp_ratio: 0.667
   gpu_fp_ratio: 0.667
 power:
-  power_gpu_idle: 75
-  power_gpu_max: 300
+  power_gpu_idle: 50
+  power_gpu_max: 250
   power_cpu_idle: 90
-  power_cpu_max: 280
+  power_cpu_max: 270
   power_mem: 74.26
   power_nvme: 30
   power_nic: 20
diff --git a/raps/dataloaders/philly.py b/raps/dataloaders/philly.py
index 990a72f..552c647 100644
--- a/raps/dataloaders/philly.py
+++ b/raps/dataloaders/philly.py
@@ -1,10 +1,21 @@
 """
-Main reference to Philly traces:
+This is the dataloader for the Philly traces, which are documented in this paper:
 
     Jeon, Myeongjae, et al. "Analysis of Large-Scale Multi-Tenant GPU clusters for DNN training workloads."
    2019 USENIX Annual Technical Conference (USENIX ATC 19). 2019.
    https://www.usenix.org/system/files/atc19-jeon.pdf
 
+Note on hardware specs:
+
+    Philly only provides GPU memory sizes (12G & 24G) without clarifying the GPU models;
+    see Hu et al. (2024) https://arxiv.org/html/2403.07648v1
+
+    For estimating system power and FLOPS performance, we assume that the 2-GPU
+    nodes used Tesla P100 (12 GB) GPUs and the 8-GPU nodes used Tesla P40 (24 GB)
+    GPUs, consistent with hardware Microsoft deployed around 2017. Training is
+    assumed to have been performed in 32-bit (FP32), and the CPUs are assumed
+    to be 64-bit Intel Xeon E5-2690 v4.
+
 The repository is available here:
 
     https://github.com/msr-fiddle/philly-traces
--
GitLab

From 63ffbc7f5ed3f5c6ddf8d87a2504e038f13806ca Mon Sep 17 00:00:00 2001
From: Wes Brewer
Date: Thu, 2 Oct 2025 00:41:53 +0300
Subject: [PATCH 19/26] Refactor to improve pylint score. Move to using
 absolute times for jobs.

---
 raps/dataloaders/philly.py | 139 ++++++++++++++++++++-----------------
 1 file changed, 75 insertions(+), 64 deletions(-)

diff --git a/raps/dataloaders/philly.py b/raps/dataloaders/philly.py
index 552c647..ca8121b 100644
--- a/raps/dataloaders/philly.py
+++ b/raps/dataloaders/philly.py
@@ -1,9 +1,9 @@
 """
 This is the dataloader for the Philly traces, which are documented in this paper:
 
-    Jeon, Myeongjae, et al. "Analysis of Large-Scale Multi-Tenant GPU clusters for DNN training workloads."
-    2019 USENIX Annual Technical Conference (USENIX ATC 19). 2019.
-    https://www.usenix.org/system/files/atc19-jeon.pdf
+    Jeon, Myeongjae, et al. "Analysis of Large-Scale Multi-Tenant GPU clusters
+    for DNN training workloads." 2019 USENIX Annual Technical Conference
+    (USENIX ATC 19). 2019. https://www.usenix.org/system/files/atc19-jeon.pdf
 
 Note on hardware specs:
 
@@ -11,9 +11,9 @@ Note on hardware specs:
     see Hu et al. (2024) https://arxiv.org/html/2403.07648v1
 
     For estimating system power and FLOPS performance, we assume that the 2-GPU
-    nodes used Tesla P100 (12 GB) GPUs and the 8-GPU nodes used Tesla P40 (24 GB)
-    GPUs, consistent with hardware Microsoft deployed around 2017. Training is
-    assumed to have been performed in 32-bit (FP32), and the CPUs are assumed
+    nodes used Tesla P100 (12 GB) GPUs and the 8-GPU nodes used Tesla P40 (24 GB)
+    GPUs, consistent with hardware Microsoft deployed around 2017. Training is
+    assumed to have been performed in 32-bit (FP32), and the CPUs are assumed
     to be 64-bit Intel Xeon E5-2690 v4.
 
 The repository is available here:
@@ -28,9 +28,10 @@ The data portion of the repo can be downloaded using one of the following method
 
     wget https://github.com/msr-fiddle/philly-traces/raw/master/trace-data.tar.gz
 
-    curl -L -o trace-data.tar.gz https://github.com/msr-fiddle/philly-traces/raw/master/trace-data.tar.gz
+    curl -L -o trace-data.tar.gz \
+        https://github.com/msr-fiddle/philly-traces/raw/master/trace-data.tar.gz
 
-Once the file is downloaded, assuming its in /opt/data/philly/trace-data directory:
+After the file is downloaded, assuming it's in the /opt/data/philly/trace-data directory:
@@ -66,32 +67,38 @@ Once the file is downloaded, assuming its in /opt/data/philly/trace-data directo
 
 Running a replay simulation:
 
-    python main.py run-parts -x philly -f /opt/data/philly/trace-data --start 2017-10-03T00:00 --end 2017-10-04T00:00
+    python main.py run-parts -x philly -f /opt/data/philly/trace-data \
+        --start 2017-10-03T00:00 --end 2017-10-04T00:00
 
-One the dataloader has been run at least once, it will dump npz files into a directory, so
-they can be replayed again without having to go through the expensive extractoin process, using e.g.:
+Once the dataloader has been run at least once, it will dump npz files into a directory,
+so they can be replayed again without having to go through the expensive extraction process,
+using e.g.:
 
     python main.py run-parts -x philly -f raps-output-5efefa3
 
-Note: it is possible to run simulations for an user-defined length of time between 10/3/2017 to 12/15/2017.
+Note: it is possible to run simulations for a user-defined length of time between
+10/3/2017 and 12/15/2017.
""" -import os -import glob -import json + import csv -import pandas as pd +import json +import os +from datetime import datetime, timedelta, timezone -from datetime import datetime, timezone, timedelta +import pandas as pd from tqdm import tqdm -from raps.job import job_dict, Job + +from raps.job import Job, job_dict from raps.utils import WorkloadData DATE_FORMAT_STR = "%Y-%m-%d %H:%M:%S" DEFAULT_START = "2017-10-03T00:00" DEFAULT_END = "2017-10-04T00:00" + def to_epoch(ts_str): + """Convert a timestamp string or int/float into epoch seconds.""" if ts_str is None: return None if isinstance(ts_str, (int, float)): @@ -102,6 +109,7 @@ def to_epoch(ts_str): dt = datetime.strptime(ts_str, DATE_FORMAT_STR) return int(dt.timestamp()) + def parse_timestamp(val): """ Convert Philly job log timestamps to datetime. @@ -120,6 +128,7 @@ def parse_timestamp(val): return None return None + def load_traces_by_day(trace_dir, start_dt, end_dt, colname): """Load CPU or GPU traces between start_dt and end_dt.""" traces = {} @@ -131,12 +140,14 @@ def load_traces_by_day(trace_dir, start_dt, end_dt, colname): df = pd.read_csv( daily_file, names=["time", "machineId", colname], # no header in daily CSVs - dtype={"machineId": str, colname: str}, # avoid DtypeWarning + dtype={"machineId": str, colname: str}, # avoid DtypeWarning ) # Normalize time column (strip PST/PDT, parse datetime) df["time"] = df["time"].str.replace(" PST", "").str.replace(" PDT", "") - df["time"] = pd.to_datetime(df["time"], errors="coerce", format=DATE_FORMAT_STR) + df["time"] = pd.to_datetime( + df["time"], errors="coerce", format=DATE_FORMAT_STR + ) # Convert util column to numeric (NA/invalid → NaN) df[colname] = pd.to_numeric(df[colname], errors="coerce") @@ -151,13 +162,16 @@ def load_traces_by_day(trace_dir, start_dt, end_dt, colname): return traces + def parse_date(s): + """Parse a Philly trace date string into a datetime object.""" if not s or s == "None": return None # strip possible timezone labels like "PST"/"PDT" s = s.replace(" PST", "").replace(" PDT", "") return datetime.strptime(s, DATE_FORMAT_STR) + def load_data(files, **kwargs): """ Load Philly trace into ExaDigiT Job objects. @@ -178,7 +192,7 @@ def load_data(files, **kwargs): assert len(files) == 1, "Expecting a single directory path" trace_dir = files[0] gpu_trace_dir = os.path.join(files[0], "gpu_by_day") - config = kwargs.get('config') + config = kwargs.get("config") gpus_per_node = config.get("GPUS_PER_NODE") if gpus_per_node is None: raise ValueError("Must pass gpus_per_node (2 or 8)") @@ -186,34 +200,34 @@ def load_data(files, **kwargs): # --- 1. Machine list --- machine_file = os.path.join(trace_dir, "cluster_machine_list") machines = {} - with open(machine_file) as f: + with open(machine_file, encoding="utf-8") as f: reader = csv.DictReader(f) for row in reader: mid = row["machineId"] machines[mid] = { "num_gpus": int(row[" number of GPUs"]), - "gpu_mem": row[" single GPU mem"].strip() + "gpu_mem": row[" single GPU mem"].strip(), } partition_machines = { - mid: info for mid, info in machines.items() - if info["num_gpus"] == gpus_per_node + mid: info for mid, info in machines.items() if info["num_gpus"] == gpus_per_node } # Build node → index mapping for this partition - node_mapping = {mid: idx for idx, mid in enumerate(sorted(partition_machines.keys()))} - max_nodes = len(node_mapping) + node_mapping = { + mid: idx for idx, mid in enumerate(sorted(partition_machines.keys())) + } # Assign partition ID (e.g. 
0 for 2-GPU, 1 for 8-GPU) partition_id = 0 if gpus_per_node == 2 else 1 # --- 3. GPU util --- start_dt = datetime.fromtimestamp(start_ts) - end_dt = datetime.fromtimestamp(end_ts) + end_dt = datetime.fromtimestamp(end_ts) # --- 4. Job log --- job_file = os.path.join(trace_dir, "cluster_job_log") - with open(job_file) as f: + with open(job_file, encoding="utf-8") as f: job_log = json.load(f) # --- First pass: filter jobs by date range --- @@ -237,7 +251,9 @@ def load_data(files, **kwargs): attempts = raw.get("attempts", []) if attempts and "detail" in attempts[0]: # Count GPUs from the first detail - gpus = sum(len(detail.get("gpus", [])) for detail in attempts[0]["detail"]) + gpus = sum( + len(detail.get("gpus", [])) for detail in attempts[0]["detail"] + ) if gpus > 0 and (gpus % gpus_per_node == 0): filtered_log.append(raw) job_log = filtered_log @@ -309,7 +325,7 @@ def load_data(files, **kwargs): mid = detail["ip"] machine_ids.append(mid) gpus += len(detail.get("gpus", [])) - + num_nodes = len(machine_ids) if num_nodes == 0: continue @@ -337,9 +353,9 @@ def load_data(files, **kwargs): else: job_cpu_df = pd.concat(cpu_dfs, ignore_index=True) mask = ( - (job_cpu_df["machineId"].isin(machine_ids)) & - (job_cpu_df["time"] >= start) & - (job_cpu_df["time"] <= end) + (job_cpu_df["machineId"].isin(machine_ids)) + & (job_cpu_df["time"] >= start) + & (job_cpu_df["time"] <= end) ) job_cpu = job_cpu_df.loc[mask].copy() @@ -361,65 +377,59 @@ def load_data(files, **kwargs): else: job_gpu_df = pd.concat(gpu_dfs, ignore_index=True) mask = ( - (job_gpu_df["machineId"].isin(machine_ids)) & - (job_gpu_df["time"] >= start) & - (job_gpu_df["time"] <= end) + (job_gpu_df["machineId"].isin(machine_ids)) + & (job_gpu_df["time"] >= start) + & (job_gpu_df["time"] <= end) ) job_gpu = job_gpu_df.loc[mask].copy() if len(machine_ids) > 1: job_gpu = job_gpu.groupby("time")["gpu_util"].mean().reset_index() - job_gpu_trace = (job_gpu["gpu_util"].to_numpy() * 0.01 * gpus_per_node).tolist() - + job_gpu_trace = ( + job_gpu["gpu_util"].to_numpy() * 0.01 * gpus_per_node + ).tolist() if machine_ids: - # Shift times relative to start_ts - submit_time = submitted.timestamp() - start_ts if submitted else None - start_time = start.timestamp() - start_ts if start else None - end_time = end.timestamp() - start_ts if end else None + submit_time = submitted.timestamp() + start_time = start.timestamp() + end_time = end.timestamp() if not submit_time or not start_time or not end_time: - tqdm.write(f"skipped {jobid} b/c missing submit_time, start_time, or end_time") + tqdm.write( + f"skipped {jobid} b/c missing submit_time, start_time, or end_time" + ) continue - - scheduled_nodes = [node_mapping[mid] for mid in machine_ids if mid in node_mapping] - if submit_time and start_time and end_time: + scheduled_nodes = [ + node_mapping[mid] for mid in machine_ids if mid in node_mapping + ] + + if submit_time and start_time and end_time: job = job_dict( id=jobid, name=f"philly-{jobid}", account=user if user else "unknown", - nodes_required=len(machine_ids), partition=partition_id, - priority=0, - cpu_cores_required=1, gpu_units_required=gpus_per_node, - end_state=status, scheduled_nodes=scheduled_nodes, - cpu_trace=job_cpu_trace, gpu_trace=job_gpu_trace, - ntx_trace=None, - nrx_trace=None, - + ntx_trace=[], + nrx_trace=[], submit_time=submit_time, start_time=start_time, end_time=end_time, - #time_limit=end_time - start_time, time_limit=end_time, expected_run_time=wall_time if wall_time else 0, - current_run_time=0, - trace_time=None, - 
trace_start_time=0, #None,
-            trace_end_time=end_time, #None,
+            trace_start_time=start_time,  # None,
+            trace_end_time=end_time,  # None,
             trace_quanta=60,
-            trace_missing_values=False,
-            downscale=1
+            trace_missing_values=False
         )
         if job_cpu_trace and job_gpu_trace:
             jobs_list.append(Job(job))
@@ -427,10 +437,11 @@
         else:
             tqdm.write(f"skipping {job['id']} b/c either no cpu or gpu trace")
 
     if debug:
-        tqdm.write(f"abs start time: {start_ts} rel job start: {job['start_time']} rel job end: {job['end_time']}")
+        tqdm.write(f"{job['id']} start: {job['start_time']} end: {job['end_time']}")
 
     return WorkloadData(
         jobs=jobs_list,
-        telemetry_start=0, telemetry_end=int(end_ts - start_ts),
+        telemetry_start=start_ts,
+        telemetry_end=end_ts,
         start_date=datetime.fromtimestamp(start_ts, timezone.utc),
     )
--
GitLab

From 49d9afa99bfd3b40bfbb8a6f4e51b154b86a2e39 Mon Sep 17 00:00:00 2001
From: Wes Brewer
Date: Fri, 3 Oct 2025 00:42:09 +0300
Subject: [PATCH 20/26] Add in Srishti's `-w calculon` option and module for
 generating realistic LLM traces

---
 raps/sim_config.py         |   4 +-
 raps/workloads/__init__.py |   4 +-
 raps/workloads/calculon.py | 173 +++++++++++++++++++++++++++++++++++++
 3 files changed, 178 insertions(+), 3 deletions(-)
 create mode 100644 raps/workloads/calculon.py

diff --git a/raps/sim_config.py b/raps/sim_config.py
index da3541c..a69c288 100644
--- a/raps/sim_config.py
+++ b/raps/sim_config.py
@@ -134,8 +134,8 @@ class SimConfig(RAPSBaseModel, abc.ABC):
     """ Grab data from live system. """
 
     # Workload arguments (TODO split into separate model)
-    workload: Literal['random', 'benchmark', 'peak', 'idle',
-                      'synthetic', 'multitenant', 'replay', 'randomAI'] = "random"
+    workload: Literal['random', 'benchmark', 'peak', 'idle', 'synthetic',
+                      'multitenant', 'replay', 'randomAI', 'calculon'] = "random"
     """ Type of synthetic workload """
 
     multimodal: list[float] = [1.0]
diff --git a/raps/workloads/__init__.py b/raps/workloads/__init__.py
index a34261a..d61befc 100644
--- a/raps/workloads/__init__.py
+++ b/raps/workloads/__init__.py
@@ -10,6 +10,7 @@ from raps.sim_config import SingleSimConfig
 from raps.telemetry import Telemetry
 
 from .basic import BasicWorkload
+from .calculon import Calculon
 from .constants import JOB_NAMES, ACCT_NAMES, MAX_PRIORITY
 from .distribution import DistributionWorkload
 from .live import continuous_job_generation
@@ -51,7 +52,8 @@ class Workload(
     BaseWorkload,
     DistributionWorkload,
     BasicWorkload,
-    MultitenantWorkload
+    MultitenantWorkload,
+    Calculon
 ):
     """Final workload class with all workload types."""
     pass
diff --git a/raps/workloads/calculon.py b/raps/workloads/calculon.py
new file mode 100644
index 0000000..8e8c1bc
--- /dev/null
+++ b/raps/workloads/calculon.py
@@ -0,0 +1,173 @@
+"""
+Calculon is an analytical model for estimating LLM training times for given architectures
+on particular hardware. It is described in the paper:
+
+    Isaev, Mikhail, et al. "Calculon: a methodology and tool for high-level co-design of
+    systems and large language models." SC23 Proceedings
+    https://dl.acm.org/doi/pdf/10.1145/3581784.3607102
+
+The code is available at https://github.com/calculon-ai/calculon,
+which this module assumes is already cloned into the third_party directory.
+
+Calculon requires installing `psutil`, which can be pip installed via:
+
+    pip install psutil
+
+Since Calculon by default supports A100 GPUs, we are able to use the default files that
+are already set up in Calculon, and therefore have added two systems which have A100 GPUs:
+Selene and Perlmutter.
Example run commands: + + python main.py run --system selene -w calculon + python main.py run --system perlmutter -w calculon + +This code is currently setup to generate synthetic traces for four different LLM models: +megatron-22B, gpt3-175B, turing-530B, and megatron-1T. Adjust these by modifying +llm_model_tests below. + +""" +import json +import os +import random +import subprocess +from pathlib import Path + +import numpy as np + +from raps.job import job_dict + +from .constants import ACCT_NAMES + + +class Calculon: + """Calculon workload mixin for Workload class.""" + + def __init__(self, *args, **kwargs): + # NOTE: mixins usually accept (sim_config_args, system_config_dict) through Workload + super().__init__(*args, **kwargs) + + def calculon(self, **kwargs): + """Generate workload using Calculon backend + job trace synthesis.""" + jobs = [] + + llm_models_test = [ + ["megatron-22B", 8, 4], + ["gpt3-175B", 64, 64], + ["turing-530B", 280, 280], + ["megatron-1T", 512, 512], + ] + + for llm_model, num_nodes, max_batch_size in llm_models_test: + for partition in self.partitions: + config = self.config_map[partition] + gpu_system = "a100_80g" + data_type = "float16" + output = f"{llm_model}_{gpu_system}_{max_batch_size}_{data_type}_{num_nodes}.json" + + # call Calculon binary/subprocess to get MFU + batch time + mfu, total_batch_time = self._run_calculon( + llm_model, gpu_system, max_batch_size, num_nodes, data_type, output + ) + + # derive job stats + num_iters = 3000 + trace_quanta = config["TRACE_QUANTA"] + job_time = total_batch_time * num_iters + num_samples = int(job_time // trace_quanta) + + system_util = np.full(num_samples, mfu) + cpu_util = random.random() * config["CPUS_PER_NODE"] + cpu_trace = cpu_util * np.ones(num_iters) + + net_tx, net_rx = [], [] + num_nodes = num_nodes // config["GPUS_PER_NODE"] + + epochs = 1 + wall_time = job_time + for i in range(epochs): + job_info = job_dict( + nodes_required=num_nodes, + scheduled_nodes=[], + name=f"{llm_model} training for {num_iters} iterations", + account=ACCT_NAMES[0], + cpu_trace=cpu_trace, + gpu_trace=system_util, + ntx_trace=net_tx, + nrx_trace=net_rx, + end_state="COMPLETED", + id=None, + priority=100, + partition=partition, + time_limit=job_time + 1, + start_time=0, + end_time=job_time, + trace_time=job_time, + trace_start_time=0, + trace_end_time=job_time, + ) + jobs.append(job_info) + wall_time += job_time + + return jobs + + def _run_calculon(self, model, system, max_batch_size, num_nodes, data_type, output): + """Internal: run Calculon subprocess and parse result.""" + base_path = Path("third_party/calculon") + + # paths + model_path = base_path / "models" / f"{model}.json" + system_path = base_path / "systems" / f"{system}.json" + raw_path = base_path / "optimal_executions" / output.replace(".json", "_raw.json") + exec_path = base_path / "optimal_executions" / output.replace(".json", "_exec.json") + stats_path = base_path / "optimal_executions" / output.replace(".json", "_stats.json") + + # Run llm-optimal-execution to generate candidate executions + opt_cmd = [ + "./bin/calculon", "llm-optimal-execution", + f"models/{model}.json", + str(num_nodes), + str(max_batch_size), + data_type, + f"systems/{system}.json", + f"optimal_executions/{output.replace('.json', '_raw.json')}", + ] + subprocess.run(opt_cmd, check=True, cwd=base_path, env={**os.environ, "PYTHONPATH": "."}) + + # Read raw output, pick first/best execution and dump it as exec.json + with open(raw_path) as f: + raw_data = json.load(f) + + # get first (or 
best) key
+        first_key = sorted(raw_data.keys(), key=lambda k: float(k))[0]
+        best_exec = raw_data[first_key]["execution"]
+
+        with open(exec_path, "w") as f:
+            json.dump(best_exec, f, indent=2)
+
+        # Run llm with chosen execution, system, and model → stats.json
+        llm_cmd = [
+            "./bin/calculon", "llm",
+            f"models/{model}.json",
+            f"optimal_executions/{output.replace('.json', '_exec.json')}",
+            f"systems/{system}.json",
+            f"optimal_executions/{output.replace('.json', '_stats.json')}",
+        ]
+        subprocess.run(llm_cmd, check=True, cwd=base_path, env={**os.environ, "PYTHONPATH": "."})
+
+        # Parse stats.json to extract metrics
+        with open(stats_path) as f:
+            stats_data = json.load(f)
+
+        stats = stats_data.get("stats", {})
+
+        # These keys may vary depending on Calculon version
+        mfu = stats.get("model_flops_utilization") \
+            or stats.get("sample_rate") \
+            or stats.get("best_sample_rate") \
+            or 0.0
+
+        total_batch_time = stats.get("block_fw_time") \
+            or stats.get("batch_time") \
+            or stats.get("total_time") \
+            or 0.0
+
+        return mfu, total_batch_time
--
GitLab

From badf9df6ca9b0853aa4786e12202b868e7442b3e Mon Sep 17 00:00:00 2001
From: Wes Brewer
Date: Fri, 3 Oct 2025 02:05:59 +0300
Subject: [PATCH 21/26] Fix some issues to get calculon workloads running. Add
 ability to use previously cached results

---
 config/perlmutter.yaml     |   2 +-
 config/selene.yaml         |   2 +-
 raps/workloads/calculon.py | 109 ++++++++++++++++++++-----------------
 3 files changed, 60 insertions(+), 53 deletions(-)

diff --git a/config/perlmutter.yaml b/config/perlmutter.yaml
index 8863a36..e8de04c 100644
--- a/config/perlmutter.yaml
+++ b/config/perlmutter.yaml
@@ -38,7 +38,7 @@ scheduler:
   seed: 42
   job_arrival_time: 900
   mtbf: 11
-  trace_quanta: 15
+  trace_quanta: 10
   min_wall_time: 3600
   max_wall_time: 43200
   ui_update_freq: 900
diff --git a/config/selene.yaml b/config/selene.yaml
index 8f42bf6..0520da1 100644
--- a/config/selene.yaml
+++ b/config/selene.yaml
@@ -38,7 +38,7 @@ scheduler:
   seed: 42
   job_arrival_time: 900
   mtbf: 11
-  trace_quanta: 15
+  trace_quanta: 10
   min_wall_time: 3600
   max_wall_time: 43200
   ui_update_freq: 900
diff --git a/raps/workloads/calculon.py b/raps/workloads/calculon.py
index 8e8c1bc..f843084 100644
--- a/raps/workloads/calculon.py
+++ b/raps/workloads/calculon.py
@@ -21,10 +21,19 @@ Selene and Perlmutter. Example run commands:
     python main.py run --system perlmutter -w calculon
 
 This code is currently setup to generate synthetic traces for four different LLM models:
-megatron-22B, gpt3-175B, turing-530B, and megatron-1T. Adjust these by modifying
-llm_model_tests below.
+megatron-22B, gpt3-175B, turing-530B, and megatron-1T. These four tests can take a couple
+of **hours** to run. On first run, consider commenting out the last three models to only test
+the smallest case, megatron-22B. The parameter `llm_models_test` below defines which tests
+are run.
+
+Finally, the code below is set up to use previously cached results, so once the json
+files are generated by Calculon, they can be rerun very quickly again and again.
+The caveat is that if you want to change some Calculon configurations,
+you will need to delete the cached json files in the calculon/optimal_executions folder,
+to force it to regenerate new files.
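+
+For reference, the cached files for one configuration follow a fixed naming
+scheme built from the model, GPU system, batch size, data type, and node count;
+for example, megatron-22B (a100_80g, batch size 4, float16, 8 nodes) produces:
+
+    megatron-22B_a100_80g_4_float16_8_raw.json    # candidate executions
+    megatron-22B_a100_80g_4_float16_8_exec.json   # selected execution
+    megatron-22B_a100_80g_4_float16_8_stats.json  # stats for the selected execution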
""" +import math import json import os import random @@ -33,7 +42,7 @@ from pathlib import Path import numpy as np -from raps.job import job_dict +from raps.job import Job, job_dict from .constants import ACCT_NAMES @@ -69,14 +78,17 @@ class Calculon: ) # derive job stats - num_iters = 3000 + num_iters = 1000000 # realistic number is probably in the millions trace_quanta = config["TRACE_QUANTA"] + job_time = total_batch_time * num_iters - num_samples = int(job_time // trace_quanta) + num_samples = math.ceil(job_time / trace_quanta) + 1 + end_time = num_samples * trace_quanta # align job to tick grid - system_util = np.full(num_samples, mfu) + # use random CPU utilizations for now cpu_util = random.random() * config["CPUS_PER_NODE"] - cpu_trace = cpu_util * np.ones(num_iters) + cpu_trace = np.full(num_samples, cpu_util) # same length + gpu_trace = np.full(num_samples, mfu) # length matches simulation steps net_tx, net_rx = [], [] num_nodes = num_nodes // config["GPUS_PER_NODE"] @@ -90,7 +102,7 @@ class Calculon: name=f"{llm_model} training for {num_iters} iterations", account=ACCT_NAMES[0], cpu_trace=cpu_trace, - gpu_trace=system_util, + gpu_trace=gpu_trace, ntx_trace=net_tx, nrx_trace=net_rx, end_state="COMPLETED", @@ -99,12 +111,15 @@ class Calculon: partition=partition, time_limit=job_time + 1, start_time=0, - end_time=job_time, + end_time=end_time, + expected_run_time=end_time, + trace_quanta=trace_quanta, trace_time=job_time, trace_start_time=0, trace_end_time=job_time, ) - jobs.append(job_info) + job = Job(job_info) + jobs.append(job) wall_time += job_time return jobs @@ -112,15 +127,26 @@ class Calculon: def _run_calculon(self, model, system, max_batch_size, num_nodes, data_type, output): """Internal: run Calculon subprocess and parse result.""" base_path = Path("third_party/calculon") - - # paths - model_path = base_path / "models" / f"{model}.json" - system_path = base_path / "systems" / f"{system}.json" - raw_path = base_path / "optimal_executions" / output.replace(".json", "_raw.json") - exec_path = base_path / "optimal_executions" / output.replace(".json", "_exec.json") - stats_path = base_path / "optimal_executions" / output.replace(".json", "_stats.json") - - # Run llm-optimal-execution to generate candidate executions + output_dir = base_path / "optimal_executions" + output_dir.mkdir(exist_ok=True) + + # expected files + raw_file = output_dir / f"{output.replace('.json', '_raw.json')}" + exec_file = output_dir / f"{output.replace('.json', '_exec.json')}" + stats_file = output_dir / f"{output.replace('.json', '_stats.json')}" + + # if all three exist, skip running + if raw_file.exists() and exec_file.exists() and stats_file.exists(): + print(f"[INFO] Using cached Calculon results for {output}") + with open(raw_file) as f: + data = json.load(f) + first_key = list(data.keys())[0] + stats = data[first_key]["stats"] + mfu = stats.get("sample_rate", 0) # or compute MFU if you want + batch_time = stats.get("block_fw_time", 0) # example placeholder + return mfu, batch_time + + # otherwise, run Calculon opt_cmd = [ "./bin/calculon", "llm-optimal-execution", f"models/{model}.json", @@ -128,46 +154,27 @@ class Calculon: str(max_batch_size), data_type, f"systems/{system}.json", - f"optimal_executions/{output.replace('.json', '_raw.json')}", + str(raw_file), ] - subprocess.run(opt_cmd, check=True, cwd=base_path, env={**os.environ, "PYTHONPATH": "."}) - - # Read raw output, pick first/best execution and dump it as exec.json - with open(raw_path) as f: - raw_data = json.load(f) - # get 
first (or best) key - first_key = sorted(raw_data.keys(), key=lambda k: float(k))[0] - best_exec = raw_data[first_key]["execution"] - - with open(exec_path, "w") as f: - json.dump(best_exec, f, indent=2) - - # Run llm with chosen execution, system, and model → stats.json llm_cmd = [ "./bin/calculon", "llm", f"models/{model}.json", - f"optimal_executions/{output.replace('.json', '_exec.json')}", + str(exec_file), f"systems/{system}.json", - f"optimal_executions/{output.replace('.json', '_stats.json')}", + str(stats_file), ] - subprocess.run(llm_cmd, check=True, cwd=base_path, env={**os.environ, "PYTHONPATH": "."}) - # Parse stats.json to extract metrics - with open(stats_path) as f: - stats_data = json.load(f) - - stats = stats_data.get("stats", {}) + subprocess.run(opt_cmd, check=True, cwd=base_path, env={**os.environ, "PYTHONPATH": "."}) + subprocess.run(llm_cmd, check=True, cwd=base_path, env={**os.environ, "PYTHONPATH": "."}) - # These keys may vary depending on Calculon version - mfu = stats.get("model_flops_utilization") \ - or stats.get("sample_rate") \ - or stats.get("best_sample_rate") \ - or 0.0 + # parse output + with open(raw_file) as f: + data = json.load(f) + first_key = list(data.keys())[0] + stats = data[first_key]["stats"] - total_batch_time = stats.get("block_fw_time") \ - or stats.get("batch_time") \ - or stats.get("total_time") \ - or 0.0 + mfu = stats.get("sample_rate", 0) + batch_time = stats.get("block_fw_time", 0) - return mfu, total_batch_time + return mfu, batch_time -- GitLab From b701d357ef91163febbc2036c621702b5d2aecdf Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Fri, 3 Oct 2025 02:10:12 +0300 Subject: [PATCH 22/26] Refactor get_current_utilization b/c it didn't handle case where job.trace_quanta not set --- raps/utils.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/raps/utils.py b/raps/utils.py index de565d4..e232bce 100644 --- a/raps/utils.py +++ b/raps/utils.py @@ -634,22 +634,23 @@ def convert_numpy_to_builtin(obj): def get_current_utilization(trace, job: Job): - # Return utilizaiton for a trace at the jobs current running time. - # Note: this should move to a trace.py and a Trace class! - util = 0.0 + """Return utilization for a trace at the job's current running time. + Note: this should move to a trace.py and a Trace class! 
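+
+    Example (illustrative numbers): with trace_quanta=60, trace_start_time=0,
+    and running_time=130, the sampled index is (130 - 0) // 60 = 2, i.e. the
+    third entry of the trace.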
+    """
+    if not job.trace_quanta:
+        raise ValueError("job.trace_quanta is not set; cannot compute utilization.")
 
-    if job.trace_quanta:
-        time_quanta_index = int((job.running_time - job.trace_start_time) // job.trace_quanta)
-        if time_quanta_index < 0:
-            time_quanta_index = 0
+    time_quanta_index = int((job.running_time - job.trace_start_time) // job.trace_quanta)
+    if time_quanta_index < 0:
+        time_quanta_index = 0
 
-    if (isinstance(trace, list) and trace != []) or \
+    if (isinstance(trace, list) and trace) or \
        (isinstance(trace, np.ndarray) and trace.size != 0):
         if time_quanta_index < len(trace):
             util = get_utilization(trace, time_quanta_index)
         else:
             util = get_utilization(trace, max(0, len(trace) - 1))
-    elif isinstance(trace, float) or isinstance(trace, int):
+    elif isinstance(trace, (float, int)):
         util = trace
     else:
         util = 0.0
--
GitLab

From a7ed8225f02c4b9eab3f2ec1398b7672e76d7a2c Mon Sep 17 00:00:00 2001
From: Wes Brewer
Date: Tue, 21 Oct 2025 12:36:49 -0400
Subject: [PATCH 23/26] Update parse_philly_traces.py with docstring and
 cleanup

---
 scripts/parse_philly_traces.py | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/scripts/parse_philly_traces.py b/scripts/parse_philly_traces.py
index 5693e4e..57ce8c2 100644
--- a/scripts/parse_philly_traces.py
+++ b/scripts/parse_philly_traces.py
@@ -1,3 +1,14 @@
+"""
+See raps/dataloaders/philly.py for how to download the philly traces.
+
+Run the following to parse the philly traces into separate files for each day:
+
+    python /path/to/raps/scripts/parse_philly_traces.py cluster_cpu_util
+    python /path/to/raps/scripts/parse_philly_traces.py cluster_gpu_util
+
+This will parse these two files into two directories, cpu_by_day and gpu_by_day,
+creating one file for each day and adding the lines for that day into the files.
+
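+
+Each daily file is a small CSV with a single header row; rows look like this
+(machine ids and values are illustrative, not taken from the real trace):
+
+    time,machine_id,gpu_util
+    2017-10-03 00:00:00,m1,37.500
+    2017-10-03 00:01:00,m2,52.250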
+""" import os import sys from datetime import datetime @@ -23,7 +34,6 @@ with open(input_file) as f: output_dir = "cpu_by_day" if is_cpu else "gpu_by_day" os.makedirs(output_dir, exist_ok=True) - #for i, line in enumerate(f, 1): for line in tqdm(f, total=total_lines, desc="Processing lines"): parts = line.strip().split(",") -- GitLab From 4fa6fd65bc2051ed78d8a721214e61484aa4a689 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Tue, 21 Oct 2025 14:31:00 -0400 Subject: [PATCH 24/26] Update philly run command to use --start 2017-10-03T00:14:56Z --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 3684594..160549f 100644 --- a/README.md +++ b/README.md @@ -83,7 +83,7 @@ Microsoft Azure - 2017 Philly Traces python main.py run-parts -x philly -w multitenant # Telemetry replay - python main.py run-parts -x philly -f /opt/data/philly/trace-data --start 2017-10-03T00:00 --end 2017-10-04T00:00 + python main.py run-parts -x philly -f /opt/data/philly/trace-data --start 2017-10-03T00:14:56Z --end 2017-10-04T00:00 For Lumi -- GitLab From b5a530ffcc504c03a00272129d8cc3fd449c2a72 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Tue, 21 Oct 2025 14:32:30 -0400 Subject: [PATCH 25/26] Update .gitignore to ignore raps-output-* and ppo_raps_logs --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index ed10fab..c5f7241 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ venv simulation_results/ models/fmu-models .shell-completion-cache +raps-output-* +ppo_raps_logs -- GitLab From b2181f80eceedcc7fb41e86128d6c186a7da1723 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Wed, 22 Oct 2025 16:15:49 -0400 Subject: [PATCH 26/26] Remove dead code that was not properly deleted when resolving merge conflict --- raps/engine.py | 69 -------------------------------------------------- 1 file changed, 69 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index 6e3ad19..67cd999 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -340,75 +340,6 @@ class Engine: start_date=self.start, ) - if sim_config.live and not sim_config.replay: - td = Telemetry(**sim_config_dict) - workload_data = td.load_from_live_system() - elif sim_config.replay: - # TODO: this will have issues if running separate systems or custom systems - partition_short = partition.split("/")[-1] if partition else None - td = Telemetry( - **sim_config_dict, - partition=partition, - ) - if partition: - snap_map = {p.stem: p for p in sim_config.replay[0].glob("*.npz")} - if len(snap_map) > 0: - if partition_short not in snap_map: - raise RuntimeError(f"Snapshot '{partition_short}.npz' not in {sim_config.replay[0]}") - replay_files = [snap_map[partition_short]] - else: - replay_files = sim_config.replay - else: - replay_files = sim_config.replay - - workload_data = td.load_from_files(replay_files) - else: # Synthetic jobs - wl = Workload(sim_config_args, system_config_dict) - workload_data = wl.generate_jobs() - td = Telemetry(**sim_config_dict) - - jobs = workload_data.jobs - - # TODO refactor how stat/end/fastforward/time work - if sim_config.fastforward is not None: - workload_data.telemetry_start = workload_data.telemetry_start + sim_config.fastforward - - if sim_config.time is not None: - workload_data.telemetry_end = workload_data.telemetry_start + sim_config.time - - if sim_config.time_delta is not None: - time_delta = sim_config.time_delta - else: - time_delta = 1 - - if sim_config.continuous_job_generation: - continuous_workload = wl - else: - 
continuous_workload = None - - accounts = None - if sim_config.accounts: - job_accounts = Accounts(jobs) - if sim_config.accounts_json: - loaded_accounts = Accounts.from_json_filename(sim_config.accounts_json) - accounts = Accounts.merge(loaded_accounts, job_accounts) - else: - accounts = job_accounts - - engine = Engine( - power_manager=power_manager, - flops_manager=flops_manager, - cooling_model=cooling_model, - continuous_workload=continuous_workload, - jobs=jobs, - accounts=accounts, - telemetry=td, - sim_config=sim_config, - system_config=system_config, - ) - - return engine, workload_data, time_delta - def add_running_jobs_to_queue(self, jobs_to_submit: List): """ Modifies jobs_to_submit and self.queue -- GitLab