"""Dataloader for the Microsoft Philly GPU-cluster trace.

Module header plus helpers for converting Philly job-log timestamps and for
loading the preprocessed per-day GPU-utilization CSVs
(``dist/gpu_by_day/<YYYY-MM-DD>.csv``). ``parse_date`` and ``load_data`` are
defined later in this file.
"""
import os
import glob
import json
import csv
import warnings
from datetime import datetime, timezone, timedelta

import pandas as pd
from tqdm import tqdm

from raps.job import job_dict, Job
from raps.utils import WorkloadData

# Philly logs use naive "YYYY-mm-dd HH:MM:SS" strings, optionally suffixed
# with " PST"/" PDT".
DATE_FORMAT_STR = "%Y-%m-%d %H:%M:%S"
# Default replay window used when --start/--end are not supplied.
DEFAULT_START = "2017-10-03T00:00"
DEFAULT_END = "2017-10-04T00:00"


def to_epoch(ts_str):
    """Convert *ts_str* to integer epoch seconds.

    Accepts ``None`` (passed through), an int/float epoch value (truncated
    to ``int``), an ISO-8601 string (detected by a literal "T"), or a
    string in ``DATE_FORMAT_STR`` form.

    NOTE(review): naive datetimes are converted with ``datetime.timestamp()``,
    which interprets them in the machine's *local* timezone, so the result is
    not portable across hosts — confirm whether UTC was intended before
    relying on absolute values.
    """
    if ts_str is None:
        return None
    if isinstance(ts_str, (int, float)):
        return int(ts_str)
    if "T" in ts_str:
        dt = datetime.fromisoformat(ts_str)
    else:
        dt = datetime.strptime(ts_str, DATE_FORMAT_STR)
    return int(dt.timestamp())


def parse_timestamp(val):
    """Convert a Philly job-log timestamp to a naive ``datetime``.

    Handles int/float epoch values (interpreted as UTC, then made naive) and
    strings in ``DATE_FORMAT_STR`` form with an optional " PST"/" PDT"
    suffix, which is stripped rather than applied as a UTC offset. Returns
    ``None`` for ``None``, the literal string "None", unparseable strings,
    or unsupported types.
    """
    if val is None or val == "None":
        return None
    if isinstance(val, (int, float)):
        return datetime.fromtimestamp(int(val), tz=timezone.utc).replace(tzinfo=None)
    if isinstance(val, str):
        val = val.replace(" PST", "").replace(" PDT", "")
        try:
            return datetime.strptime(val, DATE_FORMAT_STR).replace(tzinfo=None)
        except ValueError:
            return None
    return None


def load_gpu_traces_by_dayXX(gpu_trace_dir, machine_ids, job_start_dt, job_end_dt):
    """Load GPU utilization for specific machines and a time range from the
    preprocessed per-day CSVs in *gpu_trace_dir*.

    Returns a DataFrame with columns time/machineId/gpu_util, restricted to
    *machine_ids* and to [job_start_dt, job_end_dt]; empty (with the same
    columns) when nothing matches.

    NOTE(review): apparently superseded by ``load_gpu_traces_by_day`` — the
    "XX" suffix suggests it is retired and no caller is visible in this
    file. Kept for backward compatibility.
    """
    dfs = []
    current = job_start_dt.date()
    while current <= job_end_dt.date():
        day_file = os.path.join(gpu_trace_dir, f"{current}.csv")
        if os.path.exists(day_file):
            df = pd.read_csv(
                day_file,
                names=["time", "machineId", "gpu_util"],
                parse_dates=["time"],
                on_bad_lines="skip",
            )
            df = df[df["machineId"].isin(machine_ids)]
            df = df[(df["time"] >= job_start_dt) & (df["time"] <= job_end_dt)]
            if not df.empty:
                dfs.append(df)
        current += timedelta(days=1)
    if dfs:
        return pd.concat(dfs, ignore_index=True)
    return pd.DataFrame(columns=["time", "machineId", "gpu_util"])


