raps/dataloaders/philly.py  +23 −11

@@ -30,6 +30,10 @@ def load_data(files, **kwargs):
     """
     assert len(files) == 1, "Expecting a single directory path"
     trace_dir = files[0]
+    config = kwargs.get('config')
+    gpus_per_node = config.get("GPUS_PER_NODE")
+    if gpus_per_node is None:
+        raise ValueError("Must pass gpus_per_node (2 or 8)")

     # --- 1. Machine list ---
     machine_file = os.path.join(trace_dir, "cluster_machine_list")
@@ -43,8 +47,17 @@ def load_data(files, **kwargs):
             "gpu_mem": row[" single GPU mem"].strip()
         }

-    # build node → index mapping
-    node_mapping = {mid: idx for idx, mid in enumerate(sorted(machines.keys()))}
+    partition_machines = {
+        mid: info for mid, info in machines.items()
+        if info["num_gpus"] == gpus_per_node
+    }
+
+    # Build node → index mapping for this partition
+    node_mapping = {mid: idx for idx, mid in enumerate(sorted(partition_machines.keys()))}
     max_nodes = len(node_mapping)
+
+    # Assign partition ID (e.g. 0 for 2-GPU, 1 for 8-GPU)
+    partition_id = 0 if gpus_per_node == 2 else 1

     # --- 2. CPU util ---
     cpu_file = os.path.join(trace_dir, "cluster_cpu_util")
@@ -82,7 +95,7 @@ def load_data(files, **kwargs):
     # Keep only collapsed util plus metadata
     gpu_util = gpu_util[["time", "machineId", "gpu_util"]]
-    print("Sample GPU util after preprocess:", gpu_util.head())
+    #print("Sample GPU util after preprocess:", gpu_util.head())

     # --- 4. Job log ---
     job_file = os.path.join(trace_dir, "cluster_job_log")
@@ -108,10 +121,9 @@ def load_data(files, **kwargs):
     if start_ts is None:
         raise ValueError("No valid submitted_time found in Philly traces")

-    # --- Second pass: build jobs ---
     jobs_list = []
-    for raw in tqdm(job_log, desc="Building Job objects"):
+    for raw in tqdm(job_log[:1000], desc="Building Job objects"):
        jobid = raw.get("jobid")
        user = raw.get("user")
        status = raw.get("status")
@@ -164,11 +176,11 @@ def load_data(files, **kwargs):
         if len(machine_ids) > 1:
             job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index()

-        print("Job", jobid)
-        print("machine_ids from job:", machine_ids[:5])
-        print("gpu_util machineId sample:", gpu_util["machineId"].unique()[:5])
-        print("start, end:", start, end)
-        print("gpu_util time range:", gpu_util["time"].min(), gpu_util["time"].max())
+        #print("Job", jobid)
+        #print("machine_ids from job:", machine_ids[:5])
+        #print("gpu_util machineId sample:", gpu_util["machineId"].unique()[:5])
+        #print("start, end:", start, end)
+        #print("gpu_util time range:", gpu_util["time"].min(), gpu_util["time"].max())

         # GPU utilization traces
         job_gpu = None
@@ -206,7 +218,7 @@ def load_data(files, **kwargs):
             account=user if user else "unknown",
             nodes_required=len(machine_ids),
-            partition=0,
+            partition=partition_id,
             priority=0,
             cpu_cores_required=0,
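For context, here is a minimal, self-contained sketch of the partition logic this diff introduces, run on invented sample machine records. In the real loader, `machines` is parsed from `cluster_machine_list` and `gpus_per_node` comes from `config["GPUS_PER_NODE"]`; the sample data below is made up for illustration.

```python
# Sketch of the new partition logic on invented sample data.
machines = {
    "m101": {"num_gpus": 8, "gpu_mem": "16 GB"},
    "m100": {"num_gpus": 2, "gpu_mem": "12 GB"},
    "m102": {"num_gpus": 2, "gpu_mem": "12 GB"},
}
gpus_per_node = 2  # the loader rejects a missing value (must be 2 or 8)

# Keep only machines whose GPU count matches the requested partition.
partition_machines = {
    mid: info for mid, info in machines.items()
    if info["num_gpus"] == gpus_per_node
}

# Sorting machine IDs makes the node -> index mapping deterministic,
# independent of dict insertion order.
node_mapping = {mid: idx for idx, mid in enumerate(sorted(partition_machines))}
max_nodes = len(node_mapping)

# Same convention as the diff: partition 0 is 2-GPU, partition 1 is 8-GPU.
partition_id = 0 if gpus_per_node == 2 else 1

print(node_mapping)             # {'m100': 0, 'm102': 1}
print(max_nodes, partition_id)  # 2 0
```

Sorting before enumerating is what keeps node indices stable across runs, so the same machine always maps to the same index for a given partition.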
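Since the loader now raises without `GPUS_PER_NODE`, callers have to thread it through. A hedged usage sketch, assuming a plain dict is acceptable as `config` (the diff only calls `.get` on it; the actual RAPS config object may differ), that `load_data` returns the built job list (the return statement falls outside these hunks), and with a placeholder trace path:

```python
from raps.dataloaders.philly import load_data

# Hypothetical call: per this diff, config only needs .get("GPUS_PER_NODE").
config = {"GPUS_PER_NODE": 8}  # select the 8-GPU partition (partition_id = 1)
jobs = load_data(["/path/to/philly-traces"], config=config)
```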