diff --git a/README.md b/README.md index 160549fc97b53e55677a5e6d30625e02a8c6875e..c87ac5ed3e180c116220d068ad2f7bb1c095362d 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ Note: Requires python3.12 or greater. # Frontier DATEDIR="date=2024-01-18" - DPATH=~/data/frontier-sample-2024-01-18 + DPATH=/opt/data/frontier raps run -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR ## Open Telemetry dataset @@ -37,7 +37,7 @@ Note: Requires python3.12 or greater. For Marconi supercomputer, download `job_table.parquet` from https://zenodo.org/records/10127767 # Marconi100 - raps run --system marconi100 -f ~/data/marconi100/job_table.parquet + raps run --system marconi100 -f /opt/data/marconi100/job_table.parquet For Adastra MI250 supercomputer, download 'AdastaJobsMI250_15days.parquet' from https://zenodo.org/records/14007065 @@ -46,10 +46,10 @@ For Adastra MI250 supercomputer, download 'AdastaJobsMI250_15days.parquet' from For Google cluster trace v2 - raps run --system gcloudv2 -f ~/data/gcloud/v2/google_cluster_data_2011_sample --start '2011-05-02T00:10:00Z' + raps run --system gcloudv2 -f /opt/data/gcloud/v2/google_cluster_data_2011_sample --start '2011-05-02T00:10:00Z' # analyze dataset - raps telemetry --system gcloudv2 -f ~/data/gcloud/v2/google_cluster_data_2011_sample -v + raps telemetry --system gcloudv2 -f /opt/data/gcloud/v2/google_cluster_data_2011_sample -v For MIT Supercloud @@ -95,7 +95,17 @@ For Lumi Lassen is one of the few datasets that has networking data. See `raps/dataloaders/lassen.py` for how to get the datasets. 
To run a network simulation, use the following command: - raps run -f ~/data/lassen/Lassen-Supercomputer-Job-Dataset --system lassen --policy fcfs --backfill firstfit --start '2019-08-22T00:00:00+00:00' -t 12h --arrival poisson --net + raps run -f /opt/data/lassen/Lassen-Supercomputer-Job-Dataset --system lassen --policy fcfs --backfill firstfit --start '2019-08-22T00:00:00+00:00' -t 12h --arrival poisson --net + +To simulate synthetic network tests: + + raps run --system lassen -w network_test --net -t 15m + + raps run --system lassen -w inter_job_congestion --net -t 15m + +Run network congestion tests outside of RAPS: + + python scripts/run_inter_job_congestion.py --config config/lassen.yaml -v ## Snapshot of extracted workload data diff --git a/config/lassen.yaml b/config/lassen.yaml index 594479d221d08f5606574682001138aa99281de9..ad636f0d7c89b9309e287f0ea22170cff71a2659 100644 --- a/config/lassen.yaml +++ b/config/lassen.yaml @@ -120,9 +120,15 @@ cooling: w_cts_key: "simulator[1].centralEnergyPlant[1].coolingTowerLoop[1].summary.W_flow_CT_kW" network: topology: fat-tree - network_max_bw: 1000000000.0 - fattree_k: 16 + network_max_bw: 12.5e9 + fattree_k: 32 dragonfly_d: 11 dragonfly_a: 9 dragonfly_p: 8 latency: 1 + torus_x: 17 + torus_y: 17 + torus_z: 8 + torus_wrap: true + hosts_per_router: 2 + torus_routing: DOR_XYZ diff --git a/raps/engine.py b/raps/engine.py index ebf99c5ac69a1caf02287f36bd472fbb8cdf087f..ac7e8c76524616cae0e5c6a68a480bf53303688e 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -30,7 +30,8 @@ from raps.power import ( from raps.network import ( NetworkModel, apply_job_slowdown, - compute_system_network_stats + compute_system_network_stats, + simulate_inter_job_congestion ) from raps.telemetry import Telemetry from raps.cooling import ThermoFluidsModel @@ -292,6 +293,7 @@ class Engine: self.avg_net_tx = [] self.avg_net_rx = [] self.net_util_history = [] + self.net_congestion_history = [] self.avg_slowdown_history = [] 
self.max_slowdown_history = [] self.node_occupancy_history = [] @@ -328,7 +330,7 @@ class Engine: available_nodes = self.resource_manager.available_nodes self.network_model = NetworkModel( available_nodes=available_nodes, - config=self.config, + config=self.config ) else: self.network_model = None @@ -621,6 +623,18 @@ class Engine: system_util = self.num_active_nodes / self.config['AVAILABLE_NODES'] * 100 self.record_util_stats(system_util=system_util) + # --- Inter-Job Network Congestion --- + if self.simulate_network and self.network_model and self.running: + congestion_stats = simulate_inter_job_congestion( + self.network_model, self.running, self.config, self.debug + ) + if isinstance(congestion_stats, dict): + total_congestion = congestion_stats['mean'] + else: + total_congestion = congestion_stats + self.net_congestion_history.append((self.current_timestep, total_congestion)) + # --- + # System Power if self.power_manager: # Power is always simulated power_df, rack_power, total_power_kw, total_loss_kw, jobs_power = \ diff --git a/raps/network/__init__.py b/raps/network/__init__.py index eb49ee619c5577ecb8c62757c68c2e5487bf4d91..3522b39fa38e0cb94f7e5e77780cdca6a4ac0e7d 100644 --- a/raps/network/__init__.py +++ b/raps/network/__init__.py @@ -1,3 +1,6 @@ +import os +import warnings + from .base import ( all_to_all_paths, apply_job_slowdown, @@ -7,11 +10,16 @@ from .base import ( network_slowdown, network_utilization, worst_link_util, + get_link_util_stats, + simulate_inter_job_congestion, + max_throughput_per_tick, ) -from .fat_tree import build_fattree, node_id_to_host_name -from .torus3d import build_torus3d, link_loads_for_job_torus -from .dragonfly import build_dragonfly, dragonfly_node_id_to_host_name +from .fat_tree import build_fattree, node_id_to_host_name, subsample_hosts +from .torus3d import build_torus3d, link_loads_for_job_torus, torus_host_from_real_index +from .dragonfly import build_dragonfly, dragonfly_node_id_to_host_name, 
build_dragonfly_idx_map +from raps.plotting import plot_fattree_hierarchy, plot_dragonfly, plot_torus2d, plot_torus3d + from raps.utils import get_current_utilization __all__ = [ @@ -28,6 +36,9 @@ __all__ = [ "build_torus3d", "build_dragonfly", "dragonfly_node_id_to_host_name", + "simulate_inter_job_congestion", + "max_throughput_per_tick", + "get_link_util_stats", ] @@ -39,8 +50,11 @@ class NetworkModel: self.real_to_fat_idx = kwargs.get("real_to_fat_idx", {}) if self.topology == "fat-tree": + total_nodes = config['TOTAL_NODES'] - len(config['DOWN_NODES']) self.fattree_k = config.get("FATTREE_K") - self.net_graph = build_fattree(self.fattree_k) + self.net_graph = build_fattree(self.fattree_k, total_nodes) + # TODO: future testing of subsampling feature + #self.net_graph = subsample_hosts(self.net_graph, num_hosts=4626) elif self.topology == "torus3d": dims = ( @@ -67,11 +81,22 @@ class NetworkModel: nid += 1 elif self.topology == "dragonfly": - self.net_graph = build_dragonfly( - int(config["DRAGONFLY_D"]), - int(config["DRAGONFLY_A"]), - int(config.get("DRAGONFLY_P", 1)) - ) + D = self.config["DRAGONFLY_D"] + A = self.config["DRAGONFLY_A"] + P = self.config["DRAGONFLY_P"] + self.net_graph = build_dragonfly(D, A, P) + + # total nodes seen by scheduler or job trace + total_real_nodes = getattr(self, "available_nodes", None) + if total_real_nodes is None: + total_real_nodes = 4626 # fallback for Lassen + + # if available_nodes is a list, take its length + if not isinstance(total_real_nodes, int): + total_real_nodes = len(total_real_nodes) + + self.real_to_fat_idx = build_dragonfly_idx_map(D, A, P, total_real_nodes) + print(f"[DEBUG] Dragonfly mapping: {len(self.real_to_fat_idx)} entries") elif self.topology == "capacity": # Capacity-only model: no explicit graph @@ -100,18 +125,28 @@ class NetworkModel: print(" fat-tree hosts:", host_list) elif self.topology == "dragonfly": - D, A, P = self.config["DRAGONFLY_D"], self.config["DRAGONFLY_A"], 
self.config["DRAGONFLY_P"] - host_list = [ - dragonfly_node_id_to_host_name(self.real_to_fat_idx[real_n], D, A, P) - for real_n in job.scheduled_nodes - ] + D = self.config["DRAGONFLY_D"] + A = self.config["DRAGONFLY_A"] + P = self.config["DRAGONFLY_P"] + # Directly use mapped host names + host_list = [self.real_to_fat_idx[real_n] for real_n in job.scheduled_nodes] if debug: print(" dragonfly hosts:", host_list) + print("Example nodes in graph:", list(self.net_graph.nodes)[:10]) + print("Contains h_0_9_0?", "h_0_9_0" in self.net_graph) loads = link_loads_for_job(self.net_graph, host_list, net_tx) net_cong = worst_link_util(loads, max_throughput) elif self.topology == "torus3d": - host_list = [self.id_to_host[n] for n in job.scheduled_nodes] + X = self.config["TORUS_X"] + Y = self.config["TORUS_Y"] + Z = self.config["TORUS_Z"] + hosts_per_router = self.config["HOSTS_PER_ROUTER"] + #host_list = [self.id_to_host[n] for n in job.scheduled_nodes] + host_list = [ + torus_host_from_real_index(n, X, Y, Z, hosts_per_router) + for n in job.scheduled_nodes + ] loads = link_loads_for_job_torus(self.net_graph, self.meta, host_list, net_tx) net_cong = worst_link_util(loads, max_throughput) if debug: @@ -124,3 +159,23 @@ class NetworkModel: raise ValueError(f"Unsupported topology: {self.topology}") return net_util, net_cong, net_tx, net_rx, max_throughput + + def plot_topology(self, output_dir): + """Plot network topology - save as png file in output_dir.""" + if output_dir: + if self.topology == "fat-tree": + save_path = output_dir / "net-fat-tree.png" + plot_fattree_hierarchy(self.net_graph, k=self.fattree_k, save_path=save_path) + elif self.topology == "dragonfly": + save_path = output_dir / "net-dragonfly.png" + plot_dragonfly(self.net_graph, save_path=save_path) + elif self.topology == "torus3d": + save_path = output_dir / "net-torus2d.png" + plot_torus2d(self.net_graph, save_path=save_path) + save_path = output_dir / "net-torus3d.png" + plot_torus3d(self.net_graph, 
save_path=save_path) + else: + warnings.warn( + f"plotting not supported for {self.topology} topology", + UserWarning + ) diff --git a/raps/network/base.py b/raps/network/base.py index f14c523a230add1fe555b8b4eda5f6c2bf74ca1a..bab2ec8eabb187b03affcb4e2079f2b145af3b97 100644 --- a/raps/network/base.py +++ b/raps/network/base.py @@ -1,5 +1,8 @@ import networkx as nx - +import numpy as np +from raps.utils import get_current_utilization +from raps.network.fat_tree import node_id_to_host_name +from raps.network.torus3d import link_loads_for_job_torus, torus_host_from_real_index def debug_print_trace(job, label: str = ""): """Print either the length (if iterable) or the value of job.gpu_trace.""" @@ -134,3 +137,82 @@ def worst_link_util(loads, throughput): if util > max_util: max_util = util return max_util + +def get_link_util_stats(loads, throughput, top_n=10): + """ + Calculates a distribution of link utilization stats. + Returns a dictionary with min, mean, max, std_dev, and top N congested links. + """ + if not loads: + return {'max': 0, 'mean': 0, 'min': 0, 'std_dev': 0, 'top_links': []} + + # Calculate utilization for every link + utilizations = {(edge): (byte_load * 8) / throughput for edge, byte_load in loads.items()} + + util_values = list(utilizations.values()) + + stats = { + 'max': np.max(util_values), + 'mean': np.mean(util_values), + 'min': np.min(util_values), + 'std_dev': np.std(util_values) + } + + # Get top N congested links + sorted_links = sorted(utilizations.items(), key=lambda item: item[1], reverse=True) + stats['top_links'] = sorted_links[:top_n] + + return stats + +def max_throughput_per_tick(legacy_cfg: dict, trace_quanta: int) -> float: + """Return bytes-per-tick throughput of a single link.""" + bw = legacy_cfg.get("NETWORK_MAX_BW") or 12.5e9 + return float(bw) * trace_quanta + +def simulate_inter_job_congestion(network_model, jobs, legacy_cfg, debug=False): + """ + Simulates network congestion from a list of concurrently running jobs. 
+ """ + if not network_model.net_graph: + print("[WARN] Network graph is not defined. Skipping congestion simulation.") + return 0.0 + + total_loads = {tuple(sorted(edge)): 0.0 for edge in network_model.net_graph.edges()} + trace_quanta = jobs[0].trace_quanta if jobs else 0 + + for job in jobs: + # Assuming job.running_time is 0 for this static simulation + job.running_time = 0 + job.trace_start_time = 0 + net_tx = get_current_utilization(job.ntx_trace, job) + + job_loads = {} + if network_model.topology in ("fat-tree", "dragonfly"): + if network_model.topology == "fat-tree": + k = int(legacy_cfg.get("FATTREE_K", 32)) + host_list = [node_id_to_host_name(n, k) for n in job.scheduled_nodes] + else: # dragonfly + host_list = [network_model.real_to_fat_idx[real_n] for real_n in job.scheduled_nodes] + + job_loads = link_loads_for_job(network_model.net_graph, host_list, net_tx) + + elif network_model.topology == "torus3d": + X = int(legacy_cfg.get("TORUS_X", 12)) + Y = int(legacy_cfg.get("TORUS_Y", 12)) + Z = int(legacy_cfg.get("TORUS_Z", 12)) + hosts_per_router = int(legacy_cfg.get("HOSTS_PER_ROUTER", 1)) + host_list = [ + torus_host_from_real_index(n, X, Y, Z, hosts_per_router) + for n in job.scheduled_nodes + ] + job_loads = link_loads_for_job_torus(network_model.net_graph, network_model.meta, host_list, net_tx) + + for edge, load in job_loads.items(): + edge_key = tuple(sorted(edge)) + if edge_key in total_loads: + total_loads[edge_key] += load + + max_throughput = max_throughput_per_tick(legacy_cfg, trace_quanta) + net_stats = get_link_util_stats(total_loads, max_throughput) + + return net_stats diff --git a/raps/network/dragonfly.py b/raps/network/dragonfly.py index a13d1dc3292f1e89c75c2cde3e6538b016eac529..0f29b68c556571a0193241c938d198f20b167152 100644 --- a/raps/network/dragonfly.py +++ b/raps/network/dragonfly.py @@ -2,7 +2,50 @@ import networkx as nx from itertools import combinations -def build_dragonfly(D: int, A: int, P: int) -> nx.Graph: +import networkx 
as nx + +def build_dragonfly(d, a, p): + """ + Build a Dragonfly network graph. + d = routers per group + a = global connections per router + p = compute nodes per router + """ + G = nx.Graph() + num_groups = a + 1 # standard Dragonfly rule + + # --- Routers and hosts --- + for g in range(num_groups): + for r in range(d): + router = f"r_{g}_{r}" + G.add_node(router, layer="router", group=g) + + # attach p hosts to each router + for h in range(p): + host = f"h_{g}_{r}_{h}" + G.add_node(host, layer="host", group=g) + G.add_edge(router, host) + + # --- Intra-group full mesh --- + for g in range(num_groups): + routers = [f"r_{g}_{r}" for r in range(d)] + for i in range(d): + for j in range(i + 1, d): + G.add_edge(routers[i], routers[j]) + + # --- Inter-group (global) links --- + for g in range(num_groups): + for r in range(d): + src = f"r_{g}_{r}" + for offset in range(1, a + 1): + dst_group = (g + offset) % num_groups + dst = f"r_{dst_group}_{r % d}" + G.add_edge(src, dst) + + return G + + +def build_dragonfly2(D: int, A: int, P: int) -> nx.Graph: """ Build a “simple” k-ary Dragonfly with: D = # of groups @@ -17,6 +60,12 @@ def build_dragonfly(D: int, A: int, P: int) -> nx.Graph: 1. All routers within a group form a full clique. 2. Each router r in group g has exactly one “global link” to router r in each other group. 3. Each router r in group g attaches to P hosts ("h_{g}_{r}_{0..P−1}"). + + Examples + -------- + >>> from raps.plotting import plot_network_graph + >>> G = build_dragonfly(D=2, A=2, P=2) + >>> plot_network_graph(G, 'dragonfly.png') """ G = nx.Graph() @@ -55,21 +104,41 @@ def build_dragonfly(D: int, A: int, P: int) -> nx.Graph: def dragonfly_node_id_to_host_name(fat_idx: int, D: int, A: int, P: int) -> str: """ - Given a contiguous fat‐index ∈ [0..(D*A*P − 1)], return "h_{g}_{r}_{p}". 
- Hosts are laid out in order: - 0..(P−1) → group=0, router=0, p=0..P−1 - P..2P−1 → group=0, router=1, p=0..P−1 - … - (A*P)..(2A*P−1) → group=1, router=0, … - In general: - host_offset = fat_idx % P - router_offset = (fat_idx // P) % A - group = fat_idx // (A*P) + Convert a contiguous Dragonfly host index to its hierarchical name. + + For a Dragonfly with: + D routers per group, + A global links per router ⇒ num_groups = A + 1, + P compute nodes per router. + + Hosts are laid out in contiguous order: + group g = floor(fat_idx / (D * P)) + router r = (fat_idx // P) % D + host h = fat_idx % P + """ + num_groups = A + 1 + total_hosts = num_groups * D * P + assert 0 <= fat_idx < total_hosts, f"fat_idx {fat_idx} out of range (max {total_hosts-1})" + + group = fat_idx // (D * P) + router = (fat_idx // P) % D + host = fat_idx % P + return f"h_{group}_{router}_{host}" + + +def build_dragonfly_idx_map(d: int, a: int, p: int, total_real_nodes: int) -> dict[int, str]: + """ + Build a mapping {real_node_index: host_name} for Dragonfly. + Wrap around if total_real_nodes > total_hosts. 
""" - total_hosts = D * A * P - assert 0 <= fat_idx < total_hosts, "fat_idx out of range" + num_groups = a + 1 + total_hosts = num_groups * d * p - host_offset = fat_idx % P - router_group = (fat_idx // P) % A - pod = fat_idx // (A * P) - return f"h_{pod}_{router_group}_{host_offset}" + mapping = {} + for i in range(total_real_nodes): + fat_idx = i % total_hosts # <- wrap safely + group = fat_idx // (d * p) + router = (fat_idx // p) % d + host = fat_idx % p + mapping[i] = f"h_{group}_{router}_{host}" + return mapping diff --git a/raps/network/fat_tree.py b/raps/network/fat_tree.py index 2d27b39d980d244397d92684850709450bfe09d2..c514b836b7a632d7f3f43e010ca2c6e29b50d907 100644 --- a/raps/network/fat_tree.py +++ b/raps/network/fat_tree.py @@ -1,3 +1,4 @@ +import random import networkx as nx @@ -13,7 +14,7 @@ def node_id_to_host_name(node_id: int, k: int) -> str: return f"h_{pod}_{edge}_{host}" -def build_fattree(k): +def build_fattree(k, total_nodes): """ Build a k-ary fat-tree: - k pods @@ -25,7 +26,19 @@ def build_fattree(k): - edge switches "e_{pod}_{edge}" - agg switches "a_{pod}_{agg}" - core switches "c_{i}_{j}" + + Examples + -------- + >>> from raps.plotting import plot_network_graph + >>> G = build_fattree(k=4, total_nodes=16) + >>> plot_network_graph(G, 'fat_tree.png') """ + num_hosts = (k**3) // 4 + if num_hosts < total_nodes: + raise ValueError( + f"Fat-tree network with k={k} has {num_hosts} hosts, but the system has {total_nodes} nodes. " + f"Please increase the value of 'fattree_k' in the system configuration file." + ) G = nx.Graph() # core # num_core = (k//2)**2 # Unused! 
@@ -57,3 +70,13 @@ def build_fattree(k): G.add_node(host, type="host") G.add_edge(e, host) return G + + +def subsample_hosts(G, num_hosts): + """Reduce the number of host nodes in the FatTree graph to match system size.""" + hosts = [n for n in G if n.startswith("h")] + if num_hosts < len(hosts): + keep = set(random.sample(hosts, num_hosts)) + remove = [n for n in hosts if n not in keep] + G.remove_nodes_from(remove) + return G diff --git a/raps/network/torus3d.py b/raps/network/torus3d.py index 50c988f567b59f2b09d5845a78dbdeb4980b239c..b88e1d2aba4f9b13ed7612107ebb42c6b52ac478 100644 --- a/raps/network/torus3d.py +++ b/raps/network/torus3d.py @@ -3,90 +3,120 @@ import networkx as nx from pathlib import Path -def build_torus3d(dims, wrap=True, link_bw=1e9, hosts_per_router=1, routing="DOR_XYZ", coords_csv=None): +def build_torus3d( + dims, + wrap=True, + hosts_per_router: int = 1, + torus_link_bw: float = None, + latency_per_hop: float = None, + network_max_bw: float = None, +): """ - Build a 3D torus at router granularity, then attach host nodes to routers. - Node ids in the returned graph are host names ("h_x_y_z_i") and router names ("r_x_y_z"). - Edges have attribute 'capacity' (bytes/s) and 'latency' (per hop). + Build a 3D torus network (routers + hosts). + Each router r_x_y_z connects to 6 neighbors (±X, ±Y, ±Z) + and attaches hosts h_x_y_z_p for p ∈ [0..hosts_per_router-1]. 
+ + Returns: + (G, meta) where: + - G: networkx.Graph + - meta: dict with topology info for plotting/simulation """ - X, Y, Z = map(int, dims) + X, Y, Z = dims G = nx.Graph() - # Routers - def rname(x, y, z): - return f"r_{x}_{y}_{z}" - + # --- Add routers with normalized coordinates --- for x in range(X): for y in range(Y): for z in range(Z): - G.add_node(rname(x, y, z), kind="router", coord=(x, y, z)) - - # Toroidal links between routers (±x, ±y, ±z) - def wrapi(i, n): - return (i + n) % n if wrap else (None if i < 0 or i >= n else i) - + name = f"r_{x}_{y}_{z}" + G.add_node( + name, + type="router", + x=x / (X - 1 if X > 1 else 1), + y=y / (Y - 1 if Y > 1 else 1), + z=z / (Z - 1 if Z > 1 else 1), + ) + + # --- Add wrap-around router-to-router edges --- for x in range(X): for y in range(Y): for z in range(Z): - u = rname(x, y, z) - # x+ - nxp = wrapi(x + 1, X) - v = rname(nxp, y, z) if nxp is not None else None - if v and not G.has_edge(u, v): - G.add_edge(u, v, capacity=link_bw) - # y+ - nyp = wrapi(y + 1, Y) - v = rname(x, nyp, z) if nyp is not None else None - if v and not G.has_edge(u, v): - G.add_edge(u, v, capacity=link_bw) - # z+ - nzp = wrapi(z + 1, Z) - v = rname(x, y, nzp) if nzp is not None else None - if v and not G.has_edge(u, v): - G.add_edge(u, v, capacity=link_bw) - - # Attach hosts to routers + src = f"r_{x}_{y}_{z}" + + nx_ = (x + 1) % X if wrap else x + 1 + if nx_ < X: + G.add_edge( + src, f"r_{nx_}_{y}_{z}", + bandwidth=torus_link_bw, + latency=latency_per_hop, + type="router_link" + ) + + ny_ = (y + 1) % Y if wrap else y + 1 + if ny_ < Y: + G.add_edge( + src, f"r_{x}_{ny_}_{z}", + bandwidth=torus_link_bw, + latency=latency_per_hop, + type="router_link" + ) + + nz_ = (z + 1) % Z if wrap else z + 1 + if nz_ < Z: + G.add_edge( + src, f"r_{x}_{y}_{nz_}", + bandwidth=torus_link_bw, + latency=latency_per_hop, + type="router_link" + ) + + # --- Add hosts and host-router edges --- + for x in range(X): + for y in range(Y): + for z in range(Z): + 
router = f"r_{x}_{y}_{z}" + for p in range(hosts_per_router): + host = f"h_{x}_{y}_{z}_{p}" + G.add_node( + host, + type="host", + x=(x + 0.1) / (X - 1 if X > 1 else 1), + y=(y + 0.1) / (Y - 1 if Y > 1 else 1), + z=(z + 0.1 * (p + 1)) / (Z - 1 if Z > 1 else 1), + ) + G.add_edge( + host, router, + bandwidth=network_max_bw, + latency=latency_per_hop, + type="host_link" + ) + + # --- Build host <-> router mappings for simulator use --- host_to_router = {} router_to_hosts = {} - def hname(x, y, z, i): - return f"h_{x}_{y}_{z}_{i}" - - # If a nid→(x,y,z) CSV is supplied, place accordingly; else dense round-robin - # CSV format: nid,x,y,z[,i] - nid_placement = {} - if coords_csv: - p = Path(coords_csv) - with p.open("rt") as fh: - rd = csv.reader(fh) - for row in rd: - if not row: - continue - nid = int(row[0]) - x, y, z = map(int, row[1:4]) - i = int(row[4]) if len(row) > 4 else 0 - nid_placement[nid] = (x, y, z, i) - - # Build hosts for x in range(X): for y in range(Y): for z in range(Z): - r = rname(x, y, z) - router_to_hosts[r] = [] - for i in range(hosts_per_router): - h = hname(x, y, z, i) - G.add_node(h, kind="host", coord=(x, y, z), local_index=i) - G.add_edge(h, r, capacity=link_bw) # host↔router edge; you can cap with NETWORK_MAX_BW instead - host_to_router[h] = r - router_to_hosts[r].append(h) + router = f"r_{x}_{y}_{z}" + router_to_hosts[router] = [] + for p in range(hosts_per_router): + host = f"h_{x}_{y}_{z}_{p}" + host_to_router[host] = router + router_to_hosts[router].append(host) meta = { + "topology": "torus3d", "dims": (X, Y, Z), + "hosts_per_router": hosts_per_router, "wrap": wrap, - "routing": routing, + "num_routers": X * Y * Z, + "num_hosts": X * Y * Z * hosts_per_router, "host_to_router": host_to_router, "router_to_hosts": router_to_hosts, } + + print(f"Built 3D torus with {meta['num_routers']} routers and {meta['num_hosts']} hosts.") return G, meta @@ -150,3 +180,15 @@ def link_loads_for_job_torus(G, meta, host_list, traffic_bytes): e = 
tuple(sorted((u, v))) loads[e] = loads.get(e, 0) + traffic_bytes return loads + + +def torus_host_from_real_index(real_n, X, Y, Z, hosts_per_router): + total_hosts = X * Y * Z * hosts_per_router + idx = real_n % total_hosts + r = idx // hosts_per_router + h = idx % hosts_per_router + z = r % Z + y = (r // Z) % Y + x = (r // (Y * Z)) % X + return f"h_{x}_{y}_{z}_{h}" + diff --git a/raps/plotting.py b/raps/plotting.py index 606ac7a5d3de1bcf948e8ea2b227683c984f6de2..44a66af608cf00611782ac9844e1e8f0495c3d9d 100644 --- a/raps/plotting.py +++ b/raps/plotting.py @@ -17,9 +17,13 @@ import itertools from pathlib import Path import matplotlib.pyplot as plt import matplotlib.ticker as ticker +from mpl_toolkits.mplot3d import Axes3D from matplotlib.ticker import MaxNLocator + import time import numpy as np +import networkx as nx +import random from uncertainties import unumpy from rich.progress import track @@ -404,6 +408,223 @@ def plot_nodes_gantt(*, ax=None, jobs): return ax +def plot_fattree_hierarchy(G, k=32, save_path='net_fattree.png'): + """Draw a hierarchical Fat-Tree layout with automatic scaling.""" + pos = {} + + # --- Layer order and matching prefixes --- + layers = ["core", "agg", "edge", "h"] + layer_prefixes = { + "core": ["core", "c_"], + "agg": ["agg", "a_"], + "edge": ["edge", "e_"], + "h": ["h", "host"] + } + + # --- Compute how many nodes per layer --- + layer_counts = {} + for layer in layers: + prefixes = layer_prefixes[layer] + layer_nodes = [n for n in G.nodes if any(n.startswith(p) for p in prefixes)] + layer_counts[layer] = len(layer_nodes) + + max_nodes = max(layer_counts.values()) or 1 + y_gap = 1.0 / (len(layers) - 1) + + # --- Assign positions, normalized to [0,1] range --- + for j, layer in enumerate(layers): + prefixes = layer_prefixes[layer] + layer_nodes = [n for n in G.nodes if any(n.startswith(p) for p in prefixes)] + n_layer = len(layer_nodes) + if n_layer == 0: + continue + x_spacing = 1.0 / n_layer + y = 1.0 - j * y_gap + for i, node in 
enumerate(layer_nodes): + x = (i + 0.5) * x_spacing # center each node + pos[node] = (x, y) + + # --- Draw figure --- + plt.figure(figsize=(10, 8)) + color_map = {"core": "red", "agg": "orange", "edge": "green", "h": "blue"} + size_map = {"core": 30, "agg": 20, "edge": 10, "h": 5} + + for layer in layers: + nodes = [n for n in G.nodes if any(n.startswith(p) for p in layer_prefixes[layer])] + if nodes: + nx.draw_networkx_nodes( + G, pos, nodelist=nodes, node_color=color_map[layer], + node_size=size_map[layer], label=layer.capitalize(), alpha=0.7 + ) + + # --- Only draw inter-layer edges for clarity --- + edgelist = [ + (u, v) for (u, v) in G.edges + if not any(u.startswith(p) and v.startswith(p) + for p in ["c_", "a_", "e_", "h", "core", "agg", "edge", "host"]) + ] + nx.draw_networkx_edges(G, pos, edgelist=edgelist, alpha=0.05, width=0.4) + + plt.legend() + plt.axis("off") + plt.tight_layout() + if save_path: + plt.savefig(save_path, dpi=300) + + +def plot_dragonfly(G, save_path='net_dragonfly.png'): + """ + Draw a circular Dragonfly layout: groups in a large ring, + routers in small inner rings, hosts hanging around each router. 
+ """ + import math + import matplotlib.pyplot as plt + import networkx as nx + + # Identify groups + groups = sorted({G.nodes[n]["group"] for n in G if "group" in G.nodes[n]}) + num_groups = len(groups) + + pos = {} + R_outer = 1.0 # radius of the outer ring (groups) + R_inner = 0.15 # radius of each group's internal ring + + # --- compute positions --- + for i, g in enumerate(groups): + # center of this group + theta_g = 2 * math.pi * i / num_groups + cx = R_outer * math.cos(theta_g) + cy = R_outer * math.sin(theta_g) + + routers = [n for n in G if n.startswith("r_") and G.nodes[n]["group"] == g] + hosts = [n for n in G if n.startswith("h_") and G.nodes[n]["group"] == g] + + # routers in small ring + for j, r in enumerate(routers): + theta_r = 2 * math.pi * j / len(routers) + x = cx + R_inner * math.cos(theta_r) + y = cy + R_inner * math.sin(theta_r) + pos[r] = (x, y) + + # hosts slightly further out around each router + for j, h in enumerate(hosts): + router = f"r_{g}_{j // 8}" if len(routers) > 0 else None + # angle toward router’s position if available + angle = 2 * math.pi * (j / len(hosts)) + r_off = R_inner + 0.05 + x = cx + r_off * math.cos(angle) + y = cy + r_off * math.sin(angle) + pos[h] = (x, y) + + # --- Draw figure --- + plt.figure(figsize=(10, 10)) + nx.draw_networkx_nodes(G, pos, + nodelist=[n for n in G if n.startswith("r_")], + node_color="orange", node_size=20, label="Routers", alpha=0.9) + nx.draw_networkx_nodes(G, pos, + nodelist=[n for n in G if n.startswith("h_")], + node_color="blue", node_size=8, label="Hosts", alpha=0.7) + + # intra-group edges light gray, inter-group black + intra = [(u, v) for (u, v) in G.edges if G.nodes[u]["group"] == G.nodes[v]["group"]] + inter = [(u, v) for (u, v) in G.edges if G.nodes[u]["group"] != G.nodes[v]["group"]] + nx.draw_networkx_edges(G, pos, edgelist=intra, alpha=0.1, width=0.3, edge_color="gray") + nx.draw_networkx_edges(G, pos, edgelist=inter, alpha=0.4, width=0.4, edge_color="black") + + 
plt.axis("off") + plt.legend() + plt.tight_layout() + if save_path: + plt.savefig(save_path, dpi=300) + + +def plot_torus2d(G, save_path="net_torus2d.png"): + import matplotlib.pyplot as plt + + routers = [n for n, d in G.nodes(data=True) if d["type"] == "router"] + hosts = [n for n, d in G.nodes(data=True) if d["type"] == "host"] + + fig, ax = plt.subplots(figsize=(8,8)) + + for u, v, d in G.edges(data=True): + if d.get("type") == "router_link": + x1, y1 = G.nodes[u]["x"], G.nodes[u]["y"] + x2, y2 = G.nodes[v]["x"], G.nodes[v]["y"] + ax.plot([x1, x2], [y1, y2], color="gray", alpha=0.1, linewidth=0.5) + + # flatten z by adding it to y or x offset + xs = [G.nodes[n]["x"] for n in routers] + ys = [G.nodes[n]["y"] + 0.05*G.nodes[n]["z"] for n in routers] + ax.scatter(xs, ys, c="orange", s=10, label="Routers", alpha=0.8) + + hx = [G.nodes[n]["x"] for n in hosts] + hy = [G.nodes[n]["y"] + 0.05*G.nodes[n]["z"] for n in hosts] + ax.scatter(hx, hy, c="blue", s=4, label="Hosts", alpha=0.5) + + ax.set_xlabel("X") + ax.set_ylabel("Y + (scaled Z)") + ax.legend() + if save_path: + plt.savefig(save_path, dpi=300) + + +def plot_torus3d(G, active_edges=None, max_edges=4000, save_path="net_torus3d.png"): + """ + Plot a 3D torus with routers, hosts, and optional job link highlights. 
+ Args: + G : networkx.Graph + active_edges : list of (u,v) tuples for job links to highlight + max_edges : subsample edges to avoid clutter + """ + fig = plt.figure(figsize=(8, 8)) + ax = fig.add_subplot(111, projection="3d") + + # --- Separate routers and hosts --- + routers = [n for n, d in G.nodes(data=True) if d["type"] == "router"] + hosts = [n for n, d in G.nodes(data=True) if d["type"] == "host"] + + # --- Plot routers --- + xs, ys, zs = [G.nodes[n]["x"] for n in routers], [G.nodes[n]["y"] for n in routers], [G.nodes[n]["z"] for n in routers] + ax.scatter(xs, ys, zs, c="orange", s=6, label="Routers", alpha=0.8) + + # --- Plot hosts --- + hx, hy, hz = [G.nodes[n]["x"] for n in hosts], [G.nodes[n]["y"] for n in hosts], [G.nodes[n]["z"] for n in hosts] + ax.scatter(hx, hy, hz, c="dodgerblue", s=3, label="Hosts", alpha=0.6) + + # --- Draw router-to-router edges (subsampled) --- + all_router_edges = [(u, v) for u, v, d in G.edges(data=True) if d.get("type") == "router_link"] + if len(all_router_edges) > max_edges: + all_router_edges = random.sample(all_router_edges, max_edges) + for u, v in all_router_edges: + x1, y1, z1 = G.nodes[u]["x"], G.nodes[u]["y"], G.nodes[u]["z"] + x2, y2, z2 = G.nodes[v]["x"], G.nodes[v]["y"], G.nodes[v]["z"] + ax.plot([x1, x2], [y1, y2], [z1, z2], color="gray", alpha=0.05, linewidth=0.5) + + # --- Draw host links lightly --- + for u, v, d in G.edges(data=True): + if d.get("type") == "host_link": + x1, y1, z1 = G.nodes[u]["x"], G.nodes[u]["y"], G.nodes[u]["z"] + x2, y2, z2 = G.nodes[v]["x"], G.nodes[v]["y"], G.nodes[v]["z"] + ax.plot([x1, x2], [y1, y2], [z1, z2], color="lightblue", alpha=0.05, linewidth=0.3) + + # --- Overlay active job edges --- + if active_edges: + for u, v in active_edges: + if u in G.nodes and v in G.nodes: + x1, y1, z1 = G.nodes[u]["x"], G.nodes[u]["y"], G.nodes[u]["z"] + x2, y2, z2 = G.nodes[v]["x"], G.nodes[v]["y"], G.nodes[v]["z"] + ax.plot([x1, x2], [y1, y2], [z1, z2], color="red", linewidth=1.8, alpha=0.8) + 
+ ax.set_xlabel("X") + ax.set_ylabel("Y") + ax.set_zlabel("Z") + ax.legend() + plt.tight_layout() + if save_path: + plt.savefig(save_path, dpi=300) + + + if __name__ == "__main__": plotter = Plotter() # plotter.plot_history([1, 2, 3, 4]) diff --git a/raps/run_sim.py b/raps/run_sim.py index aa2d9d951c491d383c90504abb2c78f648d1da44..db50465bfb738033fada924fcd0d55d09fe9b32c 100644 --- a/raps/run_sim.py +++ b/raps/run_sim.py @@ -53,7 +53,7 @@ def run_sim(sim_config: SingleSimConfig): out = sim_config.get_output() if out: - out.mkdir(parents=True) + out.mkdir(parents=True, exist_ok=True) engine.telemetry.save_snapshot( dest=str(out / 'snapshot.npz'), result=engine.get_workload_data(), @@ -137,6 +137,9 @@ def run_sim(sim_config: SingleSimConfig): else: print('Cooling model not enabled... skipping output of plot') + if 'net' in sim_config.plot: + engine.network_model.plot_topology(out) + if 'temp' in sim_config.plot: if engine.cooling_model: ylabel = 'Tr_pri_Out[1]' diff --git a/raps/sim_config.py b/raps/sim_config.py index a69c2884716d0199b7c10024ed6ee85223a4bab6..254859ad959b8ca2ed5ac97c95963f7bdd9d09c9 100644 --- a/raps/sim_config.py +++ b/raps/sim_config.py @@ -109,7 +109,7 @@ class SimConfig(RAPSBaseModel, abc.ABC): """ Enable verbose output """ layout: Literal["layout1", "layout2"] = "layout1" """ UI layout """ - plot: list[Literal["power", "loss", "pue", "temp", "util"]] | None = None + plot: list[Literal["power", "loss", "pue", "temp", "util", "net"]] | None = None """ Plots to generate """ imtype: Literal["png", "svg", "jpg", "pdf", "eps"] = "png" @@ -134,8 +134,9 @@ class SimConfig(RAPSBaseModel, abc.ABC): """ Grab data from live system. 
""" # Workload arguments (TODO split into separate model) - workload: Literal['random', 'benchmark', 'peak', 'idle', 'synthetic', - 'multitenant', 'replay', 'randomAI', 'calculon'] = "random" + workload: Literal['random', 'benchmark', 'peak', 'idle', 'synthetic', + 'multitenant', 'replay', 'randomAI', 'network_test', + 'inter_job_congestion', 'calculon'] = "random" """ Type of synthetic workload """ multimodal: list[float] = [1.0] @@ -327,7 +328,7 @@ class SimConfig(RAPSBaseModel, abc.ABC): if self.jobsize_is_power_of is not None and self.jobsize_is_of_degree is not None: raise ValueError("jobsize_is_power_of and jobsize_is_of_degree are mutually exclusive") - if self.plot and not self.output: + if self.plot and self.output == "none": raise ValueError("plot requires an output directory to be set") if self.live and not self.replay and self.time is None: diff --git a/raps/stats.py b/raps/stats.py index e5824a931fe80ec64ef157869c0c087f9e3873ae..7df6208a3c15f4bafcf5a3125307fe3294c1c471 100644 --- a/raps/stats.py +++ b/raps/stats.py @@ -140,6 +140,16 @@ def get_network_stats(engine: Engine): max_job_slow = 1.0 stats["max_per_job_slowdown"] = max_job_slow + if engine.net_congestion_history: + congestion_values = [c for t, c in engine.net_congestion_history] + stats['avg_inter_job_congestion'] = sum(congestion_values) / len(congestion_values) + stats['max_inter_job_congestion'] = max(congestion_values) + stats['min_inter_job_congestion'] = min(congestion_values) + else: + stats['avg_inter_job_congestion'] = 0.0 + stats['max_inter_job_congestion'] = 0.0 + stats['min_inter_job_congestion'] = 0.0 + return stats @@ -414,6 +424,9 @@ def print_formatted_report(engine_stats=None, "avg_network_util": "{:.2f}%", "avg_per_job_slowdown": "{:.2f}x", "max_per_job_slowdown": "{:.2f}x", + "avg_inter_job_congestion": "{:.2f}", + "max_inter_job_congestion": "{:.2f}", + "min_inter_job_congestion": "{:.2f}", }) diff --git a/raps/workloads/__init__.py b/raps/workloads/__init__.py index 
d61befc50078cee6654b40acf2f70eb5278b31ef..9bcb41a4a84d7ce423157894109ab78a77dcd224 100644 --- a/raps/workloads/__init__.py +++ b/raps/workloads/__init__.py @@ -15,6 +15,8 @@ from .constants import JOB_NAMES, ACCT_NAMES, MAX_PRIORITY from .distribution import DistributionWorkload from .live import continuous_job_generation from .multitenant import MultitenantWorkload +from .network import NetworkTestWorkload +from .inter_job_congestion import InterJobCongestionWorkload from .utils import plot_job_hist @@ -53,6 +55,8 @@ class Workload( DistributionWorkload, BasicWorkload, MultitenantWorkload, + NetworkTestWorkload, + InterJobCongestionWorkload, Calculon ): """Final workload class with all workload types.""" diff --git a/raps/workloads/inter_job_congestion.py b/raps/workloads/inter_job_congestion.py new file mode 100644 index 0000000000000000000000000000000000000000..3fd569d61d36f5714a32ff2deac3eacf4f9a7951 --- /dev/null +++ b/raps/workloads/inter_job_congestion.py @@ -0,0 +1,141 @@ +import math +import random +from typing import List, Tuple + +from raps.job import Job, job_dict +from raps.network import max_throughput_per_tick + +class InterJobCongestionWorkload: + """ Workload generator for inter-job congestion test """ + def inter_job_congestion(self, args) -> List[Job]: + legacy_cfg = self.config_map[self.partitions[0]] + topology = legacy_cfg.get("TOPOLOGY", "").lower() + return generate_jobs( + legacy_cfg=legacy_cfg, + topology=topology, + J=args.numjobs, + trace_quanta=legacy_cfg.get("TRACE_QUANTA", 20), + tx_fraction_per_job=getattr(args, 'txfrac', 0.35), # Assuming txfrac might be an arg + seed=args.seed + ) + + +def infer_group_params(legacy_cfg: dict, topology: str) -> Tuple[int, int, str]: + """ + Infer (hosts_per_group, total_groups, group_label) + depending on network topology. 
+ """ + total_nodes = int(legacy_cfg["TOTAL_NODES"]) + + if topology == "fat-tree": + k = int(legacy_cfg.get("FATTREE_K", 32)) + H = k // 2 # hosts per ToR + R = math.ceil(total_nodes / H) + return H, R, "rack" + + elif topology == "dragonfly": + routers_per_group = int(legacy_cfg.get("ROUTERS_PER_GROUP", 8)) + nodes_per_router = int(legacy_cfg.get("NODES_PER_ROUTER", 4)) + H = routers_per_group * nodes_per_router + R = max(1, total_nodes // H) + return H, R, "group" + + elif topology == "torus3d": + dims = ( + int(legacy_cfg.get("TORUS_X", 12)), + int(legacy_cfg.get("TORUS_Y", 12)), + int(legacy_cfg.get("TORUS_Z", 12)), + ) + R = math.prod(dims) + return 1, R, "torus" + + else: + return 1, 1, "flat" + + +def pick_two_distinct_groups(R: int) -> Tuple[int, int]: + """Pick two distinct group indices (far apart if possible).""" + if R <= 2: + return (0, 1 if R > 1 else 0) + a = random.randrange(0, R // 2) + b = random.randrange(R // 2, R) + if a == b: + b = (b + 1) % R + return a, b + + +def nodes_in_group(group_idx: int, H: int, total_nodes: int, n: int) -> List[int]: + """Pick n contiguous nodes from a group.""" + start = group_idx * H + end = min(start + H, total_nodes) + n = min(n, end - start) + base = random.randrange(start, end - n + 1) if (end - start - n) > 0 else start + return list(range(base, base + n)) + + +def generate_jobs( + legacy_cfg: dict, + topology: str, + J: int = 60, + trace_quanta: int = 20, + tx_fraction_per_job: float = 0.35, + seed: int = 42 +) -> List[Job]: + """Generate synthetic jobs spanning and overlapping local groups.""" + random.seed(seed) + total_nodes = int(legacy_cfg["TOTAL_NODES"]) + H, R, label = infer_group_params(legacy_cfg, topology) + per_tick_bw = max_throughput_per_tick(legacy_cfg, trace_quanta) + per_dir = tx_fraction_per_job * per_tick_bw + + print(f"[INFO] topology={topology}, {label}s={R}, hosts_per_{label}={H}") + print(f"[INFO] total_nodes={total_nodes}, per-dir={per_dir:.2e} B/tick") + + jobs: List[Job] = [] + jid = 
1 + + # Roughly 60% cross-group, 25% intra-group, 15% multi-group + n_cross = int(J * 0.6) + n_intra = int(J * 0.25) + n_multi = J - n_cross - n_intra + + for _ in range(n_cross): + a, b = pick_two_distinct_groups(R) + nodes = nodes_in_group(a, H, total_nodes, 1) + nodes_in_group(b, H, total_nodes, 1) + jobs.append(make_job(jid, nodes, per_dir, trace_quanta)) + jid += 1 + + for _ in range(n_intra): + g = random.randrange(0, R) + nodes = nodes_in_group(g, H, total_nodes, 2) + jobs.append(make_job(jid, nodes, per_dir, trace_quanta)) + jid += 1 + + for _ in range(n_multi): + a, b = pick_two_distinct_groups(R) + nodes = nodes_in_group(a, H, total_nodes, 2) + nodes_in_group(b, H, total_nodes, 2) + jobs.append(make_job(jid, nodes, per_dir, trace_quanta)) + jid += 1 + + print(f"[INFO] jobs={len(jobs)} (cross={n_cross}, intra={n_intra}, multi={n_multi})") + return jobs + + +def make_job(jid: int, nodes: List[int], per_dir: float, trace_quanta: int) -> Job: + """Helper: create one synthetic Job object.""" + trace_len = 900 // trace_quanta + return Job(job_dict( + id=jid, + name=f"job_{jid}", + account="test", + nodes_required=len(nodes), + scheduled_nodes=nodes, + cpu_trace=[0] * trace_len, + gpu_trace=[0] * trace_len, + ntx_trace=[per_dir] * trace_len, + nrx_trace=[per_dir] * trace_len, + trace_quanta=trace_quanta, + expected_run_time=900, + time_limit=1800, + end_state="COMPLETED" + )) diff --git a/raps/workloads/network.py b/raps/workloads/network.py new file mode 100644 index 0000000000000000000000000000000000000000..e5302c89efc6bb986576b65bfa385f39cbe77a53 --- /dev/null +++ b/raps/workloads/network.py @@ -0,0 +1,62 @@ + +from raps.job import Job, job_dict + + +class NetworkTestWorkload: + def network_test(self, **kwargs): + """ + Synthetic workload to test network congestion. + Generates several jobs with varying sizes and bandwidths, + including overlapping node assignments to induce interference. 
+ """ + jobs = [] + trace_len = 180 # 15 minutes with 20s quanta + + # -------------------------------------------------------- + # Hard-coded configuration + # -------------------------------------------------------- + # Define per-job properties + bw = 1e10 + job_configs = [ + # (job_id, node_list, bandwidth_bytes_per_tick) + (1, [0, 1], bw), # 2-node job +# (2, [1, 2], bw), # Job 2 overlaps node 1 (causes congestion) + (2, [128, 129], bw), # Job 2 on a distant rack (no shared link) + (3, [256], bw), # isolated single-node job + (4, [512, 513, 514], 5e11), # multi-node but separate + (5, [1020], bw), # distant single-node job + ] + + runtime = 900 # seconds + time_limit = 1800 # seconds + trace_quanta = 20 # seconds + + # -------------------------------------------------------- + # Job creation loop + # -------------------------------------------------------- + for job_id, node_list, bw in job_configs: + job_info = job_dict( + id=job_id, + name=f"net_job_{job_id}", + account="test", + nodes_required=len(node_list), + scheduled_nodes=node_list, + cpu_trace=[1] * trace_len, + gpu_trace=[1] * trace_len, + ntx_trace=[bw] * trace_len, + nrx_trace=[bw] * trace_len, + submit_time=0, + start_time=0, + expected_run_time=runtime, + time_limit=time_limit, + end_state="COMPLETED", + trace_quanta=trace_quanta, + ) + jobs.append(Job(job_info)) + print(f"[DEBUG] Created net_job_{job_id} nodes={node_list} bw={bw:.2e}") + + print("\n[DEBUG] Requested node assignments:") + for job in jobs: + print(f" Job {job.id}: nodes_required={job.nodes_required}, scheduled_nodes={job.scheduled_nodes}") + + return jobs diff --git a/scripts/run_inter_job_congestion.py b/scripts/run_inter_job_congestion.py new file mode 100644 index 0000000000000000000000000000000000000000..9312fd43b97bb1362c91960aac96b406f7c835db --- /dev/null +++ b/scripts/run_inter_job_congestion.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python3 +""" +RAPS Network Congestion Test (Inter-Job Interference) 
+====================================================== + +This script is a wrapper that uses the integrated `inter_job_congestion` +workload from the RAPS library to run a standalone network simulation. + +It evaluates inter-job network congestion by simulating multiple jobs +running concurrently on the same network and finding the total congestion +on the most loaded link. + +Usage: + python scripts/run_inter_job_congestion.py --config config/lassen.yaml + +Example: + python scripts/run_inter_job_congestion.py --config config/lassen.yaml --jobs 80 --txfrac 0.35 -v +""" + +from __future__ import annotations +import argparse +from pathlib import Path + +from raps.system_config import get_system_config +from raps.network import ( + NetworkModel, + simulate_inter_job_congestion, +) +from raps.workloads import Workload + + +def print_verbose_stats(stats): + print("\n--- Detailed Network Congestion Stats ---") + print(f" Max Congestion (Worst Link): {stats['max']:.2f}") + print(f" Mean Link Congestion: {stats['mean']:.2f}") + print(f" Min Link Congestion: {stats['min']:.2f}") + print(f" Std Dev of Congestion: {stats['std_dev']:.2f}") + print("\n Top 10 Most Congested Links:") + for (link, congestion) in stats['top_links']: + print(f" - Link {link}: {congestion:.2f}") + print("---------------------------------------") + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main(): + parser = argparse.ArgumentParser(description="Standalone inter-job network congestion test for RAPS.") + parser.add_argument("--config", required=True, help="Path to system YAML (e.g., config/lassen.yaml)") + parser.add_argument("--jobs", type=int, default=60, help="Number of synthetic jobs") + parser.add_argument("--txfrac", type=float, default=0.35, help="Fraction of per-link bandwidth per job") + parser.add_argument("--debug", action="store_true", help="Enable network 
debug output") + parser.add_argument("--verbose", "-v", action="store_true", help="Print detailed statistics") + args = parser.parse_args() + + # --- Load config and detect topology --- + sys_cfg = get_system_config(args.config) + legacy = sys_cfg.get_legacy() + + topology = legacy.get("TOPOLOGY", "").lower() + if not topology: + raise ValueError(f"Could not infer topology from {args.config}. Found: {topology!r}") + + # --- Generate Jobs via Workload module --- + # The workload class expects specific attribute names, so we add them to the args object. + args.workload = 'inter_job_congestion' + args.numjobs = args.jobs + args.seed = 42 # Keep seed consistent for this test script + args.start = None + + workload_generator = Workload(args, legacy) + workload_data = workload_generator.generate_jobs() + jobs = workload_data.jobs + + print(f"[INFO] Detected topology: {topology}") + print(f"[INFO] Generated {len(jobs)} jobs for congestion test.") + + # --- Initialize network model --- + net = NetworkModel( + config=legacy, + available_nodes=list(range(legacy["TOTAL_NODES"])), + output_dir=Path(f"test-{Path(args.config).stem}"), + debug=args.debug, + ) + + # --- Simulate all jobs running concurrently --- + congestion_stats = simulate_inter_job_congestion(net, jobs, legacy, debug=args.debug) + + print(f"[RESULT] config={args.config}, topology={topology}, jobs={len(jobs)}, " + f"total_congestion={congestion_stats['max']:.2f}") + + if args.verbose: + print_verbose_stats(congestion_stats) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/tests/unit/test_net_dragonfly.py b/tests/unit/test_net_dragonfly.py new file mode 100644 index 0000000000000000000000000000000000000000..a36afdc46e0cc6c38f127c4043bf0990f4383e12 --- /dev/null +++ b/tests/unit/test_net_dragonfly.py @@ -0,0 +1,38 @@ +import pytest +from raps.network.dragonfly import build_dragonfly, dragonfly_node_id_to_host_name + +def test_build_dragonfly(): + """Test building a small dragonfly 
network.""" + D, A, P = 2, 2, 2 + G = build_dragonfly(D, A, P) + + # Check number of nodes + num_routers = D * A + num_hosts = D * A * P + total_nodes = num_routers + num_hosts + assert len(G.nodes) == total_nodes + + # Check number of edges + # Intra-group edges (clique) + intra_group_edges = D * (A * (A - 1) // 2) + # Inter-group edges + inter_group_edges = A * (D * (D - 1) // 2) + # Host to router edges + host_router_edges = num_hosts + total_edges = intra_group_edges + inter_group_edges + host_router_edges + assert len(G.edges) == total_edges + + # Check node types + node_types = [data["type"] for _, data in G.nodes(data=True)] + assert node_types.count("router") == num_routers + assert node_types.count("host") == num_hosts + +def test_dragonfly_node_id_to_host_name(): + """Test the dragonfly_node_id_to_host_name function.""" + D, A, P = 2, 2, 2 + # Test a few node IDs + assert dragonfly_node_id_to_host_name(0, D, A, P) == "h_0_0_0" + assert dragonfly_node_id_to_host_name(1, D, A, P) == "h_0_0_1" + assert dragonfly_node_id_to_host_name(2, D, A, P) == "h_0_1_0" + assert dragonfly_node_id_to_host_name(3, D, A, P) == "h_0_1_1" + assert dragonfly_node_id_to_host_name(4, D, A, P) == "h_1_0_0" diff --git a/tests/unit/test_net_fat_tree.py b/tests/unit/test_net_fat_tree.py new file mode 100644 index 0000000000000000000000000000000000000000..93750e0187b5afb4410438f9389f95163fdc5cba --- /dev/null +++ b/tests/unit/test_net_fat_tree.py @@ -0,0 +1,42 @@ +import pytest +from raps.network.fat_tree import build_fattree, node_id_to_host_name + +def test_build_fattree_k4(): + """Test building a k=4 fat-tree.""" + k = 4 + G = build_fattree(k, 16) + + # Check number of nodes + num_hosts = k * (k // 2) * (k // 2) + num_edge_switches = k * (k // 2) + num_agg_switches = k * (k // 2) + num_core_switches = (k // 2) ** 2 + total_nodes = num_hosts + num_edge_switches + num_agg_switches + num_core_switches + assert len(G.nodes) == total_nodes + + # Check number of edges + # Host to edge 
switch edges
+    host_edges = num_hosts
+    # Edge to agg switch edges
+    edge_agg_edges = k * (k // 2) * (k // 2)
+    # Agg to core switch edges
+    agg_core_edges = k * (k // 2) * (k // 2)
+    total_edges = host_edges + edge_agg_edges + agg_core_edges
+    assert len(G.edges) == total_edges
+
+    # Check node types
+    node_types = [data["type"] for _, data in G.nodes(data=True)]
+    assert node_types.count("host") == num_hosts
+    assert node_types.count("edge") == num_edge_switches
+    assert node_types.count("agg") == num_agg_switches
+    assert node_types.count("core") == num_core_switches
+
+def test_node_id_to_host_name():
+    """Test the node_id_to_host_name function."""
+    k = 4
+    # Test a few node IDs
+    assert node_id_to_host_name(0, k) == "h_0_0_0"
+    assert node_id_to_host_name(1, k) == "h_0_0_1"
+    assert node_id_to_host_name(2, k) == "h_0_1_0"
+    assert node_id_to_host_name(3, k) == "h_0_1_1"
+    assert node_id_to_host_name(4, k) == "h_1_0_0"
diff --git a/tests/unit/test_net_torus3d.py b/tests/unit/test_net_torus3d.py
new file mode 100644
index 0000000000000000000000000000000000000000..b18cbfa2caec2d48e9e5efc1a2e6d492ae6de589
--- /dev/null
+++ b/tests/unit/test_net_torus3d.py
@@ -0,0 +1,41 @@
+import pytest
+from raps.network.torus3d import build_torus3d, torus_route_xyz
+
+def test_build_torus3d():
+    """Test building a small 3D torus network."""
+    dims = (2, 2, 2)
+    G, meta = build_torus3d(dims)
+
+    # Check number of nodes
+    num_routers = dims[0] * dims[1] * dims[2]
+    num_hosts = num_routers  # hosts_per_router=1
+    total_nodes = num_routers + num_hosts
+    assert len(G.nodes) == total_nodes
+
+    # Check number of edges
+    # Router to router edges
+    router_edges = (num_routers * 3) // 2  # only valid for dims=(2,2,2): +d and -d wrap to the same router, so 3 distinct neighbors (a general 3D torus router has 6)
+    # Host to router edges
+    host_router_edges = num_hosts
+    total_edges = router_edges + host_router_edges
+    assert len(G.edges) == total_edges
+
+    # Check node types
+    node_types = [data["kind"] for _, data in G.nodes(data=True)]
+    assert 
node_types.count("router") == num_routers + assert node_types.count("host") == num_hosts + +def test_torus_route_xyz(): + """Test the torus_route_xyz function.""" + dims = (4, 4, 4) + # Test a simple route + path = torus_route_xyz("r_0_0_0", "r_1_1_1", dims) + assert path == ["r_0_0_0", "r_1_0_0", "r_1_1_0", "r_1_1_1"] + + # Test a route with wrap-around + path = torus_route_xyz("r_3_3_3", "r_0_0_0", dims, wrap=True) + assert path == ["r_3_3_3", "r_0_3_3", "r_0_0_3", "r_0_0_0"] + + # Test a route without wrap-around + path = torus_route_xyz("r_0_0_0", "r_1_1_1", dims, wrap=False) + assert path == ["r_0_0_0", "r_1_0_0", "r_1_1_0", "r_1_1_1"]