def load_gpu_traces_by_day(trace_dir, start_dt, end_dt):
    """Load GPU traces for every day between *start_dt* and *end_dt*.

    *trace_dir* is the ``dist/gpu_by_day`` directory itself, matching how
    ``load_data`` calls this function (it passes
    ``os.path.join(trace_dir, "dist", "gpu_by_day")``).

    BUGFIX: the previous version joined "dist/gpu_by_day" onto the argument
    a second time, producing ``.../dist/gpu_by_day/dist/gpu_by_day`` and
    therefore never finding any daily CSV.

    Returns one concatenated DataFrame with columns time/machineId/gpu_util;
    days with no CSV are reported on stdout and skipped. Empty DataFrame
    (same columns) when no day file exists.
    """
    frames = []
    current = start_dt.date()
    while current <= end_dt.date():
        daily_file = os.path.join(trace_dir, f"{current}.csv")
        if os.path.exists(daily_file):
            df = pd.read_csv(
                daily_file,
                names=["time", "machineId", "gpu_util"],  # daily CSVs have no header
                parse_dates=["time"],
            )
            frames.append(df)
        else:
            print(f"⚠ No trace file for {current}")
        current += timedelta(days=1)
    if not frames:
        return pd.DataFrame(columns=["time", "machineId", "gpu_util"])
    return pd.concat(frames, ignore_index=True)
