Loading raps/dataloaders/philly.py +23 −62 Original line number Diff line number Diff line Loading @@ -74,60 +74,6 @@ def load_traces_by_day(trace_dir, start_dt, end_dt, colname): return pd.concat(frames, ignore_index=True) #def load_traces_by_day(trace_dir, start_dt, end_dt, colname): # """Load CPU or GPU traces between start_dt and end_dt. # # Args: # trace_dir (str): Directory containing daily CSV files. # start_dt (datetime): Start datetime. # end_dt (datetime): End datetime. # colname (str): Name of the utilization column (e.g., 'cpu_util' or 'gpu_util'). # """ # frames = [] # current = start_dt.date() # # while current <= end_dt.date(): # daily_file = os.path.join(trace_dir, f"{current}.csv") # if os.path.exists(daily_file): # df = pd.read_csv( # daily_file, # names=["time", "machineId", colname], # no header in daily CSVs # parse_dates=["time"] # ) # frames.append(df) # else: # print(f"⚠ No trace file for {current}") # current += timedelta(days=1) # # if not frames: # return pd.DataFrame(columns=["time", "machineId", colname]) # # return pd.concat(frames, ignore_index=True) #def load_gpu_traces_by_day(trace_dir, start_dt, end_dt): # """Load GPU traces only for the days between start_dt and end_dt.""" # frames = [] # # current = start_dt.date() # # while current <= end_dt.date(): # daily_file = os.path.join(trace_dir, f"{current}.csv") # if os.path.exists(daily_file): # df = pd.read_csv( # daily_file, # names=["time", "machineId", "gpu_util"], # no header in daily CSVs # parse_dates=["time"] # ) # frames.append(df) # else: # print(f"⚠ No trace file for {current}") # current += timedelta(days=1) # # if not frames: # return pd.DataFrame(columns=["time", "machineId", "gpu_util"]) # # return pd.concat(frames, ignore_index=True) def parse_date(s): if not s or s == "None": return None Loading Loading @@ -188,9 +134,11 @@ def load_data(files, **kwargs): cpu_util["time"] = cpu_util["time"].apply(parse_date) # --- 3. GPU util --- PDT = timezone(timedelta(hours=-7)) start_dt = datetime.fromtimestamp(start_ts, tz=PDT) end_dt = datetime.fromtimestamp(end_ts, tz=PDT) #PDT = timezone(timedelta(hours=-7)) #start_dt = datetime.fromtimestamp(start_ts, tz=PDT) #end_dt = datetime.fromtimestamp(end_ts, tz=PDT) start_dt = datetime.fromtimestamp(start_ts) # naive datetime end_dt = datetime.fromtimestamp(end_ts) cpu_trace_dir = os.path.join(trace_dir, "dist", "cpu_by_day") gpu_trace_dir = os.path.join(trace_dir, "dist", "gpu_by_day") Loading Loading @@ -279,8 +227,15 @@ def load_data(files, **kwargs): # --- absolute datetimes (used for filtering traces) --- submitted_dt = parse_timestamp(raw.get("submitted_time")) # Clamp to global CLI window - this should be fixed later to include the actual # trace start and end times (trace_start_time? and trace_end_time?) #job_start = max(start, start_dt) if start else start_dt #job_end = min(end, end_dt) if end else end_dt job_start = start job_end = end # CPU utilization traces cpu_trace = load_traces_by_day(cpu_trace_dir, start, end, "cpu_util") cpu_trace = load_traces_by_day(cpu_trace_dir, job_start, job_end, "cpu_util") mask = ( (cpu_trace["machineId"].isin(machine_ids)) & Loading @@ -293,10 +248,11 @@ def load_data(files, **kwargs): if len(machine_ids) > 1: job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index() # Convert from percentage to fraction job_cpu_trace = (job_cpu["cpu_util"].to_numpy() * 0.01).tolist() # Extract GPU utilization traces gpu_trace = load_traces_by_day(gpu_trace_dir, start, end, "gpu_util") gpu_trace = load_traces_by_day(gpu_trace_dir, job_start, job_end, "gpu_util") mask = ( (gpu_trace["machineId"].isin(machine_ids)) & Loading @@ -304,7 +260,14 @@ def load_data(files, **kwargs): (gpu_trace["time"] <= end) ) # Convert traces from percent to fraction of gpus_per_node, e.g., 8 gpus at 100% is 8, at 50% is 4, etc. job_gpu_trace = (gpu_trace.loc[mask, "gpu_util"].to_numpy() * 0.01 * gpus_per_node).tolist() job_gpu = gpu_trace.loc[mask].copy() # Aggregate across machines if >1 machine if len(machine_ids) > 1: job_gpu = job_gpu.groupby("time")["gpu_util"].mean().reset_index() job_gpu_trace = (job_gpu["gpu_util"].to_numpy() * 0.01 * gpus_per_node).tolist() if machine_ids: # Shift times relative to start_ts Loading Loading @@ -359,8 +322,6 @@ def load_data(files, **kwargs): print(job) exit() # Find max end timestamp across jobs #end_ts = max(j.end_time for j in jobs_list if j.end_time is not None) end_ts = 3600 Loading Loading
raps/dataloaders/philly.py +23 −62 Original line number Diff line number Diff line Loading @@ -74,60 +74,6 @@ def load_traces_by_day(trace_dir, start_dt, end_dt, colname): return pd.concat(frames, ignore_index=True) #def load_traces_by_day(trace_dir, start_dt, end_dt, colname): # """Load CPU or GPU traces between start_dt and end_dt. # # Args: # trace_dir (str): Directory containing daily CSV files. # start_dt (datetime): Start datetime. # end_dt (datetime): End datetime. # colname (str): Name of the utilization column (e.g., 'cpu_util' or 'gpu_util'). # """ # frames = [] # current = start_dt.date() # # while current <= end_dt.date(): # daily_file = os.path.join(trace_dir, f"{current}.csv") # if os.path.exists(daily_file): # df = pd.read_csv( # daily_file, # names=["time", "machineId", colname], # no header in daily CSVs # parse_dates=["time"] # ) # frames.append(df) # else: # print(f"⚠ No trace file for {current}") # current += timedelta(days=1) # # if not frames: # return pd.DataFrame(columns=["time", "machineId", colname]) # # return pd.concat(frames, ignore_index=True) #def load_gpu_traces_by_day(trace_dir, start_dt, end_dt): # """Load GPU traces only for the days between start_dt and end_dt.""" # frames = [] # # current = start_dt.date() # # while current <= end_dt.date(): # daily_file = os.path.join(trace_dir, f"{current}.csv") # if os.path.exists(daily_file): # df = pd.read_csv( # daily_file, # names=["time", "machineId", "gpu_util"], # no header in daily CSVs # parse_dates=["time"] # ) # frames.append(df) # else: # print(f"⚠ No trace file for {current}") # current += timedelta(days=1) # # if not frames: # return pd.DataFrame(columns=["time", "machineId", "gpu_util"]) # # return pd.concat(frames, ignore_index=True) def parse_date(s): if not s or s == "None": return None Loading Loading @@ -188,9 +134,11 @@ def load_data(files, **kwargs): cpu_util["time"] = cpu_util["time"].apply(parse_date) # --- 3. GPU util --- PDT = timezone(timedelta(hours=-7)) start_dt = datetime.fromtimestamp(start_ts, tz=PDT) end_dt = datetime.fromtimestamp(end_ts, tz=PDT) #PDT = timezone(timedelta(hours=-7)) #start_dt = datetime.fromtimestamp(start_ts, tz=PDT) #end_dt = datetime.fromtimestamp(end_ts, tz=PDT) start_dt = datetime.fromtimestamp(start_ts) # naive datetime end_dt = datetime.fromtimestamp(end_ts) cpu_trace_dir = os.path.join(trace_dir, "dist", "cpu_by_day") gpu_trace_dir = os.path.join(trace_dir, "dist", "gpu_by_day") Loading Loading @@ -279,8 +227,15 @@ def load_data(files, **kwargs): # --- absolute datetimes (used for filtering traces) --- submitted_dt = parse_timestamp(raw.get("submitted_time")) # Clamp to global CLI window - this should be fixed later to include the actual # trace start and end times (trace_start_time? and trace_end_time?) #job_start = max(start, start_dt) if start else start_dt #job_end = min(end, end_dt) if end else end_dt job_start = start job_end = end # CPU utilization traces cpu_trace = load_traces_by_day(cpu_trace_dir, start, end, "cpu_util") cpu_trace = load_traces_by_day(cpu_trace_dir, job_start, job_end, "cpu_util") mask = ( (cpu_trace["machineId"].isin(machine_ids)) & Loading @@ -293,10 +248,11 @@ def load_data(files, **kwargs): if len(machine_ids) > 1: job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index() # Convert from percentage to fraction job_cpu_trace = (job_cpu["cpu_util"].to_numpy() * 0.01).tolist() # Extract GPU utilization traces gpu_trace = load_traces_by_day(gpu_trace_dir, start, end, "gpu_util") gpu_trace = load_traces_by_day(gpu_trace_dir, job_start, job_end, "gpu_util") mask = ( (gpu_trace["machineId"].isin(machine_ids)) & Loading @@ -304,7 +260,14 @@ def load_data(files, **kwargs): (gpu_trace["time"] <= end) ) # Convert traces from percent to fraction of gpus_per_node, e.g., 8 gpus at 100% is 8, at 50% is 4, etc. job_gpu_trace = (gpu_trace.loc[mask, "gpu_util"].to_numpy() * 0.01 * gpus_per_node).tolist() job_gpu = gpu_trace.loc[mask].copy() # Aggregate across machines if >1 machine if len(machine_ids) > 1: job_gpu = job_gpu.groupby("time")["gpu_util"].mean().reset_index() job_gpu_trace = (job_gpu["gpu_util"].to_numpy() * 0.01 * gpus_per_node).tolist() if machine_ids: # Shift times relative to start_ts Loading Loading @@ -359,8 +322,6 @@ def load_data(files, **kwargs): print(job) exit() # Find max end timestamp across jobs #end_ts = max(j.end_time for j in jobs_list if j.end_time is not None) end_ts = 3600 Loading