raps/dataloaders/philly.py  +88 −22

@@ -43,10 +43,9 @@ def parse_timestamp(val):
         return None
     return None
 
-def load_gpu_traces_by_day(trace_dir, start_dt, end_dt):
-    """Load GPU traces only for the days between start_dt and end_dt."""
+def load_traces_by_day(trace_dir, start_dt, end_dt, colname):
+    """Load CPU or GPU traces between start_dt and end_dt."""
     frames = []
     current = start_dt.date()
     while current <= end_dt.date():
@@ -54,19 +53,81 @@ def load_gpu_traces_by_day(trace_dir, start_dt, end_dt):
         if os.path.exists(daily_file):
             df = pd.read_csv(
                 daily_file,
-                names=["time", "machineId", "gpu_util"],  # no header in daily CSVs
-                parse_dates=["time"]
+                names=["time", "machineId", colname],  # no header in daily CSVs
+                dtype={"machineId": str, colname: str},  # avoid DtypeWarning
             )
+            # Normalize time column (strip PST/PDT, parse datetime)
+            df["time"] = df["time"].str.replace(" PST", "").str.replace(" PDT", "")
+            df["time"] = pd.to_datetime(df["time"], errors="coerce", format=DATE_FORMAT_STR)
+            # Convert util column to numeric (NA/invalid → NaN)
+            df[colname] = pd.to_numeric(df[colname], errors="coerce")
             frames.append(df)
         else:
             print(f"⚠ No trace file for {current}")
         current += timedelta(days=1)
 
     if not frames:
-        return pd.DataFrame(columns=["time", "machineId", "gpu_util"])
+        return pd.DataFrame(columns=["time", "machineId", colname])
 
     return pd.concat(frames, ignore_index=True)
 
+#def load_traces_by_day(trace_dir, start_dt, end_dt, colname):
+#    """Load CPU or GPU traces between start_dt and end_dt.
+#
+#    Args:
+#        trace_dir (str): Directory containing daily CSV files.
+#        start_dt (datetime): Start datetime.
+#        end_dt (datetime): End datetime.
+#        colname (str): Name of the utilization column (e.g., 'cpu_util' or 'gpu_util').
+#    """
+#    frames = []
+#    current = start_dt.date()
+#
+#    while current <= end_dt.date():
+#        daily_file = os.path.join(trace_dir, f"{current}.csv")
+#        if os.path.exists(daily_file):
+#            df = pd.read_csv(
+#                daily_file,
+#                names=["time", "machineId", colname],  # no header in daily CSVs
+#                parse_dates=["time"]
+#            )
+#            frames.append(df)
+#        else:
+#            print(f"⚠ No trace file for {current}")
+#        current += timedelta(days=1)
+#
+#    if not frames:
+#        return pd.DataFrame(columns=["time", "machineId", colname])
+#
+#    return pd.concat(frames, ignore_index=True)
+
+#def load_gpu_traces_by_day(trace_dir, start_dt, end_dt):
+#    """Load GPU traces only for the days between start_dt and end_dt."""
+#    frames = []
+#
+#    current = start_dt.date()
+#
+#    while current <= end_dt.date():
+#        daily_file = os.path.join(trace_dir, f"{current}.csv")
+#        if os.path.exists(daily_file):
+#            df = pd.read_csv(
+#                daily_file,
+#                names=["time", "machineId", "gpu_util"],  # no header in daily CSVs
+#                parse_dates=["time"]
+#            )
+#            frames.append(df)
+#        else:
+#            print(f"⚠ No trace file for {current}")
+#        current += timedelta(days=1)
+#
+#    if not frames:
+#        return pd.DataFrame(columns=["time", "machineId", "gpu_util"])
+#
+#    return pd.concat(frames, ignore_index=True)
 
 def parse_date(s):
     if not s or s == "None":
         return None
@@ -131,6 +192,7 @@ def load_data(files, **kwargs):
     start_dt = datetime.fromtimestamp(start_ts, tz=PDT)
     end_dt = datetime.fromtimestamp(end_ts, tz=PDT)
+    cpu_trace_dir = os.path.join(trace_dir, "dist", "cpu_by_day")
     gpu_trace_dir = os.path.join(trace_dir, "dist", "gpu_by_day")
 
     # --- 4. Job log ---
@@ -214,23 +276,27 @@ def load_data(files, **kwargs):
         num_nodes = len(machine_ids)
         gpus_per_node = gpus // num_nodes
 
-        # CPU utilization traces
-        #if machine_ids and start and end:
-        #    mask = (
-        #        cpu_util["machine_id"].isin(machine_ids) &
-        #        (cpu_util["time"] >= start) &
-        #        (cpu_util["time"] <= end)
-        #    )
-        #    job_cpu = cpu_util.loc[mask].copy()
-        #
-        #    # Aggregate across machines if >1 machine
-        #    if len(machine_ids) > 1:
-        #        job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index()
-
-        # --- absolute datetimes (used for filtering traces) ---
-        submitted_dt = parse_timestamp(raw.get("submitted_time"))
-        gpu_trace = load_gpu_traces_by_day(gpu_trace_dir, start, end)
+        # CPU utilization traces
+        cpu_trace = load_traces_by_day(cpu_trace_dir, start, end, "cpu_util")
+        mask = (
+            (cpu_trace["machineId"].isin(machine_ids)) &
+            (cpu_trace["time"] >= start) &
+            (cpu_trace["time"] <= end)
+        )
+        job_cpu = cpu_trace.loc[mask].copy()
+        # Aggregate across machines if >1 machine
+        if len(machine_ids) > 1:
+            job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index()
+        job_cpu_trace = (job_cpu["cpu_util"].to_numpy() * 0.01).tolist()
 
         # Extract GPU utilization traces
+        gpu_trace = load_traces_by_day(gpu_trace_dir, start, end, "gpu_util")
         mask = (
             (gpu_trace["machineId"].isin(machine_ids)) &
@@ -265,13 +331,13 @@ def load_data(files, **kwargs):
             partition=partition_id,
             priority=0,
-            cpu_cores_required=0,
+            cpu_cores_required=1,
             gpu_units_required=gpus_per_node,
             end_state=status,
             scheduled_nodes=scheduled_nodes,
-            cpu_trace=0,
+            cpu_trace=job_cpu_trace,
             gpu_trace=job_gpu_trace,
             ntx_trace=None,
             nrx_trace=None,
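For reviewers who want to try the new loader in isolation, here is a minimal sketch of how load_traces_by_day could be exercised. It is not part of the PR: the trace root path and the example time window are made up, and it assumes DATE_FORMAT_STR is defined in raps/dataloaders/philly.py and that daily CSVs live under <trace_dir>/dist/cpu_by_day/<YYYY-MM-DD>.csv as in this diff.

# Illustrative sketch only, not the PR's test code.
import os
from datetime import datetime, timedelta

from raps.dataloaders.philly import load_traces_by_day

trace_dir = "/data/philly-traces"            # hypothetical root directory
cpu_trace_dir = os.path.join(trace_dir, "dist", "cpu_by_day")

start = datetime(2017, 10, 3, 8, 0)          # example window, any datetimes work
end = start + timedelta(hours=6)

cpu = load_traces_by_day(cpu_trace_dir, start, end, "cpu_util")
# Expected columns: ["time", "machineId", "cpu_util"]. Unparsable timestamps and
# non-numeric utilization values come back as NaN (errors="coerce"); days with
# no CSV are skipped with a "No trace file" warning.
print(cpu.dtypes)
print(cpu.head())

Days missing from the trace directory only produce a warning, so the sketch still runs (and returns an empty frame with the expected columns) even if no files are present.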