Loading README.md +1 −1 Original line number Diff line number Diff line Loading @@ -83,7 +83,7 @@ Microsoft Azure - 2017 Philly Traces python main.py run-parts -x philly -w multitenant # Telemetry replay python main.py run-parts -x philly -f /opt/data/philly/trace-data python main.py run-parts -x philly -f /opt/data/philly/trace-data --start 2017-10-03T00:00 --end 2017-10-04T00:00 For Lumi Loading raps/dataloaders/philly.py +33 −53 Original line number Diff line number Diff line Loading @@ -43,41 +43,14 @@ def parse_timestamp(val): return None return None def load_gpu_traces_by_dayXX(gpu_trace_dir, machine_ids, job_start_dt, job_end_dt): """ Load GPU utilization for specific machines and time range, using preprocessed per-day CSVs (gpu_by_day/). """ dfs = [] current = job_start_dt.date() while current <= job_end_dt.date(): day_file = os.path.join(gpu_trace_dir, f"{current}.csv") if os.path.exists(day_file): df = pd.read_csv( day_file, names=["time", "machineId", "gpu_util"], parse_dates=["time"], on_bad_lines="skip" ) df = df[df["machineId"].isin(machine_ids)] df = df[(df["time"] >= job_start_dt) & (df["time"] <= job_end_dt)] if not df.empty: dfs.append(df) current += timedelta(days=1) if dfs: return pd.concat(dfs, ignore_index=True) return pd.DataFrame(columns=["time", "machineId", "gpu_util"]) def load_gpu_traces_by_day(trace_dir, start_dt, end_dt): """Load GPU traces only for the days between start_dt and end_dt.""" gpu_dir = os.path.join(trace_dir, "dist/gpu_by_day") frames = [] current = start_dt.date() while current <= end_dt.date(): daily_file = os.path.join(gpu_dir, f"{current}.csv") daily_file = os.path.join(trace_dir, f"{current}.csv") if os.path.exists(daily_file): df = pd.read_csv( daily_file, Loading Loading @@ -114,6 +87,7 @@ def load_data(files, **kwargs): # extract --start from kwargs start_ts = to_epoch(kwargs.get("start", DEFAULT_START)) end_ts = to_epoch(kwargs.get("end", DEFAULT_END)) assert len(files) == 1, "Expecting a single directory path" trace_dir = files[0] gpu_trace_dir = os.path.join(files[0], "dist", "gpu_by_day") Loading Loading @@ -153,15 +127,12 @@ def load_data(files, **kwargs): cpu_util["time"] = cpu_util["time"].apply(parse_date) # --- 3. GPU util --- start_dt = datetime.fromtimestamp(start_ts, tz=timezone.utc) end_dt = datetime.fromtimestamp(end_ts, tz=timezone.utc) PDT = timezone(timedelta(hours=-7)) start_dt = datetime.fromtimestamp(start_ts, tz=PDT) end_dt = datetime.fromtimestamp(end_ts, tz=PDT) gpu_trace_dir = os.path.join(trace_dir, "dist", "gpu_by_day") # For each job: gpu_trace = load_gpu_traces_by_day(gpu_trace_dir, start_dt, end_dt) job_gpu = load_gpu_traces_by_day(gpu_trace_dir, start_dt, end_dt) # --- 4. Job log --- job_file = os.path.join(trace_dir, "cluster_job_log") with open(job_file) as f: Loading Loading @@ -241,31 +212,38 @@ def load_data(files, **kwargs): gpus += len(detail.get("gpus", [])) # CPU utilization traces if machine_ids and start and end: mask = ( cpu_util["machine_id"].isin(machine_ids) & (cpu_util["time"] >= start) & (cpu_util["time"] <= end) ) job_cpu = cpu_util.loc[mask].copy() # Aggregate across machines if >1 machine if len(machine_ids) > 1: job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index() #if machine_ids and start and end: # mask = ( # cpu_util["machine_id"].isin(machine_ids) & # (cpu_util["time"] >= start) & # (cpu_util["time"] <= end) # ) # job_cpu = cpu_util.loc[mask].copy() # # # Aggregate across machines if >1 machine # if len(machine_ids) > 1: # job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index() # --- absolute datetimes (used for filtering traces) --- submitted_dt = parse_timestamp(raw.get("submitted_time")) print("***", machine_ids, start, end) gpu_trace = load_gpu_traces_by_day(gpu_trace_dir, start, end) mask = ( (gpu_trace["machineId"].isin(machine_ids)) & (gpu_trace["time"] >= start_dt) & (gpu_trace["time"] <= end_dt) (gpu_trace["time"] >= start) & (gpu_trace["time"] <= end) ) job_gpu = gpu_trace.loc[mask].copy() #job_gpu = gpu_trace.loc[mask].copy() #job_gpu_series = job_gpu["gpu_util"].tolist() #job_gpu_series = (job_gpu["gpu_util"].to_numpy() * 0.01).tolist() job_gpu_trace = (gpu_trace.loc[mask, "gpu_util"].to_numpy() * 0.01).tolist() print(f" job_gpu shape after filtering: {job_gpu.shape}") if job_gpu.empty: print(" ⚠ No GPU rows matched this job") #print(f" job_gpu shape after filtering: {job_gpu_trace.shape}") #if job_gpu_trace.empty: # print(" ⚠ No GPU rows matched this job") if machine_ids: # Shift times relative to start_ts Loading Loading @@ -301,7 +279,7 @@ def load_data(files, **kwargs): scheduled_nodes=scheduled_nodes, cpu_trace=0, gpu_trace=job_gpu, gpu_trace=job_gpu_trace, ntx_trace=None, nrx_trace=None, Loading @@ -322,6 +300,8 @@ def load_data(files, **kwargs): print(job) exit() # Find max end timestamp across jobs #end_ts = max(j.end_time for j in jobs_list if j.end_time is not None) end_ts = 3600 Loading Loading
README.md +1 −1 Original line number Diff line number Diff line Loading @@ -83,7 +83,7 @@ Microsoft Azure - 2017 Philly Traces python main.py run-parts -x philly -w multitenant # Telemetry replay python main.py run-parts -x philly -f /opt/data/philly/trace-data python main.py run-parts -x philly -f /opt/data/philly/trace-data --start 2017-10-03T00:00 --end 2017-10-04T00:00 For Lumi Loading
raps/dataloaders/philly.py +33 −53 Original line number Diff line number Diff line Loading @@ -43,41 +43,14 @@ def parse_timestamp(val): return None return None def load_gpu_traces_by_dayXX(gpu_trace_dir, machine_ids, job_start_dt, job_end_dt): """ Load GPU utilization for specific machines and time range, using preprocessed per-day CSVs (gpu_by_day/). """ dfs = [] current = job_start_dt.date() while current <= job_end_dt.date(): day_file = os.path.join(gpu_trace_dir, f"{current}.csv") if os.path.exists(day_file): df = pd.read_csv( day_file, names=["time", "machineId", "gpu_util"], parse_dates=["time"], on_bad_lines="skip" ) df = df[df["machineId"].isin(machine_ids)] df = df[(df["time"] >= job_start_dt) & (df["time"] <= job_end_dt)] if not df.empty: dfs.append(df) current += timedelta(days=1) if dfs: return pd.concat(dfs, ignore_index=True) return pd.DataFrame(columns=["time", "machineId", "gpu_util"]) def load_gpu_traces_by_day(trace_dir, start_dt, end_dt): """Load GPU traces only for the days between start_dt and end_dt.""" gpu_dir = os.path.join(trace_dir, "dist/gpu_by_day") frames = [] current = start_dt.date() while current <= end_dt.date(): daily_file = os.path.join(gpu_dir, f"{current}.csv") daily_file = os.path.join(trace_dir, f"{current}.csv") if os.path.exists(daily_file): df = pd.read_csv( daily_file, Loading Loading @@ -114,6 +87,7 @@ def load_data(files, **kwargs): # extract --start from kwargs start_ts = to_epoch(kwargs.get("start", DEFAULT_START)) end_ts = to_epoch(kwargs.get("end", DEFAULT_END)) assert len(files) == 1, "Expecting a single directory path" trace_dir = files[0] gpu_trace_dir = os.path.join(files[0], "dist", "gpu_by_day") Loading Loading @@ -153,15 +127,12 @@ def load_data(files, **kwargs): cpu_util["time"] = cpu_util["time"].apply(parse_date) # --- 3. GPU util --- start_dt = datetime.fromtimestamp(start_ts, tz=timezone.utc) end_dt = datetime.fromtimestamp(end_ts, tz=timezone.utc) PDT = timezone(timedelta(hours=-7)) start_dt = datetime.fromtimestamp(start_ts, tz=PDT) end_dt = datetime.fromtimestamp(end_ts, tz=PDT) gpu_trace_dir = os.path.join(trace_dir, "dist", "gpu_by_day") # For each job: gpu_trace = load_gpu_traces_by_day(gpu_trace_dir, start_dt, end_dt) job_gpu = load_gpu_traces_by_day(gpu_trace_dir, start_dt, end_dt) # --- 4. Job log --- job_file = os.path.join(trace_dir, "cluster_job_log") with open(job_file) as f: Loading Loading @@ -241,31 +212,38 @@ def load_data(files, **kwargs): gpus += len(detail.get("gpus", [])) # CPU utilization traces if machine_ids and start and end: mask = ( cpu_util["machine_id"].isin(machine_ids) & (cpu_util["time"] >= start) & (cpu_util["time"] <= end) ) job_cpu = cpu_util.loc[mask].copy() # Aggregate across machines if >1 machine if len(machine_ids) > 1: job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index() #if machine_ids and start and end: # mask = ( # cpu_util["machine_id"].isin(machine_ids) & # (cpu_util["time"] >= start) & # (cpu_util["time"] <= end) # ) # job_cpu = cpu_util.loc[mask].copy() # # # Aggregate across machines if >1 machine # if len(machine_ids) > 1: # job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index() # --- absolute datetimes (used for filtering traces) --- submitted_dt = parse_timestamp(raw.get("submitted_time")) print("***", machine_ids, start, end) gpu_trace = load_gpu_traces_by_day(gpu_trace_dir, start, end) mask = ( (gpu_trace["machineId"].isin(machine_ids)) & (gpu_trace["time"] >= start_dt) & (gpu_trace["time"] <= end_dt) (gpu_trace["time"] >= start) & (gpu_trace["time"] <= end) ) job_gpu = gpu_trace.loc[mask].copy() #job_gpu = gpu_trace.loc[mask].copy() #job_gpu_series = job_gpu["gpu_util"].tolist() #job_gpu_series = (job_gpu["gpu_util"].to_numpy() * 0.01).tolist() job_gpu_trace = (gpu_trace.loc[mask, "gpu_util"].to_numpy() * 0.01).tolist() print(f" job_gpu shape after filtering: {job_gpu.shape}") if job_gpu.empty: print(" ⚠ No GPU rows matched this job") #print(f" job_gpu shape after filtering: {job_gpu_trace.shape}") #if job_gpu_trace.empty: # print(" ⚠ No GPU rows matched this job") if machine_ids: # Shift times relative to start_ts Loading Loading @@ -301,7 +279,7 @@ def load_data(files, **kwargs): scheduled_nodes=scheduled_nodes, cpu_trace=0, gpu_trace=job_gpu, gpu_trace=job_gpu_trace, ntx_trace=None, nrx_trace=None, Loading @@ -322,6 +300,8 @@ def load_data(files, **kwargs): print(job) exit() # Find max end timestamp across jobs #end_ts = max(j.end_time for j in jobs_list if j.end_time is not None) end_ts = 3600 Loading