raps/dataloaders/philly.py  +37 −25

```diff
@@ -134,10 +134,7 @@ def load_data(files, **kwargs):
     cpu_util["time"] = cpu_util["time"].apply(parse_date)

     # --- 3. GPU util ---
-    #PDT = timezone(timedelta(hours=-7))
-    #start_dt = datetime.fromtimestamp(start_ts, tz=PDT)
-    #end_dt = datetime.fromtimestamp(end_ts, tz=PDT)
-    start_dt = datetime.fromtimestamp(start_ts)  # naive datetime
+    start_dt = datetime.fromtimestamp(start_ts)
     end_dt = datetime.fromtimestamp(end_ts)

     cpu_trace_dir = os.path.join(trace_dir, "dist", "cpu_by_day")
@@ -148,6 +145,20 @@ def load_data(files, **kwargs):
     with open(job_file) as f:
         job_log = json.load(f)

+    # --- First pass: filter jobs by date range ---
+    filtered_log = []
+    for raw in job_log:
+        submitted = raw.get("submitted_time")
+        if submitted is None or submitted == "None":
+            continue
+        if isinstance(submitted, (int, float)):
+            submitted_dt = datetime.fromtimestamp(int(submitted))
+        else:
+            submitted_dt = parse_date(submitted)
+        if submitted_dt and start_dt <= submitted_dt <= end_dt:
+            filtered_log.append(raw)
+    job_log = filtered_log
+
     # Filter job_log to only jobs matching the partition's gpus_per_node
     if gpus_per_node is not None:
         filtered_log = []
@@ -162,6 +173,15 @@ def load_data(files, **kwargs):
     # --- First pass: find earliest submit time ---
     start_ts = None

+    ### debug
+    print("num jobs found", len(job_log))
+    for job in job_log[:100]:
+        print(f"Job {job['jobid']}:")
+        for attempt in job["attempts"]:
+            print("  Start:", attempt["start_time"])
+    ### end debug
+
     for raw in job_log:
         submitted = raw.get("submitted_time")
         if submitted is None or submitted == "None":
@@ -176,6 +196,9 @@ def load_data(files, **kwargs):
         if start_ts is None or t < start_ts:
             start_ts = t

+    # debug
+    print(f"Job {job['jobid']}: submit_time {submitted}, start_ts: {start_ts}")
+
     if start_ts is None:
         raise ValueError("No valid submitted_time found in Philly traces")
@@ -222,6 +245,8 @@ def load_data(files, **kwargs):
             gpus += len(detail.get("gpus", []))

         num_nodes = len(machine_ids)
+        if num_nodes == 0:
+            continue
         gpus_per_node = gpus // num_nodes

         # --- absolute datetimes (used for filtering traces) ---
@@ -274,15 +299,13 @@ def load_data(files, **kwargs):
         submit_time = submitted.timestamp() - start_ts if submitted else None
         start_time = start.timestamp() - start_ts if start else None
         end_time = end.timestamp() - start_ts if end else None
-        #submit_time = submitted.timestamp()
-        #start_time = start.timestamp()
-        #end_time = end.timestamp()

         if not submit_time or not start_time or not end_time:
             warnings.warn(
                 f"skipped {jobid} b/c missing submit_time, start_time, or end_time",
                 UserWarning
             )
             continue

         scheduled_nodes = [node_mapping[mid] for mid in machine_ids if mid in node_mapping]
@@ -308,10 +331,11 @@ def load_data(files, **kwargs):
             ntx_trace=None,
             nrx_trace=None,
-            submit_time=0,  #submit_time,
-            start_time=0,  #start_time,
+            submit_time=submit_time,
+            start_time=start_time,
             end_time=end_time,
-            time_limit=end_time,  #0,  #time_limit=end_time - start_time,
+            time_limit=end_time,
             expected_run_time=wall_time if wall_time else 0,
             current_run_time=0,
             trace_time=None,
@@ -323,22 +347,10 @@ def load_data(files, **kwargs):
         )
         if job_cpu_trace and job_gpu_trace:
             jobs_list.append(Job(job))
         else:
             tqdm.write(f"skipping {job['id']} b/c either no cpu or gpu trace")
-            #print(job)
-            print(start_ts, job["start_time"], job["end_time"])

         if len(jobs_list) >= 20:
             break

-    # Find max end timestamp across jobs, relative to first job
-    #start_ts = min(j.start_time for j in jobs_list if j.start_time is not None)
-    #end_ts = max(j.end_time for j in jobs_list if j.end_time is not None)
-    # Absolute end_ts
-    #end_ts = start_ts + end_ts
     end_ts = start_ts + 43200
-    print("***", start_ts, end_ts)
+    tqdm.write(f"abs start time: {start_ts} rel job start: {job['start_time']} rel job end: {job['end_time']}")

     return WorkloadData(
         jobs=jobs_list,
```
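A note on the new date-range filter above: Philly `submitted_time` values arrive either as epoch seconds or as formatted strings, so the filter normalizes both forms to a naive `datetime` before comparing against the `start_dt`/`end_dt` window. A minimal standalone sketch of that normalization, assuming (as the `if submitted_dt` guard in the diff suggests) a `parse_date` helper that returns `None` when it cannot parse; `normalize_submitted` itself is an illustrative name, not part of the loader:

```python
from datetime import datetime

def normalize_submitted(submitted, parse_date):
    """Return a naive datetime for a Philly 'submitted_time' value, or None."""
    if submitted is None or submitted == "None":   # field may be the literal string "None"
        return None
    if isinstance(submitted, (int, float)):        # epoch seconds
        return datetime.fromtimestamp(int(submitted))
    return parse_date(submitted)                   # formatted string; None on failure
```

Since `datetime.fromtimestamp` without a `tz` argument yields local time, the naive window used here must never be compared against timezone-aware datetimes (Python raises `TypeError` on naive/aware comparisons), which is consistent with the diff dropping the commented-out PDT handling entirely.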
raps/engine.py  +69 −0

```diff
@@ -340,6 +340,75 @@ class Engine:
             start_date=self.start,
         )

+        if sim_config.live and not sim_config.replay:
+            td = Telemetry(**sim_config_dict)
+            workload_data = td.load_from_live_system()
+        elif sim_config.replay:
+            # TODO: this will have issues if running separate systems or custom systems
+            partition_short = partition.split("/")[-1] if partition else None
+            td = Telemetry(
+                **sim_config_dict,
+                partition=partition,
+            )
+            if partition:
+                snap_map = {p.stem: p for p in sim_config.replay[0].glob("*.npz")}
+                if len(snap_map) > 0:
+                    if partition_short not in snap_map:
+                        raise RuntimeError(f"Snapshot '{partition_short}.npz' not in {sim_config.replay[0]}")
+                    replay_files = [snap_map[partition_short]]
+                else:
+                    replay_files = sim_config.replay
+            else:
+                replay_files = sim_config.replay
+            workload_data = td.load_from_files(replay_files)
+        else:  # Synthetic jobs
+            wl = Workload(sim_config_args, system_config_dict)
+            workload_data = wl.generate_jobs()
+            td = Telemetry(**sim_config_dict)
+
+        jobs = workload_data.jobs
+
+        # TODO refactor how start/end/fastforward/time work
+        if sim_config.fastforward is not None:
+            workload_data.telemetry_start = workload_data.telemetry_start + sim_config.fastforward
+        if sim_config.time is not None:
+            workload_data.telemetry_end = workload_data.telemetry_start + sim_config.time
+
+        if sim_config.time_delta is not None:
+            time_delta = sim_config.time_delta
+        else:
+            time_delta = 1
+
+        if sim_config.continuous_job_generation:
+            continuous_workload = wl
+        else:
+            continuous_workload = None
+
+        accounts = None
+        if sim_config.accounts:
+            job_accounts = Accounts(jobs)
+            if sim_config.accounts_json:
+                loaded_accounts = Accounts.from_json_filename(sim_config.accounts_json)
+                accounts = Accounts.merge(loaded_accounts, job_accounts)
+            else:
+                accounts = job_accounts
+
+        engine = Engine(
+            power_manager=power_manager,
+            flops_manager=flops_manager,
+            cooling_model=cooling_model,
+            continuous_workload=continuous_workload,
+            jobs=jobs,
+            accounts=accounts,
+            telemetry=td,
+            sim_config=sim_config,
+            system_config=system_config,
+        )
+        return engine, workload_data, time_delta
+
     def add_running_jobs_to_queue(self, jobs_to_submit: List):
         """
         Modifies jobs_to_submit and self.queue
```
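For readers of the replay branch above: when a partition is set and the first replay path contains any `*.npz` snapshots, exactly one snapshot whose file stem matches the partition's short name is selected, and a missing snapshot is a hard error; in every other case the replay list passes through unchanged. A sketch of that selection rule as a standalone function (the name `select_replay_files` is illustrative, not from the codebase):

```python
from pathlib import Path
from typing import List, Optional

def select_replay_files(replay: List[Path], partition: Optional[str]) -> List[Path]:
    # Illustrative restatement of the replay branch in the diff above.
    if not partition:
        return replay
    partition_short = partition.split("/")[-1]               # last path component
    snap_map = {p.stem: p for p in replay[0].glob("*.npz")}  # first entry treated as a directory
    if not snap_map:
        return replay                                        # no snapshots: use the raw replay list
    if partition_short not in snap_map:
        raise RuntimeError(f"Snapshot '{partition_short}.npz' not in {replay[0]}")
    return [snap_map[partition_short]]
```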