diff --git a/raps/engine.py b/raps/engine.py
index ac7e8c76524616cae0e5c6a68a480bf53303688e..d502e45ff5d695f6bf74019c9abbb3be2d9a009d 100644
--- a/raps/engine.py
+++ b/raps/engine.py
@@ -499,7 +499,7 @@ class Engine:
         # update Running time
         for job in self.running:
             if job.current_state == JobState.RUNNING:
-                job.running_time = self.current_timestep - job.start_time
+                job.current_run_time = self.current_timestep - job.start_time
 
         # Stop the simulation if no more jobs are running or in the queue or in the job list.
         if autoshutdown and \
@@ -552,7 +552,7 @@ class Engine:
 
         for job in self.running:
-            job.running_time = self.current_timestep - job.start_time
+            job.current_run_time = self.current_timestep - job.start_time
 
             if job.current_state != JobState.RUNNING:
                 raise ValueError(
@@ -561,15 +561,15 @@ class Engine:
                 )
             else:  # if job.state == JobState.RUNNING:
                 # Error checks
-                if not replay and job.running_time > job.time_limit and job.end_time is not None:
+                if not replay and job.current_run_time > job.time_limit and job.end_time is not None:
                     raise Exception(f"Job exceeded time limit! "
-                                    f"{job.running_time} > {job.time_limit}"
+                                    f"{job.current_run_time} > {job.time_limit}"
                                     f"\n{job}"
                                     f"\nCurrent timestep:{self.current_timestep - self.timestep_start} (rel)"
                                     )
-                if replay and job.running_time > job.expected_run_time:
+                if replay and job.current_run_time > job.expected_run_time:
                     raise Exception(f"Job should have ended in replay! "
-                                    f" {job.running_time} > {job.expected_run_time}"
+                                    f" {job.current_run_time} > {job.expected_run_time}"
                                     f"\n{job}"
                                     f"\nCurrent timestep:{self.current_timestep - self.timestep_start} (rel)"
                                     )
diff --git a/raps/job.py b/raps/job.py
index 05c455e7a07c8943cc5e52f296f4dde5c0bcabde..4d1dda01233023803938cc8d24e0091072842bfa 100644
--- a/raps/job.py
+++ b/raps/job.py
@@ -180,7 +180,7 @@ class Job:
         self.trace_start_time = None  # Relative start time of the trace (to running time)
         self.trace_end_time = None  # Relative end time of the trace
         self.trace_quanta = None  # Trace quanta associated with the job  # None means single value!
-        self.running_time = 0  # Current running time updated when simulating
+        self.current_run_time = 0  # Current running time updated when simulating
 
         # If a job dict was given, override the values from the job_dict:
         for key, value in job_dict.items():
@@ -232,7 +232,7 @@ class Job:
                 f"trace_start_time={self.trace_start_time}, "
                 f"trace_end_time={self.trace_end_time}, "
                 f"trace_quanta={self.trace_quanta}, "
-                f"running_time={self.running_time}, "
+                f"current_run_time={self.current_run_time}, "
                 f"power={self.power}, "
                 f"power_history={self.power_history})")
 
@@ -296,7 +296,7 @@ class JobStatistics:
         self.account = job.account
         self.num_nodes = len(job.scheduled_nodes)
         self.scheduled_nodes = job.scheduled_nodes
-        self.run_time = job.running_time
+        self.run_time = job.current_run_time
         self.submit_time = job.submit_time
         self.start_time = job.start_time
         self.end_time = job.end_time
diff --git a/raps/network/base.py b/raps/network/base.py
index bab2ec8eabb187b03affcb4e2079f2b145af3b97..3f3daeb953a182c0e0574c0da0f44aa50785abe7 100644
--- a/raps/network/base.py
+++ b/raps/network/base.py
@@ -4,6 +4,7 @@ from raps.utils import get_current_utilization
 from raps.network.fat_tree import node_id_to_host_name
 from raps.network.torus3d import link_loads_for_job_torus, torus_host_from_real_index
 
+
 def debug_print_trace(job, label: str = ""):
     """Print either the length (if iterable) or the value of job.gpu_trace."""
     if hasattr(job.gpu_trace, "__len__"):
@@ -138,6 +139,7 @@ def worst_link_util(loads, throughput):
             max_util = util
     return max_util
 
+
 def get_link_util_stats(loads, throughput, top_n=10):
     """
     Calculates a distribution of link utilization stats.
@@ -148,9 +150,9 @@ def get_link_util_stats(loads, throughput, top_n=10):
     # Calculate utilization for every link
     utilizations = {(edge): (byte_load * 8) / throughput
                     for edge, byte_load in loads.items()}
-    
+
     util_values = list(utilizations.values())
-    
+
     stats = {
         'max': np.max(util_values),
         'mean': np.mean(util_values),
@@ -161,14 +163,16 @@ def get_link_util_stats(loads, throughput, top_n=10):
     # Get top N congested links
     sorted_links = sorted(utilizations.items(), key=lambda item: item[1], reverse=True)
     stats['top_links'] = sorted_links[:top_n]
-    
+
     return stats
 
+
 def max_throughput_per_tick(legacy_cfg: dict, trace_quanta: int) -> float:
     """Return bytes-per-tick throughput of a single link."""
     bw = legacy_cfg.get("NETWORK_MAX_BW") or 12.5e9
     return float(bw) * trace_quanta
 
+
 def simulate_inter_job_congestion(network_model, jobs, legacy_cfg, debug=False):
     """
     Simulates network congestion from a list of concurrently running jobs.
@@ -181,8 +185,8 @@ def simulate_inter_job_congestion(network_model, jobs, legacy_cfg, debug=False):
     trace_quanta = jobs[0].trace_quanta if jobs else 0
 
     for job in jobs:
-        # Assuming job.running_time is 0 for this static simulation
-        job.running_time = 0
+        # Assuming job.current_run_time is 0 for this static simulation
+        job.current_run_time = 0
         job.trace_start_time = 0
 
         net_tx = get_current_utilization(job.ntx_trace, job)
@@ -193,7 +197,7 @@ def simulate_inter_job_congestion(network_model, jobs, legacy_cfg, debug=False):
                 host_list = [node_id_to_host_name(n, k) for n in job.scheduled_nodes]
             else:  # dragonfly
                 host_list = [network_model.real_to_fat_idx[real_n] for real_n in job.scheduled_nodes]
-    
+
             job_loads = link_loads_for_job(network_model.net_graph, host_list, net_tx)
 
         elif network_model.topology == "torus3d":
@@ -214,5 +218,5 @@ def simulate_inter_job_congestion(network_model, jobs, legacy_cfg, debug=False):
     max_throughput = max_throughput_per_tick(legacy_cfg, trace_quanta)
     net_stats = get_link_util_stats(total_loads, max_throughput)
-    
+
     return net_stats
diff --git a/raps/power.py b/raps/power.py
index dd0745bab451fdc96969a481dd18b24a6f02d3ba..b1e6c9d70994d6aa76d17075d9bd3499c6af22f1 100644
--- a/raps/power.py
+++ b/raps/power.py
@@ -55,7 +55,7 @@ def compute_node_power(cpu_util, gpu_util, net_util, config):
     power_gpu = gpu_util * config['POWER_GPU_MAX'] + \
         (config['GPUS_PER_NODE'] - gpu_util) * config['POWER_GPU_IDLE']
 
-    if config.get("POWER_NIC_IDLE") != None and config.get("POWER_NIC_MAX") != None:
+    if config.get("POWER_NIC_IDLE") is not None and config.get("POWER_NIC_MAX") is not None:
         power_nic = config['POWER_NIC_IDLE'] + \
             (config['POWER_NIC_MAX'] - config['POWER_NIC_IDLE']) * net_util
     else:
@@ -432,7 +432,6 @@ class PowerManager:
         jobs_power = self.update_power_state(scheduled_nodes, cpu_utils, gpu_utils, net_utils)
 
         for i, job in enumerate(running_jobs):
-            # if job.running_time % self.config['TRACE_QUANTA'] == 0:
             job.power_history.append(jobs_power[i] * len(job.scheduled_nodes))
 
         # Update the power array UI component
diff --git a/raps/sim_config.py b/raps/sim_config.py
index 254859ad959b8ca2ed5ac97c95963f7bdd9d09c9..a12512f2e87b7d55c4846604b9de7b2fcfc65aa3 100644
--- a/raps/sim_config.py
+++ b/raps/sim_config.py
@@ -136,7 +136,7 @@ class SimConfig(RAPSBaseModel, abc.ABC):
     # Workload arguments (TODO split into separate model)
     workload: Literal['random', 'benchmark', 'peak', 'idle', 'synthetic',
                       'multitenant', 'replay', 'randomAI', 'network_test',
-                      'inter_job_congestion', 'calculon'] = "random"
+                      'inter_job_congestion', 'calculon', 'hpl'] = "random"
     """ Type of synthetic workload """
 
     multimodal: list[float] = [1.0]
diff --git a/raps/ui.py b/raps/ui.py
index 6330bc984e6c261286a77bb62abace73117b6508..cee033a7cf73df18d7d8b1ccc1de98acc37bd475 100644
--- a/raps/ui.py
+++ b/raps/ui.py
@@ -191,10 +191,10 @@ class LayoutManager:
                 nodes_display = col_nodelist
 
             if self.engine.downscale != 1:
-                running_time_str = convert_seconds_to_hhmmss(job.running_time // self.engine.downscale) + \
-                    f" +{job.running_time % self.engine.downscale}/{self.engine.downscale}s"
+                running_time_str = convert_seconds_to_hhmmss(job.current_run_time // self.engine.downscale) + \
+                    f" +{job.current_run_time % self.engine.downscale}/{self.engine.downscale}s"
             else:
-                running_time_str = convert_seconds_to_hhmm(job.running_time)
+                running_time_str = convert_seconds_to_hhmm(job.current_run_time)
 
             row = [
                 str(job.id).zfill(5),
@@ -269,13 +269,13 @@ class LayoutManager:
         # Add data row with white values
         time_in_s = time // self.engine.downscale
         if (time_in_s < 946684800):  # Introducing Y2K into our codebase! Kek
-            time_str = convert_seconds_to_hhmm(time_in_s)
+            time_str = convert_seconds_to_hhmmss(time_in_s)
         else:
            # For the curious: if the simulation time in seconds is larger than the
            # unix timestamp for Jan 2000, this is a unix timestamp.
             time_str = f"{datetime.fromtimestamp(time_in_s).strftime('%Y-%m-%d %H:%M')}"
         if timestep_start != 0:  # append time simulated
-            time_str += f"\nSim: {convert_seconds_to_hhmm(time_in_s - timestep_start)}"
+            time_str += f"\nSim: {convert_seconds_to_hhmmss(time_in_s - timestep_start)}"
         row.append(time_str)
         row.append(str(nrun))
diff --git a/raps/utils.py b/raps/utils.py
index e232bce5cc9986547340b0944ea85a5f05a34432..d98be2a7073bd01a8f16436364b33aeb1af7b1a0 100644
--- a/raps/utils.py
+++ b/raps/utils.py
@@ -640,7 +640,7 @@ def get_current_utilization(trace, job: Job):
     if not job.trace_quanta:
         raise ValueError("job.trace_quanta is not set; cannot compute utilization.")
 
-    time_quanta_index = int((job.running_time - job.trace_start_time) // job.trace_quanta)
+    time_quanta_index = int((job.current_run_time - job.trace_start_time) // job.trace_quanta)
 
     if time_quanta_index < 0:
         time_quanta_index = 0
@@ -700,6 +700,7 @@ def validate_resolved_path(path: str | Path, info: ValidationInfo):
         raise ValueError(f"{path} is not under {base_path}")
     return path
 
+
 ResolvedPath = A[Path, AfterValidator(validate_resolved_path)]
 """
 Resolve a path, and expand ~ in the path string.
@@ -829,7 +830,7 @@ def read_yaml(config_file: str | None) -> dict:
     return result
 
 
-def read_yaml_parsed(cls: type[T], config_file = None) -> dict:
+def read_yaml_parsed(cls: type[T], config_file=None) -> dict:
     """
     Like read_yaml, but parses the input to resolve paths etc.
     Exits on error after printing message (for use in the CLI)
diff --git a/raps/workloads/__init__.py b/raps/workloads/__init__.py
index 9bcb41a4a84d7ce423157894109ab78a77dcd224..d7891964bcb6711f851c19de44e666c78e1fbd94 100644
--- a/raps/workloads/__init__.py
+++ b/raps/workloads/__init__.py
@@ -13,6 +13,7 @@ from .basic import BasicWorkload
 from .calculon import Calculon
 from .constants import JOB_NAMES, ACCT_NAMES, MAX_PRIORITY
 from .distribution import DistributionWorkload
+from .hpl import HPL
 from .live import continuous_job_generation
 from .multitenant import MultitenantWorkload
 from .network import NetworkTestWorkload
@@ -57,7 +58,8 @@ class Workload(
     MultitenantWorkload,
     NetworkTestWorkload,
     InterJobCongestionWorkload,
-    Calculon
+    Calculon,
+    HPL
 ):
     """Final workload class with all workload types."""
     pass
diff --git a/raps/workloads/hpl.py b/raps/workloads/hpl.py
new file mode 100644
index 0000000000000000000000000000000000000000..e338061e5358a65d3cc7aef09032cea0acc360d1
--- /dev/null
+++ b/raps/workloads/hpl.py
@@ -0,0 +1,229 @@
+"""
+Hao Lu’s analytical HPL model adapter for ExaDigiT.
+
+Usage:
+    python main.py run -w hpl -d
+or:
+    python raps/workloads/hpl.py
+"""
+
+from raps.job import Job, job_dict
+import numpy as np
+import math
+
+
+class HPL:
+    """Analytical HPL workload generator for ExaDigiT."""
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+    # -------------------------------------------------------------------------
+    # Public entry
+    # -------------------------------------------------------------------------
+    def hpl(self, **kwargs):
+        jobs = []
+
+        # You can add more scenarios; comment out the big ones while testing.
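+        # Each scenario feeds _run_hpl_model: M is the global matrix order,
+        # b the panel/block size, P x Q the process grid (P*Q MPI ranks),
+        # Rtype the panel-broadcast variant (only "1-ring" is modeled here,
+        # via C1ring), and f the column split ratio between the two update
+        # panels.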
+        hpl_tests = [
+            # Smaller grid (quick sanity check)
+            {"M": 2_097_152, "b": 576, "P": 16, "Q": 32, "Rtype": "1-ring", "f": 0.6},
+            # Frontier-scale shape (expensive; comment out while testing)
+            {"M": 8_900_000, "b": 576, "P": 192, "Q": 384, "Rtype": "1-ring", "f": 0.6},
+        ]
+
+        for test in hpl_tests:
+            for partition in self.partitions:
+                cfg = self.config_map[partition]
+                trace_quanta = cfg["TRACE_QUANTA"]
+
+                # Per-iteration timings (see _run_hpl_model for concurrency notes)
+                iterations = self._run_hpl_model(**test)
+
+                # Convert iteration timings to sampled traces on the TRACE_QUANTA grid
+                gpu_trace, cpu_trace = self._emit_traces_from_iters(
+                    iterations, trace_quanta, cfg
+                )
+                total_time = len(gpu_trace) * trace_quanta
+
+                # Node count: ranks / (GPUs per node * GCDs per GPU)
+                gpus = cfg["GPUS_PER_NODE"]
+                gcds = cfg.get("GCDS_PER_GPU", 2)  # Frontier MI250X default: 2
+                ranks = test["P"] * test["Q"]
+                nodes_required = max(1, ranks // (gpus * gcds))
+
+                job_info = job_dict(
+                    nodes_required=nodes_required,
+                    scheduled_nodes=[],
+                    name=f"HPL_{test['M']}x{test['M']}_P{test['P']}Q{test['Q']}",
+                    account="benchmark",
+                    cpu_trace=cpu_trace,
+                    gpu_trace=gpu_trace,
+                    ntx_trace=[],
+                    nrx_trace=[],
+                    id=None,
+                    end_state="COMPLETED",
+                    priority=100,
+                    partition=partition,
+                    time_limit=total_time,
+                    start_time=0,
+                    end_time=total_time,
+                    expected_run_time=total_time,
+                    trace_quanta=trace_quanta,
+                    trace_time=total_time,
+                    trace_start_time=0,
+                    trace_end_time=total_time,
+                )
+                jobs.append(Job(job_info))
+
+        return jobs
+
+    # -------------------------------------------------------------------------
+    # Analytical per-iteration model (concurrency-aware)
+    # -------------------------------------------------------------------------
+    def _run_hpl_model(self, M, b, P, Q, Rtype="1-ring", f=0.6):
+        """
+        Returns a list of dicts, one per iteration:
+            {
+                "T_iter": <iteration wall-clock seconds>,
+                "gpu_active": <seconds of GPU activity>,
+                "cpu_active": <seconds of CPU activity>,
+                "net_active": <seconds of network activity>,
+            }
+
+        Concurrency-aware scaling (the divisions are currently commented out
+        below, so the component times remain rank-local):
+          - UPDATE (DGEMM) work is distributed over the full P*Q ranks
+            → divide by (P * Q)
+          - PDFACT/LBCAST/RS* progress along process columns (Q)
+            → divide by Q
+        With the divisions enabled, the per-iteration times reflect global
+        wall-time.
+        """
+        # Effective per-rank throughputs/bandwidths (empirical constants)
+        CAllgather = 6.3e9  # bytes/s
+        C1ring = 7.0e9      # bytes/s
+        Creduce = 46e6      # bytes/s
+        Fcpublas = 240e9    # FLOP/s
+        Fgemm = 24e12       # FLOP/s
+
+        Ml = M / P
+        Nl = M / Q
+        nb = int(M / b)
+        iterations = []
+
+        for i in range(nb):
+            Ml_i = Ml - (i * b / P)
+            if Ml_i <= 0:
+                break
+
+            # Local column partition sizes (A = [A1 | A2]), f is the split ratio
+            Nl1_i = max((1.0 - f) * Nl - (i * b / Q), 0.0)
+            Nl2_i = (f * Nl) if (i * b) < (f * Nl) else max(Nl - (i * b / Q), 0.0)
+
+            # Component times (per-rank formulations)
+            # NOTE: units already account for bytes vs. elements (coeffs 16, 2/3, etc.)
+            TPDFACT_rank = (b**2) / Creduce + (2.0 / 3.0) * (b**2) * Ml_i / Fcpublas
+            TLBCAST_rank = 16.0 * b * Ml_i / C1ring
+            TUPD1_rank = 2.0 * b * Ml_i * Nl1_i / Fgemm
+            TUPD2_rank = 2.0 * b * Ml_i * Nl2_i / Fgemm
+            TRS1_rank = 16.0 * b * Nl1_i / CAllgather
+            TRS2_rank = 16.0 * b * Nl2_i / CAllgather
+
+            # Concurrency: convert rank-local times to global wall-time contributions
+            # (coarse but effective partitioning of the communicators)
+            TPDFACT = TPDFACT_rank  # / Q
+            TLBCAST = TLBCAST_rank  # / Q
+            TRS1 = TRS1_rank        # / Q
+            TRS2 = TRS2_rank        # / Q
+            TUPD1 = TUPD1_rank      # / (P * Q)
+            TUPD2 = TUPD2_rank      # / (P * Q)
+
+            # Two pipeline stages per iteration (HPL overlaps the update of one
+            # column partition with the factorization/broadcast of the next)
+            stage1 = max(TPDFACT + TLBCAST + TRS1, TUPD2)
+            stage2 = max(TRS2, TUPD1)
+            T_iter = stage1 + stage2
+
+            # Attribute activity (for utilization duty fractions)
+            gpu_active = max(TUPD1, TUPD2)
+            cpu_active = TPDFACT
+            net_active = TLBCAST + TRS1 + TRS2
+
+            iterations.append(
+                dict(
+                    T_iter=T_iter,
+                    gpu_active=gpu_active,
+                    cpu_active=cpu_active,
+                    net_active=net_active,
+                )
+            )
+
+        return iterations
+
+    def _emit_traces_from_iters(self, iterations, trace_quanta, cfg):
+        gpn = cfg["GPUS_PER_NODE"]
+        gpu_trace, cpu_trace = [], []
+        acc_time = 0.0
+        acc_gpu = 0.0
+        acc_cpu = 0.0
+
+        for it in iterations:
+            T = it["T_iter"]
+            if T <= 0:
+                continue
+
+            total_act = it["gpu_active"] + it["cpu_active"] + it["net_active"]
+            compute_ratio = it["gpu_active"] / total_act if total_act > 0 else 0.0
+            cpu_ratio = it["cpu_active"] / total_act if total_act > 0 else 0.0
+            fg = 0.8 + 0.2 * compute_ratio
+            fc = 0.6 + 0.3 * cpu_ratio
+
+            acc_time += T
+            acc_gpu += gpn * fg * T
+            acc_cpu += fc * T
+
+            # Emit one sample per full trace quantum; carry the remaining
+            # activity forward at a constant rate so that long iterations do
+            # not produce spurious all-zero samples.
+            while acc_time >= trace_quanta:
+                rate_gpu = acc_gpu / acc_time
+                rate_cpu = acc_cpu / acc_time
+                gpu_trace.append(rate_gpu)
+                cpu_trace.append(rate_cpu)
+                acc_gpu -= rate_gpu * trace_quanta
+                acc_cpu -= rate_cpu * trace_quanta
+                acc_time -= trace_quanta
+
+        # flush remainder
+        if acc_time > 0:
+            gpu_trace.append(acc_gpu / acc_time)
+            cpu_trace.append(acc_cpu / acc_time)
+
+        return np.array(gpu_trace), np.array(cpu_trace)
+
+
+# -----------------------------------------------------------------------------
+# Stand-alone test
+# -----------------------------------------------------------------------------
+if __name__ == "__main__":
+
+    class DummyHPL(HPL):
+        def __init__(self):
+            self.partitions = ["gpu"]
+            self.config_map = {
+                "gpu": {
+                    "TRACE_QUANTA": 15.0,  # seconds/sample
+                    "GPUS_PER_NODE": 4,    # Frontier physical GPUs/node
+                    "GCDS_PER_GPU": 2,     # MI250X logical ranks/GPU
+                    "CPUS_PER_NODE": 64,
+                }
+            }
+
+    hpl = DummyHPL()
+    jobs = hpl.hpl()
+
+    print(f"Generated {len(jobs)} HPL job(s)\n")
+    for i, job in enumerate(jobs):
+        print(f"--- Job {i} ---")
+        print(f"Name:           {job.name}")
+        print(f"Nodes required: {job.nodes_required}")
+        print(f"Wall time:      {job.trace_time:.1f}s")
+        print(f"Trace samples:  {len(job.gpu_trace)}")
+        print(f"Avg GPU util:   {np.mean(job.gpu_trace):.2f} (0..{hpl.config_map['gpu']['GPUS_PER_NODE']})")
+        print(f"Avg CPU util:   {np.mean(job.cpu_trace):.2f} (0..1)")
+        # Peek at the starts/ends of the traces
+        print("GPU head:", np.round(job.gpu_trace[:8], 3))
+        print("GPU tail:", np.round(job.gpu_trace[-8:], 3))
+        print("CPU head:", np.round(job.cpu_trace[:8], 3))
+        print("CPU tail:", np.round(job.cpu_trace[-8:], 3))
+        print()