CPU util --- cpu_file = os.path.join(trace_dir, "cluster_cpu_util") cpu_util = pd.read_csv(cpu_file) # cpu_util has columns: time, machine_id, cpu_util cpu_util["time"] = cpu_util["time"].str.replace(" PST","").str.replace(" PDT","") #cpu_util["time"] = pd.to_datetime(cpu_util["time"], format="%Y-%m-%d %H:%M:%S") # now cpu_util has: time (datetime), machine_id, cpu_util cpu_util["time"] = cpu_util["time"].apply(parse_date) # --- 3. GPU util --- gpu_file = os.path.join(trace_dir, "cluster_gpu_util") with warnings.catch_warnings(record=True) as wlist: gpu_util = pd.read_csv( gpu_file, engine="python", on_bad_lines="skip" ) if wlist: warnings.warn( f"cluster_gpu_util: skipped {len(wlist)} malformed lines while reading {gpu_file}", UserWarning ) # Convert time to datetime gpu_util["time"] = pd.to_datetime(gpu_util["time"], errors="coerce").dt.tz_localize(None) # Identify GPU columns gpu_cols = [c for c in gpu_util.columns if c.startswith("gpu")] start_dt = datetime.fromtimestamp(start_ts, tz=timezone.utc) end_dt = datetime.fromtimestamp(end_ts, tz=timezone.utc) # Collapse per row: sum all GPU utilizations and divide by 100 gpu_util["gpu_util"] = gpu_util[gpu_cols].sum(axis=1) / 100.0 gpu_trace_dir = os.path.join(trace_dir, "dist", "gpu_by_day") # Keep only collapsed util plus metadata gpu_util = gpu_util[["time", "machineId", "gpu_util"]] #print("Sample GPU util after preprocess:", gpu_util.head()) # For each job: gpu_trace = load_gpu_traces_by_day(gpu_trace_dir, start_dt, end_dt) job_gpu = load_gpu_traces_by_day(gpu_trace_dir, start_dt, end_dt) # --- 4. 
Job log --- job_file = os.path.join(trace_dir, "cluster_job_log") with open(job_file) as f: job_log = json.load(f) # Filter job_log to only jobs matching the partition's gpus_per_node if gpus_per_node is not None: filtered_log = [] for raw in job_log: attempts = raw.get("attempts", []) if attempts and "detail" in attempts[0]: # Count GPUs from the first detail gpus = sum(len(detail.get("gpus", [])) for detail in attempts[0]["detail"]) if gpus > 0 and (gpus % gpus_per_node == 0): filtered_log.append(raw) job_log = filtered_log # --- First pass: find earliest submit time --- start_ts = None for raw in job_log: Loading Loading @@ -176,25 +253,19 @@ def load_data(files, **kwargs): if len(machine_ids) > 1: job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index() #print("Job", jobid) #print("machine_ids from job:", machine_ids[:5]) #print("gpu_util machineId sample:", gpu_util["machineId"].unique()[:5]) #print("start, end:", start, end) #print("gpu_util time range:", gpu_util["time"].min(), gpu_util["time"].max()) # --- absolute datetimes (used for filtering traces) --- submitted_dt = parse_timestamp(raw.get("submitted_time")) # GPU utilization traces job_gpu = None if machine_ids and start and end: mask = ( gpu_util["machineId"].isin(machine_ids) & (gpu_util["time"] >= start) & (gpu_util["time"] <= end) (gpu_trace["machineId"].isin(machine_ids)) & (gpu_trace["time"] >= start_dt) & (gpu_trace["time"] <= end_dt) ) job_gpu = gpu_util.loc[mask].copy() job_gpu = gpu_trace.loc[mask].copy() # Aggregate across machines if >1 machine if len(machine_ids) > 1: job_gpu = job_gpu.groupby("time")["gpu_util"].sum().reset_index() print(f" job_gpu shape after filtering: {job_gpu.shape}") if job_gpu.empty: print(" ⚠ No GPU rows matched this job") if machine_ids: # Shift times relative to start_ts Loading Loading @@ -229,7 +300,7 @@ def load_data(files, **kwargs): end_state=status, scheduled_nodes=scheduled_nodes, cpu_trace=job_cpu, cpu_trace=0, gpu_trace=job_gpu, 
ntx_trace=None, nrx_trace=None, Loading @@ -252,7 +323,8 @@ def load_data(files, **kwargs): print(job) # Find max end timestamp across jobs end_ts = max(j.end_time for j in jobs_list if j.end_time is not None) #end_ts = max(j.end_time for j in jobs_list if j.end_time is not None) end_ts = 3600 return WorkloadData( jobs=jobs_list, Loading Loading
raps/dataloaders/philly.py +120 −48 Original line number Diff line number Diff line import os import glob import json import csv import pandas as pd import warnings from datetime import datetime, timezone from datetime import datetime, timezone, timedelta from tqdm import tqdm from raps.job import job_dict, Job from raps.utils import WorkloadData DATE_FORMAT_STR = "%Y-%m-%d %H:%M:%S" DEFAULT_START = "2017-10-03T00:00" DEFAULT_END = "2017-10-04T00:00" def to_epoch(ts_str): if ts_str is None: return None if isinstance(ts_str, (int, float)): return int(ts_str) if "T" in ts_str: dt = datetime.fromisoformat(ts_str) else: dt = datetime.strptime(ts_str, DATE_FORMAT_STR) return int(dt.timestamp()) def parse_timestamp(val): """ Convert Philly job log timestamps to datetime. Handles integers (epoch) and strings with PST/PDT. Returns datetime or None. """ if val is None or val == "None": return None if isinstance(val, (int, float)): return datetime.fromtimestamp(int(val), tz=timezone.utc).replace(tzinfo=None) if isinstance(val, str): val = val.replace(" PST", "").replace(" PDT", "") try: return datetime.strptime(val, DATE_FORMAT_STR).replace(tzinfo=None) except ValueError: return None return None def load_gpu_traces_by_dayXX(gpu_trace_dir, machine_ids, job_start_dt, job_end_dt): """ Load GPU utilization for specific machines and time range, using preprocessed per-day CSVs (gpu_by_day/). 
""" dfs = [] current = job_start_dt.date() while current <= job_end_dt.date(): day_file = os.path.join(gpu_trace_dir, f"{current}.csv") if os.path.exists(day_file): df = pd.read_csv( day_file, names=["time", "machineId", "gpu_util"], parse_dates=["time"], on_bad_lines="skip" ) df = df[df["machineId"].isin(machine_ids)] df = df[(df["time"] >= job_start_dt) & (df["time"] <= job_end_dt)] if not df.empty: dfs.append(df) current += timedelta(days=1) if dfs: return pd.concat(dfs, ignore_index=True) return pd.DataFrame(columns=["time", "machineId", "gpu_util"]) def load_gpu_traces_by_day(trace_dir, start_dt, end_dt): """Load GPU traces only for the days between start_dt and end_dt.""" gpu_dir = os.path.join(trace_dir, "dist/gpu_by_day") frames = [] current = start_dt.date() while current <= end_dt.date(): daily_file = os.path.join(gpu_dir, f"{current}.csv") if os.path.exists(daily_file): df = pd.read_csv( daily_file, names=["time", "machineId", "gpu_util"], # no header in daily CSVs parse_dates=["time"] ) frames.append(df) else: print(f"⚠ No trace file for {current}") current += timedelta(days=1) if not frames: return pd.DataFrame(columns=["time", "machineId", "gpu_util"]) return pd.concat(frames, ignore_index=True) def parse_date(s): if not s or s == "None": Loading @@ -28,8 +111,12 @@ def load_data(files, **kwargs): Returns: list[Job] """ # extract --start from kwargs start_ts = to_epoch(kwargs.get("start", DEFAULT_START)) end_ts = to_epoch(kwargs.get("end", DEFAULT_END)) assert len(files) == 1, "Expecting a single directory path" trace_dir = files[0] gpu_trace_dir = os.path.join(files[0], "dist", "gpu_by_day") config = kwargs.get('config') gpus_per_node = config.get("GPUS_PER_NODE") if gpus_per_node is None: Loading Loading @@ -62,46 +149,36 @@ def load_data(files, **kwargs): # --- 2. 
CPU util --- cpu_file = os.path.join(trace_dir, "cluster_cpu_util") cpu_util = pd.read_csv(cpu_file) # cpu_util has columns: time, machine_id, cpu_util cpu_util["time"] = cpu_util["time"].str.replace(" PST","").str.replace(" PDT","") #cpu_util["time"] = pd.to_datetime(cpu_util["time"], format="%Y-%m-%d %H:%M:%S") # now cpu_util has: time (datetime), machine_id, cpu_util cpu_util["time"] = cpu_util["time"].apply(parse_date) # --- 3. GPU util --- gpu_file = os.path.join(trace_dir, "cluster_gpu_util") with warnings.catch_warnings(record=True) as wlist: gpu_util = pd.read_csv( gpu_file, engine="python", on_bad_lines="skip" ) if wlist: warnings.warn( f"cluster_gpu_util: skipped {len(wlist)} malformed lines while reading {gpu_file}", UserWarning ) # Convert time to datetime gpu_util["time"] = pd.to_datetime(gpu_util["time"], errors="coerce").dt.tz_localize(None) # Identify GPU columns gpu_cols = [c for c in gpu_util.columns if c.startswith("gpu")] start_dt = datetime.fromtimestamp(start_ts, tz=timezone.utc) end_dt = datetime.fromtimestamp(end_ts, tz=timezone.utc) # Collapse per row: sum all GPU utilizations and divide by 100 gpu_util["gpu_util"] = gpu_util[gpu_cols].sum(axis=1) / 100.0 gpu_trace_dir = os.path.join(trace_dir, "dist", "gpu_by_day") # Keep only collapsed util plus metadata gpu_util = gpu_util[["time", "machineId", "gpu_util"]] #print("Sample GPU util after preprocess:", gpu_util.head()) # For each job: gpu_trace = load_gpu_traces_by_day(gpu_trace_dir, start_dt, end_dt) job_gpu = load_gpu_traces_by_day(gpu_trace_dir, start_dt, end_dt) # --- 4. 
Job log --- job_file = os.path.join(trace_dir, "cluster_job_log") with open(job_file) as f: job_log = json.load(f) # Filter job_log to only jobs matching the partition's gpus_per_node if gpus_per_node is not None: filtered_log = [] for raw in job_log: attempts = raw.get("attempts", []) if attempts and "detail" in attempts[0]: # Count GPUs from the first detail gpus = sum(len(detail.get("gpus", [])) for detail in attempts[0]["detail"]) if gpus > 0 and (gpus % gpus_per_node == 0): filtered_log.append(raw) job_log = filtered_log # --- First pass: find earliest submit time --- start_ts = None for raw in job_log: Loading Loading @@ -176,25 +253,19 @@ def load_data(files, **kwargs): if len(machine_ids) > 1: job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index() #print("Job", jobid) #print("machine_ids from job:", machine_ids[:5]) #print("gpu_util machineId sample:", gpu_util["machineId"].unique()[:5]) #print("start, end:", start, end) #print("gpu_util time range:", gpu_util["time"].min(), gpu_util["time"].max()) # --- absolute datetimes (used for filtering traces) --- submitted_dt = parse_timestamp(raw.get("submitted_time")) # GPU utilization traces job_gpu = None if machine_ids and start and end: mask = ( gpu_util["machineId"].isin(machine_ids) & (gpu_util["time"] >= start) & (gpu_util["time"] <= end) (gpu_trace["machineId"].isin(machine_ids)) & (gpu_trace["time"] >= start_dt) & (gpu_trace["time"] <= end_dt) ) job_gpu = gpu_util.loc[mask].copy() job_gpu = gpu_trace.loc[mask].copy() # Aggregate across machines if >1 machine if len(machine_ids) > 1: job_gpu = job_gpu.groupby("time")["gpu_util"].sum().reset_index() print(f" job_gpu shape after filtering: {job_gpu.shape}") if job_gpu.empty: print(" ⚠ No GPU rows matched this job") if machine_ids: # Shift times relative to start_ts Loading Loading @@ -229,7 +300,7 @@ def load_data(files, **kwargs): end_state=status, scheduled_nodes=scheduled_nodes, cpu_trace=job_cpu, cpu_trace=0, gpu_trace=job_gpu, 
ntx_trace=None, nrx_trace=None, Loading @@ -252,7 +323,8 @@ def load_data(files, **kwargs): print(job) # Find max end timestamp across jobs end_ts = max(j.end_time for j in jobs_list if j.end_time is not None) #end_ts = max(j.end_time for j in jobs_list if j.end_time is not None) end_ts = 3600 return WorkloadData( jobs=jobs_list, Loading