Loading raps/dataloaders/philly.py +85 −22 Original line number Diff line number Diff line import os import json import csv import datetime import pandas as pd import warnings from datetime import datetime, timezone from tqdm import tqdm from raps.job import job_dict, Job from raps.utils import WorkloadData DATE_FORMAT_STR = "%Y-%m-%d %H:%M:%S" Loading @@ -14,7 +16,7 @@ def parse_date(s): return None # strip possible timezone labels like "PST"/"PDT" s = s.replace(" PST", "").replace(" PDT", "") return datetime.datetime.strptime(s, DATE_FORMAT_STR) return datetime.strptime(s, DATE_FORMAT_STR) def load_data(files, **kwargs): """ Loading @@ -41,10 +43,17 @@ def load_data(files, **kwargs): "gpu_mem": row[" single GPU mem"].strip() } # build node → index mapping node_mapping = {mid: idx for idx, mid in enumerate(sorted(machines.keys()))} # --- 2. CPU util --- cpu_file = os.path.join(trace_dir, "cluster_cpu_util") cpu_util = pd.read_csv(cpu_file) # cpu_util has columns: time, machine_id, cpu_util cpu_util["time"] = cpu_util["time"].str.replace(" PST","").str.replace(" PDT","") #cpu_util["time"] = pd.to_datetime(cpu_util["time"], format="%Y-%m-%d %H:%M:%S") # now cpu_util has: time (datetime), machine_id, cpu_util cpu_util["time"] = cpu_util["time"].apply(parse_date) # --- 3. GPU util --- gpu_file = os.path.join(trace_dir, "cluster_gpu_util") Loading @@ -55,20 +64,33 @@ def load_data(files, **kwargs): engine="python", on_bad_lines="skip" ) if wlist: warnings.warn( f"cluster_gpu_util: skipped {len(wlist)} malformed lines while reading {gpu_file}", UserWarning ) # Convert time to datetime gpu_util["time"] = pd.to_datetime(gpu_util["time"], errors="coerce").dt.tz_localize(None) # Identify GPU columns gpu_cols = [c for c in gpu_util.columns if c.startswith("gpu")] # Collapse per row: sum all GPU utilizations and divide by 100 gpu_util["gpu_util"] = gpu_util[gpu_cols].sum(axis=1) / 100.0 # Keep only collapsed util plus metadata gpu_util = gpu_util[["time", "machineId", "gpu_util"]] print("Sample GPU util after preprocess:", gpu_util.head()) # --- 4. Job log --- job_file = os.path.join(trace_dir, "cluster_job_log") with open(job_file) as f: job_log = json.load(f) # --- First pass: find earliest submit time --- earliest_submit = None start_ts = None for raw in job_log: submitted = raw.get("submitted_time") if submitted is None or submitted == "None": Loading @@ -80,14 +102,15 @@ def load_data(files, **kwargs): else: t = parse_date(submitted).timestamp() if earliest_submit is None or t < earliest_submit: earliest_submit = t if start_ts is None or t < start_ts: start_ts = t if earliest_submit is None: if start_ts is None: raise ValueError("No valid submitted_time found in Philly traces") # --- Second pass: build jobs --- jobs = [] jobs_list = [] for raw in tqdm(job_log, desc="Building Job objects"): jobid = raw.get("jobid") user = raw.get("user") Loading @@ -96,7 +119,7 @@ def load_data(files, **kwargs): # Submitted time submitted = raw.get("submitted_time") if isinstance(submitted, (int, float)): submitted = datetime.datetime.fromtimestamp(int(submitted)) submitted = datetime.fromtimestamp(int(submitted)) else: submitted = parse_date(submitted) Loading @@ -107,12 +130,12 @@ def load_data(files, **kwargs): et = attempts[-1].get("end_time") if isinstance(st, (int, float)): start = datetime.datetime.fromtimestamp(int(st)) start = datetime.fromtimestamp(int(st)) elif st: start = parse_date(st) if isinstance(et, (int, float)): end = datetime.datetime.fromtimestamp(int(et)) end = datetime.fromtimestamp(int(et)) elif et: end = parse_date(et) Loading @@ -128,15 +151,44 @@ def load_data(files, **kwargs): machine_ids.append(mid) gpus += len(detail.get("gpus", [])) # Collect utilization traces job_cpu = cpu_util[cpu_util["machine_id"].isin(machine_ids)] job_gpu = gpu_util[gpu_util["machineId"].isin(machine_ids)] # CPU utilization traces if machine_ids and start and end: mask = ( cpu_util["machine_id"].isin(machine_ids) & (cpu_util["time"] >= start) & (cpu_util["time"] <= end) ) job_cpu = cpu_util.loc[mask].copy() # Aggregate across machines if >1 machine if len(machine_ids) > 1: job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index() print("Job", jobid) print("machine_ids from job:", machine_ids[:5]) print("gpu_util machineId sample:", gpu_util["machineId"].unique()[:5]) print("start, end:", start, end) print("gpu_util time range:", gpu_util["time"].min(), gpu_util["time"].max()) # GPU utilization traces job_gpu = None if machine_ids and start and end: mask = ( gpu_util["machineId"].isin(machine_ids) & (gpu_util["time"] >= start) & (gpu_util["time"] <= end) ) job_gpu = gpu_util.loc[mask].copy() # Aggregate across machines if >1 machine if len(machine_ids) > 1: job_gpu = job_gpu.groupby("time")["gpu_util"].sum().reset_index() if machine_ids: # Shift times relative to earliest_submit submit_time = submitted.timestamp() - earliest_submit if submitted else None start_time = start.timestamp() - earliest_submit if start else None end_time = end.timestamp() - earliest_submit if end else None # Shift times relative to start_ts submit_time = submitted.timestamp() - start_ts if submitted else None start_time = start.timestamp() - start_ts if start else None end_time = end.timestamp() - start_ts if end else None if not submit_time or not start_time or not end_time: warnings.warn( Loading @@ -144,6 +196,7 @@ def load_data(files, **kwargs): UserWarning ) scheduled_nodes = [node_mapping[mid] for mid in machine_ids if mid in node_mapping] if submit_time and start_time and end_time: Loading @@ -162,9 +215,10 @@ def load_data(files, **kwargs): allocated_gpu_units=gpus, end_state=status, scheduled_nodes=scheduled_nodes, cpu_trace=job_cpu if not job_cpu.empty else None, gpu_trace=job_gpu if not job_gpu.empty else None, cpu_trace=job_cpu, gpu_trace=job_gpu, ntx_trace=None, nrx_trace=None, Loading @@ -181,6 +235,15 @@ def load_data(files, **kwargs): trace_missing_values=False, downscale=1 ) jobs.append(Job(job)) jobs_list.append(Job(job)) return jobs print(job) # Find max end timestamp across jobs end_ts = max(j.end_time for j in jobs_list if j.end_time is not None) return WorkloadData( jobs=jobs_list, telemetry_start=0, telemetry_end=int(end_ts - start_ts), start_date=datetime.fromtimestamp(start_ts, timezone.utc), ) Loading
raps/dataloaders/philly.py +85 −22 Original line number Diff line number Diff line import os import json import csv import datetime import pandas as pd import warnings from datetime import datetime, timezone from tqdm import tqdm from raps.job import job_dict, Job from raps.utils import WorkloadData DATE_FORMAT_STR = "%Y-%m-%d %H:%M:%S" Loading @@ -14,7 +16,7 @@ def parse_date(s): return None # strip possible timezone labels like "PST"/"PDT" s = s.replace(" PST", "").replace(" PDT", "") return datetime.datetime.strptime(s, DATE_FORMAT_STR) return datetime.strptime(s, DATE_FORMAT_STR) def load_data(files, **kwargs): """ Loading @@ -41,10 +43,17 @@ def load_data(files, **kwargs): "gpu_mem": row[" single GPU mem"].strip() } # build node → index mapping node_mapping = {mid: idx for idx, mid in enumerate(sorted(machines.keys()))} # --- 2. CPU util --- cpu_file = os.path.join(trace_dir, "cluster_cpu_util") cpu_util = pd.read_csv(cpu_file) # cpu_util has columns: time, machine_id, cpu_util cpu_util["time"] = cpu_util["time"].str.replace(" PST","").str.replace(" PDT","") #cpu_util["time"] = pd.to_datetime(cpu_util["time"], format="%Y-%m-%d %H:%M:%S") # now cpu_util has: time (datetime), machine_id, cpu_util cpu_util["time"] = cpu_util["time"].apply(parse_date) # --- 3. GPU util --- gpu_file = os.path.join(trace_dir, "cluster_gpu_util") Loading @@ -55,20 +64,33 @@ def load_data(files, **kwargs): engine="python", on_bad_lines="skip" ) if wlist: warnings.warn( f"cluster_gpu_util: skipped {len(wlist)} malformed lines while reading {gpu_file}", UserWarning ) # Convert time to datetime gpu_util["time"] = pd.to_datetime(gpu_util["time"], errors="coerce").dt.tz_localize(None) # Identify GPU columns gpu_cols = [c for c in gpu_util.columns if c.startswith("gpu")] # Collapse per row: sum all GPU utilizations and divide by 100 gpu_util["gpu_util"] = gpu_util[gpu_cols].sum(axis=1) / 100.0 # Keep only collapsed util plus metadata gpu_util = gpu_util[["time", "machineId", "gpu_util"]] print("Sample GPU util after preprocess:", gpu_util.head()) # --- 4. Job log --- job_file = os.path.join(trace_dir, "cluster_job_log") with open(job_file) as f: job_log = json.load(f) # --- First pass: find earliest submit time --- earliest_submit = None start_ts = None for raw in job_log: submitted = raw.get("submitted_time") if submitted is None or submitted == "None": Loading @@ -80,14 +102,15 @@ def load_data(files, **kwargs): else: t = parse_date(submitted).timestamp() if earliest_submit is None or t < earliest_submit: earliest_submit = t if start_ts is None or t < start_ts: start_ts = t if earliest_submit is None: if start_ts is None: raise ValueError("No valid submitted_time found in Philly traces") # --- Second pass: build jobs --- jobs = [] jobs_list = [] for raw in tqdm(job_log, desc="Building Job objects"): jobid = raw.get("jobid") user = raw.get("user") Loading @@ -96,7 +119,7 @@ def load_data(files, **kwargs): # Submitted time submitted = raw.get("submitted_time") if isinstance(submitted, (int, float)): submitted = datetime.datetime.fromtimestamp(int(submitted)) submitted = datetime.fromtimestamp(int(submitted)) else: submitted = parse_date(submitted) Loading @@ -107,12 +130,12 @@ def load_data(files, **kwargs): et = attempts[-1].get("end_time") if isinstance(st, (int, float)): start = datetime.datetime.fromtimestamp(int(st)) start = datetime.fromtimestamp(int(st)) elif st: start = parse_date(st) if isinstance(et, (int, float)): end = datetime.datetime.fromtimestamp(int(et)) end = datetime.fromtimestamp(int(et)) elif et: end = parse_date(et) Loading @@ -128,15 +151,44 @@ def load_data(files, **kwargs): machine_ids.append(mid) gpus += len(detail.get("gpus", [])) # Collect utilization traces job_cpu = cpu_util[cpu_util["machine_id"].isin(machine_ids)] job_gpu = gpu_util[gpu_util["machineId"].isin(machine_ids)] # CPU utilization traces if machine_ids and start and end: mask = ( cpu_util["machine_id"].isin(machine_ids) & (cpu_util["time"] >= start) & (cpu_util["time"] <= end) ) job_cpu = cpu_util.loc[mask].copy() # Aggregate across machines if >1 machine if len(machine_ids) > 1: job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index() print("Job", jobid) print("machine_ids from job:", machine_ids[:5]) print("gpu_util machineId sample:", gpu_util["machineId"].unique()[:5]) print("start, end:", start, end) print("gpu_util time range:", gpu_util["time"].min(), gpu_util["time"].max()) # GPU utilization traces job_gpu = None if machine_ids and start and end: mask = ( gpu_util["machineId"].isin(machine_ids) & (gpu_util["time"] >= start) & (gpu_util["time"] <= end) ) job_gpu = gpu_util.loc[mask].copy() # Aggregate across machines if >1 machine if len(machine_ids) > 1: job_gpu = job_gpu.groupby("time")["gpu_util"].sum().reset_index() if machine_ids: # Shift times relative to earliest_submit submit_time = submitted.timestamp() - earliest_submit if submitted else None start_time = start.timestamp() - earliest_submit if start else None end_time = end.timestamp() - earliest_submit if end else None # Shift times relative to start_ts submit_time = submitted.timestamp() - start_ts if submitted else None start_time = start.timestamp() - start_ts if start else None end_time = end.timestamp() - start_ts if end else None if not submit_time or not start_time or not end_time: warnings.warn( Loading @@ -144,6 +196,7 @@ def load_data(files, **kwargs): UserWarning ) scheduled_nodes = [node_mapping[mid] for mid in machine_ids if mid in node_mapping] if submit_time and start_time and end_time: Loading @@ -162,9 +215,10 @@ def load_data(files, **kwargs): allocated_gpu_units=gpus, end_state=status, scheduled_nodes=scheduled_nodes, cpu_trace=job_cpu if not job_cpu.empty else None, gpu_trace=job_gpu if not job_gpu.empty else None, cpu_trace=job_cpu, gpu_trace=job_gpu, ntx_trace=None, nrx_trace=None, Loading @@ -181,6 +235,15 @@ def load_data(files, **kwargs): trace_missing_values=False, downscale=1 ) jobs.append(Job(job)) jobs_list.append(Job(job)) return jobs print(job) # Find max end timestamp across jobs end_ts = max(j.end_time for j in jobs_list if j.end_time is not None) return WorkloadData( jobs=jobs_list, telemetry_start=0, telemetry_end=int(end_ts - start_ts), start_date=datetime.fromtimestamp(start_ts, timezone.utc), )