From 3a1ab7c82105d5b8dea1fc2259c2465d9bb23bba Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Fri, 22 Aug 2025 02:02:33 -0400 Subject: [PATCH 1/8] Remove many of the debug statements which make the option useless --- raps/engine.py | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index 64bf218..d07b9c5 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -350,24 +350,16 @@ class Engine: Adds running jobs to the queue, and removes them from the jobs_to_submit jobs_to_submit still holds the jobs that need be submitted in the future. """ - if self.debug: - print(f"[DEBUG] add_running_jobs_to_queue: current_time={self.current_timestep}") # Build a list of jobs whose start_time is <= current_time. eligible_jobs = [job for job in jobs_to_submit if job.start_time is not None and job.start_time < self.current_timestep] - if self.debug: - print(f"[DEBUG] add_running_jobs_to_queue: Found {len(eligible_jobs)} eligible jobs.") # Remove those jobs from jobs_to_submit: jobs_to_submit[:] = [job for job in jobs_to_submit if job.start_time is None or job.start_time >= self.current_timestep] - if self.debug: - print(f"[DEBUG] add_running_jobs_to_queue: {len(jobs_to_submit)} jobs remaining in jobs_to_submit.") # Convert them to Job instances and build list of eligible jobs. self.queue += eligible_jobs - if self.debug: - print(f"[DEBUG] add_running_jobs_to_queue: self.queue now has {len(self.queue)} jobs.") def add_eligible_jobs_to_queue(self, jobs_to_submit: List): """ @@ -379,20 +371,12 @@ class Engine: - true if new jobs are present - false if no new jobs are present """ - if self.debug: - print(f"[DEBUG] add_eligible_jobs_to_queue: current_time={self.current_timestep}") # Build a list of jobs whose submit_time is <= current_time. 
eligible_jobs = [job for job in jobs_to_submit if job.submit_time <= self.current_timestep] - if self.debug: - print(f"[DEBUG] add_eligible_jobs_to_queue: Found {len(eligible_jobs)} eligible jobs.") # Remove those jobs from jobs_to_submit: jobs_to_submit[:] = [job for job in jobs_to_submit if job.submit_time > self.current_timestep] - if self.debug: - print(f"[DEBUG] add_eligible_jobs_to_queue: {len(jobs_to_submit)} jobs remaining in jobs_to_submit.") # Convert them to Job instances and build list of eligible jobs. self.queue += eligible_jobs - if self.debug: - print(f"[DEBUG] add_eligible_jobs_to_queue: self.queue now has {len(self.queue)} jobs.") if eligible_jobs != []: return True else: @@ -544,14 +528,10 @@ class Engine: net_utils = [] net_tx_list = [] net_rx_list = [] - if self.debug: - print(f"Current Time: {self.current_timestep}") slowdown_factors = [] for job in self.running: - if self.debug: - print(f"JobID: {job.id}") job.running_time = self.current_timestep - job.start_time -- GitLab From 3502e4e5a9a8376c80d4fd00e4beeaaee4d94a32 Mon Sep 17 00:00:00 2001 From: Matthias Maiterth Date: Tue, 2 Sep 2025 16:45:32 -0400 Subject: [PATCH 2/8] Fix issues with bluewaters.yaml - demonstrates slowdown now --- config/bluewaters.yaml | 6 ++-- experiments/bluewaters.yaml | 6 ++++ raps/dataloaders/bluewaters.py | 54 ++++++++++++++++++++-------------- raps/system_config.py | 4 +-- 4 files changed, 44 insertions(+), 26 deletions(-) create mode 100644 experiments/bluewaters.yaml diff --git a/config/bluewaters.yaml b/config/bluewaters.yaml index 90be71d..8c7ae1d 100644 --- a/config/bluewaters.yaml +++ b/config/bluewaters.yaml @@ -48,8 +48,10 @@ scheduler: TIMEOUT: 0.11 NODE_FAIL: 0.01 network: - topology: torus3d - network_max_bw: 9600000000.0 + #topology: torus3d + topology: capacity + #network_max_bw: 9.6E9 + network_max_bw: 1E7 torus_x: 24 torus_y: 24 torus_z: 24 diff --git a/experiments/bluewaters.yaml b/experiments/bluewaters.yaml new file mode 100644 index 
0000000..ac35857 --- /dev/null +++ b/experiments/bluewaters.yaml @@ -0,0 +1,6 @@ +system: bluewaters +replay: + - /opt/data/bluewaters +start: "20170328" +simulate-network: True +filter: "traffic > 1e8" diff --git a/raps/dataloaders/bluewaters.py b/raps/dataloaders/bluewaters.py index 46fa462..55c3fa5 100644 --- a/raps/dataloaders/bluewaters.py +++ b/raps/dataloaders/bluewaters.py @@ -32,6 +32,7 @@ import math import re import pandas as pd from pathlib import Path +from pprint import pprint from raps.telemetry import Job, job_dict @@ -62,20 +63,10 @@ def build_sampler_df(root, day, nodes, tmin, tmax, tx_idx, rx_idx, chunksize=Non df = df[df["nid"].isin(nodes)] if df.empty: return None - # sort & compute deltas per node + # sort values (optional, for consistency) df = df.sort_values(["nid", "ts"]) - df["ts_prev"] = df.groupby("nid")["ts"].shift(1) - df["tx_prev"] = df.groupby("nid")["tx"].shift(1) - df["rx_prev"] = df.groupby("nid")["rx"].shift(1) - # positive deltas only - df["dtx"] = df["tx"] - df["tx_prev"] - df["drx"] = df["rx"] - df["rx_prev"] - df = df[(df["dtx"] > 0) | (df["drx"] > 0)] - if df.empty: - return None - # mid-interval timestamp for window inclusion - df["mid_ts"] = 0.5 * (df["ts"] + df["ts_prev"]) - df = df[["nid", "mid_ts", "dtx", "drx"]].dropna() + # keep raw values + df = df[["nid", "ts", "tx", "rx"]].dropna() return df for fp in files: @@ -180,6 +171,8 @@ def load_data(local_dataset_path, **kwargs): root = Path(local_dataset_path[0]) day = kwargs.get("start") fp = root / "torque_logs" / day + filter_str = kwargs.get("filter") + debug = kwargs.get("debug") jobs_raw = [] @@ -277,23 +270,30 @@ def load_data(local_dataset_path, **kwargs): # Filter by nodes, sum positive deltas dfj = sampler_df[sampler_df["nid"].isin(nodes)] - total_tx = int(dfj["dtx"].sum()) if not dfj.empty else 0 - total_rx = int(dfj["drx"].sum()) if not dfj.empty else 0 - # total_tx and total_rx are bytes per node + + # Print first 10 rows (node, tx, rx) + if debug: + 
print(dfj[["nid", "tx", "rx"]].head(10)) + + total_tx = int(dfj["tx"].sum()) if not dfj.empty else 0 + total_rx = int(dfj["rx"].sum()) if not dfj.empty else 0 nodes_required = r.get("nodes_required") + avg_tx_per_node = total_tx / nodes_required if nodes_required > 0 else 0 + avg_rx_per_node = total_rx / nodes_required if nodes_required > 0 else 0 + # Smear totals evenly across bins (simple first pass) duration = max(1, et_abs - st_abs) samples = max(1, math.ceil(duration / bin_s)) - ntx, nrx = throughput_traces(total_tx, total_rx, samples) # bytes per bin + ntx, nrx = throughput_traces(avg_tx_per_node, avg_rx_per_node, samples) job_d = job_dict( nodes_required=nodes_required, name=r.get("name"), account=r.get("account", "unknown"), - cpu_trace=0, - gpu_trace=0, + cpu_trace=[0]*samples, + gpu_trace=[0]*samples, nrx_trace=nrx, ntx_trace=ntx, end_state="UNKNOWN", @@ -312,7 +312,16 @@ def load_data(local_dataset_path, **kwargs): trace_quanta=bin_s, trace_missing_values=False, ) - jobs.append(Job(job_d)) + + if filter_str: + traffic = (avg_tx_per_node + avg_rx_per_node) / 2. 
+ keep_jobs = eval(filter_str) + print(job_d["id"], filter_str, traffic, keep_jobs) + else: + keep_jobs = True + + if keep_jobs: + jobs.append(Job(job_d)) # Normalize times so first start = 0 t0 = min((j.start_time for j in jobs), default=0) @@ -324,8 +333,9 @@ def load_data(local_dataset_path, **kwargs): j.trace_start_time -= t0 j.trace_end_time -= t0 - # pprint(jobs) + if debug: + pprint(jobs) + simulation_start = 0 simulation_end = max((j.end_time for j in jobs), default=0) - return jobs, simulation_start, simulation_end diff --git a/raps/system_config.py b/raps/system_config.py index 642bb98..726c086 100644 --- a/raps/system_config.py +++ b/raps/system_config.py @@ -1,6 +1,6 @@ -import functools import glob import fnmatch +import functools from typing import Any, Literal from pathlib import Path from functools import cached_property @@ -141,7 +141,7 @@ class SystemCoolingConfig(BaseModel): class SystemNetworkConfig(BaseModel): - topology: Literal["fat-tree", "dragonfly", "torus3d"] + topology: Literal["capacity", "fat-tree", "dragonfly", "torus3d"] network_max_bw: float latency: float | None = None -- GitLab From 05887b947bd8e605bb3260a5fa2368871e79a673 Mon Sep 17 00:00:00 2001 From: Matthias Maiterth Date: Tue, 2 Sep 2025 16:47:24 -0400 Subject: [PATCH 3/8] Refactor network - create directory to separate out topologies --- config/bluewaters.yaml | 4 +- experiments/lassen.yaml | 2 +- raps/dataloaders/bluewaters.py | 10 +- raps/network.py | 498 --------------------------------- raps/network/__init__.py | 126 +++++++++ raps/network/base.py | 127 +++++++++ raps/network/dragonfly.py | 75 +++++ raps/network/fat_tree.py | 59 ++++ raps/network/torus3d.py | 152 ++++++++++ 9 files changed, 548 insertions(+), 505 deletions(-) delete mode 100644 raps/network.py create mode 100644 raps/network/__init__.py create mode 100644 raps/network/base.py create mode 100644 raps/network/dragonfly.py create mode 100644 raps/network/fat_tree.py create mode 100644 
raps/network/torus3d.py diff --git a/config/bluewaters.yaml b/config/bluewaters.yaml index 8c7ae1d..989ca78 100644 --- a/config/bluewaters.yaml +++ b/config/bluewaters.yaml @@ -48,8 +48,8 @@ scheduler: TIMEOUT: 0.11 NODE_FAIL: 0.01 network: - #topology: torus3d - topology: capacity + topology: torus3d + #topology: capacity #network_max_bw: 9.6E9 network_max_bw: 1E7 torus_x: 24 diff --git a/experiments/lassen.yaml b/experiments/lassen.yaml index 5434a1b..7ee04be 100644 --- a/experiments/lassen.yaml +++ b/experiments/lassen.yaml @@ -1,6 +1,6 @@ system: lassen replay: - - ~/data/lassen/Lassen-Supercomputer-Job-Dataset + - /opt/data/lassen/Lassen-Supercomputer-Job-Dataset policy: fcfs backfill: firstfit fastforward: 365d diff --git a/raps/dataloaders/bluewaters.py b/raps/dataloaders/bluewaters.py index 55c3fa5..1658806 100644 --- a/raps/dataloaders/bluewaters.py +++ b/raps/dataloaders/bluewaters.py @@ -247,8 +247,10 @@ def load_data(local_dataset_path, **kwargs): global_tmax = max(abs_ends) # Confirm the correct 0-based indices for ipogif0_* from the HEADER - tx_idx = 15 # kwargs.get("sampler_tx_idx", 15) # placeholder; pass real index via kwargs - rx_idx = 16 # kwargs.get("sampler_rx_idx", 16) # placeholder; pass real index via kwargs + # tx_idx = 15 # for the original file + # rx_idx = 16 + tx_idx = 2 # for a downselected file with just four columns: [timestamp, node, tx, rx] - for faster loading + rx_idx = 3 # Build once (chunk if files are huge) sampler_df = build_sampler_df(root, day, all_nodes, global_tmin, global_tmax, tx_idx, rx_idx, chunksize=None) @@ -292,8 +294,8 @@ def load_data(local_dataset_path, **kwargs): nodes_required=nodes_required, name=r.get("name"), account=r.get("account", "unknown"), - cpu_trace=[0]*samples, - gpu_trace=[0]*samples, + cpu_trace=[0] * samples, + gpu_trace=[0] * samples, nrx_trace=nrx, ntx_trace=ntx, end_state="UNKNOWN", diff --git a/raps/network.py b/raps/network.py deleted file mode 100644 index b4340e4..0000000 --- 
a/raps/network.py +++ /dev/null @@ -1,498 +0,0 @@ -import csv -import networkx as nx -from itertools import combinations -from raps.utils import get_current_utilization -from pathlib import Path - - -class NetworkModel: - """ """ - - def __init__(self, *, available_nodes, config): - self.topology = config.get("TOPOLOGY") - # if fat-tree, build the graph once - if self.topology == "fat-tree": - print("building fat-tree graph...") - self.fattree_k = config.get("FATTREE_K") - self.net_graph = build_fattree(self.fattree_k) - print(self.net_graph) - elif self.topology == "torus3d": - print("building torus3d graph...") - dims = (int(config["TORUS_X"]), int(config["TORUS_Y"]), int(config["TORUS_Z"])) - wrap = bool(config.get("TORUS_WRAP", True)) - link_bw = float(config.get("TORUS_LINK_BW", config.get("NETWORK_MAX_BW"))) - hpr = int(config.get("HOSTS_PER_ROUTER")) - routing = config.get("TORUS_ROUTING", "DOR_XYZ").upper() - coords_csv = config.get("NODE_COORDS_CSV") # optional - self.net_graph, self.torus_meta = build_torus3d( - dims=dims, wrap=wrap, link_bw=link_bw, hosts_per_router=hpr, routing=routing, coords_csv=coords_csv - ) - elif self.topology == "dragonfly": - print("building dragonfly graph...") - D = config["DRAGONFLY_D"] # groups - A = config["DRAGONFLY_A"] # routers per group - P = config["DRAGONFLY_P"] # hosts per router - self.net_graph = build_dragonfly(D, A, P) - print(self.net_graph) - - real_ids = available_nodes - real_ids.sort() - self.real_to_fat_idx = {rid: idx for idx, rid in enumerate(real_ids)} - # e.g. real_to_fat_idx[10] = 0, real_to_fat_idx[11] = 1, etc., up to 791 → 791 - self.max_link_bw = config.get("NETWORK_MAX_BW") - - def simulate_network_utilization(self, *, job, debug=False): - net_util = 0 - net_cong = 0 - net_tx = 0 - net_rx = 0 - # self.config.get('TRACE_QUANTA') # Why? What should this be? - max_throughput = self.max_link_bw * job.trace_quanta - - if job.nodes_required <= 1: - # single node, no network utilization or congestion. 
- pass - else: - - net_tx = get_current_utilization(job.ntx_trace, job) # Are these % or actual bytes? - net_rx = get_current_utilization(job.nrx_trace, job) - net_util = network_utilization(net_tx, net_rx, max_throughput) - - # Congestion depends on topology: - if self.topology == "fat-tree": - # Map integers to hostnames - host_list = [node_id_to_host_name(n, self.fattree_k) for n in job.scheduled_nodes] - loads = link_loads_for_job(self.net_graph, host_list, net_tx) # ? Only tx not rx or total net_util) - net_cong = worst_link_util(loads, max_throughput) - - if debug: - print(" fat-tree hosts:", host_list) - - elif self.topology == "dragonfly": - D = self.config["DRAGONFLY_D"] - A = self.config["DRAGONFLY_A"] - P = self.config["DRAGONFLY_P"] - - host_list = [] - for real_n in job.scheduled_nodes: - fat_idx = self.real_to_fat_idx[real_n] # contiguous in [0..(D*A*P−1)] - host_list.append(dragonfly_node_id_to_host_name(fat_idx, D, A, P)) - if debug: - print(" dragonfly hosts:", host_list) - # if len(host_list) <= 1: - # net_cong = 0.0 - # else: - loads = link_loads_for_job(self.net_graph, host_list, net_tx) # ? Only tx not rx or total net_util) - net_cong = worst_link_util(loads, max_throughput) - - else: # capacity model: simple α+β or normalized overload - net_cong = network_congestion(net_tx, net_rx, max_throughput) - - return net_util, net_cong, net_tx, net_rx, max_throughput - - -def apply_job_slowdown(*, job, max_throughput, net_util, net_cong, net_tx, net_rx, debug: bool = False): - # Get the maximum allowed bandwidth from the configuration. 
- if net_cong > 1: - if debug: - print(f"congested net_cong: {net_cong}, max_throughput: {max_throughput}") - print(f"length of {len(job.gpu_trace)} before dilation") - throughput = net_tx + net_rx - slowdown_factor = network_slowdown(throughput, max_throughput) - - if debug: - print("***", hasattr(job, "dilated"), throughput, max_throughput, slowdown_factor) - - # Only apply slowdown once per job to avoid compounding the effect. - if not job.dilated: - if debug: - print(f"Applying slowdown factor {slowdown_factor:.2f} to job {job.id} due to network congestion") - job.apply_dilation(slowdown_factor) - job.dilated = True - if debug: - print(f"length of {len(job.gpu_trace)} after dilation") - else: - slowdown_factor = 1 - job.slowdown_factor = slowdown_factor - - return slowdown_factor - - -def compute_system_network_stats(net_utils, net_tx_list, net_rx_list, slowdown_factors): - - # Compute network averages - n = len(net_utils) or 1 - avg_tx = sum(net_tx_list) / n - avg_rx = sum(net_rx_list) / n - avg_net = sum(net_utils) / n - # avg_slowdown_per_job = sum(slowdown_factors) / n - # self.avg_slowdown_history.append(avg_slowdown_per_job) - # max_slowdown_per_job = max(slowdown_factors) - # self.max_slowdown_history.append(max_slowdown_per_job) - - return avg_tx, avg_rx, avg_net - - -def network_congestion(tx, rx, max_throughput): - """ - Overload factor ≥0: average of send/recv NOT clamped. - >1.0 means you’re pushing above capacity. - """ - tx_util = float(tx) / max_throughput - rx_util = float(rx) / max_throughput - return (tx_util + rx_util) / 2.0 - - -def network_utilization(tx, rx, max_throughput): - """ - True utilization in [0,1]: average of send/recv clamped to 100%. - """ - tx_u = min(float(tx) / max_throughput, 1.0) - rx_u = min(float(rx) / max_throughput, 1.0) - return (tx_u + rx_u) / 2.0 - - -def network_slowdown(current_throughput, max_throughput): - """ - Calculate a slowdown factor based on current network bandwidth usage. 
- - If current_bw is within limits, the factor is 1.0 (no slowdown). - If current_bw exceeds max_bw, the factor is current_bw/max_bw. - """ - if current_throughput <= max_throughput: - return 1.0 - else: - return current_throughput / max_throughput - - -def build_fattree(k): - """ - Build a k-ary fat-tree: - - k pods - - each pod has k/2 edge switches, k/2 agg switches - - core layer has (k/2)^2 core switches - - each edge switch connects to k/2 hosts - Returns a NetworkX Graph where: - - hosts are named "h_{pod}_{edge}_{i}" - - edge switches "e_{pod}_{edge}" - - agg switches "a_{pod}_{agg}" - - core switches "c_{i}_{j}" - """ - G = nx.Graph() - # core - # num_core = (k//2)**2 # Unused! - for i in range(k // 2): - for j in range(k // 2): - core = f"c_{i}_{j}" - G.add_node(core, type="core") - # pods - for pod in range(k): - # agg switches - for agg in range(k // 2): - a = f"a_{pod}_{agg}" - G.add_node(a, type="agg") - # connect to all core switches in column agg - for i in range(k // 2): - core = f"c_{agg}_{i}" - G.add_edge(a, core) - # edge switches + hosts - for edge in range(k // 2): - e = f"e_{pod}_{edge}" - G.add_node(e, type="edge") - # connect edge→each agg in this pod - for agg in range(k // 2): - a = f"a_{pod}_{agg}" - G.add_edge(e, a) - # connect hosts - for h in range(k // 2): - host = f"h_{pod}_{edge}_{h}" - G.add_node(host, type="host") - G.add_edge(e, host) - return G - - -def all_to_all_paths(G, hosts): - """ - Given a list of host names, return shortest‐paths for every unordered pair. - """ - paths = [] - for i in range(len(hosts)): - for j in range(i + 1, len(hosts)): - src, dst = hosts[i], hosts[j] - p = nx.shortest_path(G, src, dst) - paths.append((src, dst, p)) - return paths - - -def link_loads_for_job(G, job_hosts, tx_volume_bytes): - """ - Distribute tx_volume_bytes from each host equally to all its peers; - accumulate per-link loads and return a dict {(u,v):bytes, …}. 
- """ - paths = all_to_all_paths(G, job_hosts) - loads = {edge: 0.0 for edge in G.edges()} - # each host sends tx_volume_bytes to each of the (N-1) peers - for src in job_hosts: - if len(job_hosts) >= 2: - per_peer = tx_volume_bytes / (len(job_hosts) - 1) - else: - per_peer = 0 - # find paths where src is the sender - for s, d, p in paths: - if s != src: - continue - # add per_peer to every link on p - for u, v in zip(p, p[1:]): - # ensure ordering matches loads keys - edge = (u, v) if (u, v) in loads else (v, u) - loads[edge] += per_peer - return loads - - -def worst_link_util(loads, throughput): - """ - Given loads in **bytes** and capacity in **bits/sec**, convert: - util = (bytes * 8) / throughput - Return the maximum util over all links. - """ - max_util = 0.0 - for edge, byte_load in loads.items(): - util = (byte_load * 8) / throughput - if util > max_util: - max_util = util - return max_util - - -def node_id_to_host_name(node_id: int, k: int) -> str: - """ - Map a 0-based integer node_id into one of the fat-tree hosts "h_{pod}_{edge}_{h}". - There are (k^3/4) total hosts, assigned in ascending order across pod → edge → h. - """ - hosts_per_pod = (k // 2) * (k // 2) # e.g. for k=8, hosts_per_pod = 16 - pod = node_id // hosts_per_pod - offset = node_id % hosts_per_pod - edge = offset // (k // 2) - idx = offset % (k // 2) - return f"h_{pod}_{edge}_{idx}" - - -def build_dragonfly(D: int, A: int, P: int) -> nx.Graph: - """ - Build a “simple” k-ary Dragonfly with: - D = # of groups - A = # of routers per group - P = # of hosts (endpoints) per router - - Naming convention: - - Router nodes: "r_{g}_{r}" with g ∈ [0..D−1], r ∈ [0..A−1] - - Host nodes: "h_{g}_{r}_{p}" with p ∈ [0..P−1] - - Topology: - 1. All routers within a group form a full clique. - 2. Each router r in group g has exactly one “global link” to router r in each other group. - 3. Each router r in group g attaches to P hosts ("h_{g}_{r}_{0..P−1}"). 
- """ - G = nx.Graph() - - # 1) Create all router nodes - for g in range(D): - for r in range(A): - router = f"r_{g}_{r}" - G.add_node(router, type="router", group=g, index=r) - - # 2) Intra‐group full mesh of routers - for g in range(D): - routers_in_group = [f"r_{g}_{r}" for r in range(A)] - for u, v in combinations(routers_in_group, 2): - G.add_edge(u, v) - - # 3) Inter‐group “one‐to‐one” global links - # (router index r in group g → router index r in group g2) - for g1 in range(D): - for g2 in range(g1 + 1, D): - for r in range(A): - u = f"r_{g1}_{r}" - v = f"r_{g2}_{r}" - G.add_edge(u, v) - - # 4) Attach hosts to each router - for g in range(D): - for r in range(A): - router = f"r_{g}_{r}" - for p in range(P): - host = f"h_{g}_{r}_{p}" - G.add_node(host, type="host", group=g, router=r, index=p) - G.add_edge(router, host) - - return G - - -def dragonfly_node_id_to_host_name(fat_idx: int, D: int, A: int, P: int) -> str: - """ - Given a contiguous fat‐index ∈ [0..(D*A*P − 1)], return "h_{g}_{r}_{p}". - Hosts are laid out in order: - 0..(P−1) → group=0, router=0, p=0..P−1 - P..2P−1 → group=0, router=1, p=0..P−1 - … - (A*P)..(2A*P−1) → group=1, router=0, … - In general: - host_offset = fat_idx % P - router_offset = (fat_idx // P) % A - group = fat_idx // (A*P) - """ - total_hosts = D * A * P - assert 0 <= fat_idx < total_hosts, "fat_idx out of range" - - host_offset = fat_idx % P - router_group = (fat_idx // P) % A - pod = fat_idx // (A * P) - return f"h_{pod}_{router_group}_{host_offset}" - - -def build_torus3d(dims, wrap=True, link_bw=1e9, hosts_per_router=1, routing="DOR_XYZ", coords_csv=None): - """ - Build a 3D torus at router granularity, then attach host nodes to routers. - Node ids in the returned graph are host names ("h_x_y_z_i") and router names ("r_x_y_z"). - Edges have attribute 'capacity' (bytes/s) and 'latency' (per hop). 
- """ - X, Y, Z = map(int, dims) - G = nx.Graph() - - # Routers - def rname(x, y, z): - return f"r_{x}_{y}_{z}" - - for x in range(X): - for y in range(Y): - for z in range(Z): - G.add_node(rname(x, y, z), kind="router", coord=(x, y, z)) - - # Toroidal links between routers (±x, ±y, ±z) - def wrapi(i, n): - return (i + n) % n if wrap else (None if i < 0 or i >= n else i) - - for x in range(X): - for y in range(Y): - for z in range(Z): - u = rname(x, y, z) - # x+ - nxp = wrapi(x + 1, X) - v = rname(nxp, y, z) if nxp is not None else None - if v and not G.has_edge(u, v): - G.add_edge(u, v, capacity=link_bw) - # y+ - nyp = wrapi(y + 1, Y) - v = rname(x, nyp, z) if nyp is not None else None - if v and not G.has_edge(u, v): - G.add_edge(u, v, capacity=link_bw) - # z+ - nzp = wrapi(z + 1, Z) - v = rname(x, y, nzp) if nzp is not None else None - if v and not G.has_edge(u, v): - G.add_edge(u, v, capacity=link_bw) - - # Attach hosts to routers - host_to_router = {} - router_to_hosts = {} - - def hname(x, y, z, i): - return f"h_{x}_{y}_{z}_{i}" - - # If a nid→(x,y,z) CSV is supplied, place accordingly; else dense round-robin - # CSV format: nid,x,y,z[,i] - nid_placement = {} - if coords_csv: - p = Path(coords_csv) - with p.open("rt") as fh: - rd = csv.reader(fh) - for row in rd: - if not row: - continue - nid = int(row[0]) - x, y, z = map(int, row[1:4]) - i = int(row[4]) if len(row) > 4 else 0 - nid_placement[nid] = (x, y, z, i) - - # Build hosts - for x in range(X): - for y in range(Y): - for z in range(Z): - r = rname(x, y, z) - router_to_hosts[r] = [] - for i in range(hosts_per_router): - h = hname(x, y, z, i) - G.add_node(h, kind="host", coord=(x, y, z), local_index=i) - G.add_edge(h, r, capacity=link_bw) # host↔router edge; you can cap with NETWORK_MAX_BW instead - host_to_router[h] = r - router_to_hosts[r].append(h) - - meta = { - "dims": (X, Y, Z), - "wrap": wrap, - "routing": routing, - "host_to_router": host_to_router, - "router_to_hosts": router_to_hosts, - } - 
return G, meta - - -def _axis_steps(a, b, n, wrap=True): - """Return minimal step sequence along one axis from a to b with wrap-around.""" - if a == b: - return [] - fwd = (b - a) % n - back = (a - b) % n - if not wrap: - step = 1 if b > a else -1 - return [step] * abs(b - a) - if fwd <= back: - return [1] * fwd - else: - return [-1] * back - - -def torus_route_xyz(src_r, dst_r, dims, wrap=True): - """Router-level path (list of router names) using XYZ dimension-order routing.""" - X, Y, Z = dims - - def parse(r): - _, x, y, z = r.split("_") - return int(x), int(y), int(z) - - x1, y1, z1 = parse(src_r) - x2, y2, z2 = parse(dst_r) - - path = [src_r] - x, y, z = x1, y1, z1 - for step in _axis_steps(x, x2, X, wrap): - x = (x + step) % X - path.append(f"r_{x}_{y}_{z}") - for step in _axis_steps(y, y2, Y, wrap): - y = (y + step) % Y - path.append(f"r_{x}_{y}_{z}") - for step in _axis_steps(z, z2, Z, wrap): - z = (z + step) % Z - path.append(f"r_{x}_{y}_{z}") - return path - - -def torus_host_path(G, meta, h_src, h_dst): - r_src = meta["host_to_router"][h_src] - r_dst = meta["host_to_router"][h_dst] - routers = torus_route_xyz(r_src, r_dst, meta["dims"], meta["wrap"]) - # host->src_router + (router path) + dst_router->host - path = [h_src, r_src] + routers[1:] + [h_dst] - return path - - -def link_loads_for_job_torus(G, meta, host_list, traffic_bytes): - # all-to-all between hosts in host_list, route via torus_host_path, add traffic_bytes per pair - loads = {} - n = len(host_list) - for i in range(n): - for j in range(i + 1, n): - p = torus_host_path(G, meta, host_list[i], host_list[j]) - for u, v in zip(p, p[1:]): - e = tuple(sorted((u, v))) - loads[e] = loads.get(e, 0) + traffic_bytes - return loads diff --git a/raps/network/__init__.py b/raps/network/__init__.py new file mode 100644 index 0000000..eb49ee6 --- /dev/null +++ b/raps/network/__init__.py @@ -0,0 +1,126 @@ +from .base import ( + all_to_all_paths, + apply_job_slowdown, + compute_system_network_stats, + 
link_loads_for_job, + network_congestion, + network_slowdown, + network_utilization, + worst_link_util, +) + +from .fat_tree import build_fattree, node_id_to_host_name +from .torus3d import build_torus3d, link_loads_for_job_torus +from .dragonfly import build_dragonfly, dragonfly_node_id_to_host_name +from raps.utils import get_current_utilization + +__all__ = [ + "NetworkModel", + "apply_job_slowdown", + "compute_system_network_stats", + "network_congestion", + "network_utilization", + "network_slowdown", + "all_to_all_paths", + "link_loads_for_job", + "worst_link_util", + "build_fattree", + "build_torus3d", + "build_dragonfly", + "dragonfly_node_id_to_host_name", +] + + +class NetworkModel: + def __init__(self, *, available_nodes, config, **kwargs): + self.config = config + self.topology = config.get("TOPOLOGY") + self.max_link_bw = config.get("NETWORK_MAX_BW", 1e9) # default safeguard + self.real_to_fat_idx = kwargs.get("real_to_fat_idx", {}) + + if self.topology == "fat-tree": + self.fattree_k = config.get("FATTREE_K") + self.net_graph = build_fattree(self.fattree_k) + + elif self.topology == "torus3d": + dims = ( + int(config["TORUS_X"]), + int(config["TORUS_Y"]), + int(config["TORUS_Z"]) + ) + wrap = bool(config.get("TORUS_WRAP", True)) + hosts_per_router = int(config.get("HOSTS_PER_ROUTER", config.get("hosts_per_router", 1))) + + # Build the graph and metadata + self.net_graph, self.meta = build_torus3d(dims, wrap, hosts_per_router=hosts_per_router) + + # Deterministic numeric → host mapping + X, Y, Z = self.meta["dims"] + self.id_to_host = {} + nid = 0 + for x in range(X): + for y in range(Y): + for z in range(Z): + for i in range(hosts_per_router): + h = f"h_{x}_{y}_{z}_{i}" + self.id_to_host[nid] = h + nid += 1 + + elif self.topology == "dragonfly": + self.net_graph = build_dragonfly( + int(config["DRAGONFLY_D"]), + int(config["DRAGONFLY_A"]), + int(config.get("DRAGONFLY_P", 1)) + ) + + elif self.topology == "capacity": + # Capacity-only model: no 
explicit graph + self.net_graph = None + + else: + raise ValueError(f"Unsupported topology: {self.topology}") + + def simulate_network_utilization(self, *, job, debug=False): + net_util = net_cong = net_tx = net_rx = 0 + max_throughput = self.max_link_bw * job.trace_quanta + + if job.nodes_required <= 1: + # Single node job, skip network impact + return net_util, net_cong, net_tx, net_rx, max_throughput + + net_tx = get_current_utilization(job.ntx_trace, job) + net_rx = get_current_utilization(job.nrx_trace, job) + net_util = network_utilization(net_tx, net_rx, max_throughput) + + if self.topology == "fat-tree": + host_list = [node_id_to_host_name(n, self.fattree_k) for n in job.scheduled_nodes] + loads = link_loads_for_job(self.net_graph, host_list, net_tx) + net_cong = worst_link_util(loads, max_throughput) + if debug: + print(" fat-tree hosts:", host_list) + + elif self.topology == "dragonfly": + D, A, P = self.config["DRAGONFLY_D"], self.config["DRAGONFLY_A"], self.config["DRAGONFLY_P"] + host_list = [ + dragonfly_node_id_to_host_name(self.real_to_fat_idx[real_n], D, A, P) + for real_n in job.scheduled_nodes + ] + if debug: + print(" dragonfly hosts:", host_list) + loads = link_loads_for_job(self.net_graph, host_list, net_tx) + net_cong = worst_link_util(loads, max_throughput) + + elif self.topology == "torus3d": + host_list = [self.id_to_host[n] for n in job.scheduled_nodes] + loads = link_loads_for_job_torus(self.net_graph, self.meta, host_list, net_tx) + net_cong = worst_link_util(loads, max_throughput) + if debug: + print(" torus3d hosts:", host_list) + + elif self.topology == "capacity": + net_cong = network_congestion(net_tx, net_rx, max_throughput) + + else: + raise ValueError(f"Unsupported topology: {self.topology}") + + return net_util, net_cong, net_tx, net_rx, max_throughput diff --git a/raps/network/base.py b/raps/network/base.py new file mode 100644 index 0000000..e89bb7c --- /dev/null +++ b/raps/network/base.py @@ -0,0 +1,127 @@ +import networkx 
as nx + + +def apply_job_slowdown(*, job, max_throughput, net_util, net_cong, net_tx, net_rx, debug: bool = False): + # Get the maximum allowed bandwidth from the configuration. + if net_cong > 1: + if debug: + print(f"congested net_cong: {net_cong}, max_throughput: {max_throughput}") + print(f"length of {len(job.gpu_trace)} before dilation") + throughput = net_tx + net_rx + slowdown_factor = network_slowdown(throughput, max_throughput) + + if debug: + print("***", hasattr(job, "dilated"), throughput, max_throughput, slowdown_factor) + + # Only apply slowdown once per job to avoid compounding the effect. + if not job.dilated: + if debug: + print(f"Applying slowdown factor {slowdown_factor:.2f} to job {job.id} due to network congestion") + job.apply_dilation(slowdown_factor) + job.dilated = True + if debug: + print(f"length of {len(job.gpu_trace)} after dilation") + else: + slowdown_factor = 1 + job.slowdown_factor = slowdown_factor + + return slowdown_factor + + +def compute_system_network_stats(net_utils, net_tx_list, net_rx_list, slowdown_factors): + + # Compute network averages + n = len(net_utils) or 1 + avg_tx = sum(net_tx_list) / n + avg_rx = sum(net_rx_list) / n + avg_net = sum(net_utils) / n + # avg_slowdown_per_job = sum(slowdown_factors) / n + # self.avg_slowdown_history.append(avg_slowdown_per_job) + # max_slowdown_per_job = max(slowdown_factors) + # self.max_slowdown_history.append(max_slowdown_per_job) + + return avg_tx, avg_rx, avg_net + + +def network_congestion(tx, rx, max_throughput): + """ + Overload factor ≥0: average of send/recv NOT clamped. + >1.0 means you’re pushing above capacity. + """ + tx_util = float(tx) / max_throughput + rx_util = float(rx) / max_throughput + return (tx_util + rx_util) / 2.0 + + +def network_utilization(tx, rx, max_throughput): + """ + True utilization in [0,1]: average of send/recv clamped to 100%. 
+ """ + tx_u = min(float(tx) / max_throughput, 1.0) + rx_u = min(float(rx) / max_throughput, 1.0) + return (tx_u + rx_u) / 2.0 + + +def network_slowdown(current_throughput, max_throughput): + """ + Calculate a slowdown factor based on current network bandwidth usage. + + If current_bw is within limits, the factor is 1.0 (no slowdown). + If current_bw exceeds max_bw, the factor is current_bw/max_bw. + """ + if current_throughput <= max_throughput: + return 1.0 + else: + return current_throughput / max_throughput + + +def all_to_all_paths(G, hosts): + """ + Given a list of host names, return shortest‐paths for every unordered pair. + """ + paths = [] + for i in range(len(hosts)): + for j in range(i + 1, len(hosts)): + src, dst = hosts[i], hosts[j] + p = nx.shortest_path(G, src, dst) + paths.append((src, dst, p)) + return paths + + +def link_loads_for_job(G, job_hosts, tx_volume_bytes): + """ + Distribute tx_volume_bytes from each host equally to all its peers; + accumulate per-link loads and return a dict {(u,v):bytes, …}. + """ + paths = all_to_all_paths(G, job_hosts) + loads = {edge: 0.0 for edge in G.edges()} + # each host sends tx_volume_bytes to each of the (N-1) peers + for src in job_hosts: + if len(job_hosts) >= 2: + per_peer = tx_volume_bytes / (len(job_hosts) - 1) + else: + per_peer = 0 + # find paths where src is the sender + for s, d, p in paths: + if s != src: + continue + # add per_peer to every link on p + for u, v in zip(p, p[1:]): + # ensure ordering matches loads keys + edge = (u, v) if (u, v) in loads else (v, u) + loads[edge] += per_peer + return loads + + +def worst_link_util(loads, throughput): + """ + Given loads in **bytes** and capacity in **bits/sec**, convert: + util = (bytes * 8) / throughput + Return the maximum util over all links. 
+ """ + max_util = 0.0 + for edge, byte_load in loads.items(): + util = (byte_load * 8) / throughput + if util > max_util: + max_util = util + return max_util diff --git a/raps/network/dragonfly.py b/raps/network/dragonfly.py new file mode 100644 index 0000000..a13d1dc --- /dev/null +++ b/raps/network/dragonfly.py @@ -0,0 +1,75 @@ +import networkx as nx +from itertools import combinations + + +def build_dragonfly(D: int, A: int, P: int) -> nx.Graph: + """ + Build a “simple” k-ary Dragonfly with: + D = # of groups + A = # of routers per group + P = # of hosts (endpoints) per router + + Naming convention: + - Router nodes: "r_{g}_{r}" with g ∈ [0..D−1], r ∈ [0..A−1] + - Host nodes: "h_{g}_{r}_{p}" with p ∈ [0..P−1] + + Topology: + 1. All routers within a group form a full clique. + 2. Each router r in group g has exactly one “global link” to router r in each other group. + 3. Each router r in group g attaches to P hosts ("h_{g}_{r}_{0..P−1}"). + """ + G = nx.Graph() + + # 1) Create all router nodes + for g in range(D): + for r in range(A): + router = f"r_{g}_{r}" + G.add_node(router, type="router", group=g, index=r) + + # 2) Intra‐group full mesh of routers + for g in range(D): + routers_in_group = [f"r_{g}_{r}" for r in range(A)] + for u, v in combinations(routers_in_group, 2): + G.add_edge(u, v) + + # 3) Inter‐group “one‐to‐one” global links + # (router index r in group g → router index r in group g2) + for g1 in range(D): + for g2 in range(g1 + 1, D): + for r in range(A): + u = f"r_{g1}_{r}" + v = f"r_{g2}_{r}" + G.add_edge(u, v) + + # 4) Attach hosts to each router + for g in range(D): + for r in range(A): + router = f"r_{g}_{r}" + for p in range(P): + host = f"h_{g}_{r}_{p}" + G.add_node(host, type="host", group=g, router=r, index=p) + G.add_edge(router, host) + + return G + + +def dragonfly_node_id_to_host_name(fat_idx: int, D: int, A: int, P: int) -> str: + """ + Given a contiguous fat‐index ∈ [0..(D*A*P − 1)], return "h_{g}_{r}_{p}". 
+ Hosts are laid out in order: + 0..(P−1) → group=0, router=0, p=0..P−1 + P..2P−1 → group=0, router=1, p=0..P−1 + … + (A*P)..(2A*P−1) → group=1, router=0, … + In general: + host_offset = fat_idx % P + router_offset = (fat_idx // P) % A + group = fat_idx // (A*P) + """ + total_hosts = D * A * P + assert 0 <= fat_idx < total_hosts, "fat_idx out of range" + + host_offset = fat_idx % P + router_group = (fat_idx // P) % A + pod = fat_idx // (A * P) + return f"h_{pod}_{router_group}_{host_offset}" diff --git a/raps/network/fat_tree.py b/raps/network/fat_tree.py new file mode 100644 index 0000000..2d27b39 --- /dev/null +++ b/raps/network/fat_tree.py @@ -0,0 +1,59 @@ +import networkx as nx + + +def node_id_to_host_name(node_id: int, k: int) -> str: + """ + Convert an integer node id to the host name string in the fat-tree. + Node IDs are assumed to be contiguous, mapping to h_{pod}_{edge}_{i}. + """ + # need to match the scheme from build_fattree + pod = node_id // (k * k // 4) + edge = (node_id % (k * k // 4)) // (k // 2) + host = node_id % (k // 2) + return f"h_{pod}_{edge}_{host}" + + +def build_fattree(k): + """ + Build a k-ary fat-tree: + - k pods + - each pod has k/2 edge switches, k/2 agg switches + - core layer has (k/2)^2 core switches + - each edge switch connects to k/2 hosts + Returns a NetworkX Graph where: + - hosts are named "h_{pod}_{edge}_{i}" + - edge switches "e_{pod}_{edge}" + - agg switches "a_{pod}_{agg}" + - core switches "c_{i}_{j}" + """ + G = nx.Graph() + # core + # num_core = (k//2)**2 # Unused! 
+ for i in range(k // 2): + for j in range(k // 2): + core = f"c_{i}_{j}" + G.add_node(core, type="core") + # pods + for pod in range(k): + # agg switches + for agg in range(k // 2): + a = f"a_{pod}_{agg}" + G.add_node(a, type="agg") + # connect to all core switches in column agg + for i in range(k // 2): + core = f"c_{agg}_{i}" + G.add_edge(a, core) + # edge switches + hosts + for edge in range(k // 2): + e = f"e_{pod}_{edge}" + G.add_node(e, type="edge") + # connect edge→each agg in this pod + for agg in range(k // 2): + a = f"a_{pod}_{agg}" + G.add_edge(e, a) + # connect hosts + for h in range(k // 2): + host = f"h_{pod}_{edge}_{h}" + G.add_node(host, type="host") + G.add_edge(e, host) + return G diff --git a/raps/network/torus3d.py b/raps/network/torus3d.py new file mode 100644 index 0000000..50c988f --- /dev/null +++ b/raps/network/torus3d.py @@ -0,0 +1,152 @@ +import csv +import networkx as nx +from pathlib import Path + + +def build_torus3d(dims, wrap=True, link_bw=1e9, hosts_per_router=1, routing="DOR_XYZ", coords_csv=None): + """ + Build a 3D torus at router granularity, then attach host nodes to routers. + Node ids in the returned graph are host names ("h_x_y_z_i") and router names ("r_x_y_z"). + Edges have attribute 'capacity' (bytes/s) and 'latency' (per hop). 
+ """ + X, Y, Z = map(int, dims) + G = nx.Graph() + + # Routers + def rname(x, y, z): + return f"r_{x}_{y}_{z}" + + for x in range(X): + for y in range(Y): + for z in range(Z): + G.add_node(rname(x, y, z), kind="router", coord=(x, y, z)) + + # Toroidal links between routers (±x, ±y, ±z) + def wrapi(i, n): + return (i + n) % n if wrap else (None if i < 0 or i >= n else i) + + for x in range(X): + for y in range(Y): + for z in range(Z): + u = rname(x, y, z) + # x+ + nxp = wrapi(x + 1, X) + v = rname(nxp, y, z) if nxp is not None else None + if v and not G.has_edge(u, v): + G.add_edge(u, v, capacity=link_bw) + # y+ + nyp = wrapi(y + 1, Y) + v = rname(x, nyp, z) if nyp is not None else None + if v and not G.has_edge(u, v): + G.add_edge(u, v, capacity=link_bw) + # z+ + nzp = wrapi(z + 1, Z) + v = rname(x, y, nzp) if nzp is not None else None + if v and not G.has_edge(u, v): + G.add_edge(u, v, capacity=link_bw) + + # Attach hosts to routers + host_to_router = {} + router_to_hosts = {} + + def hname(x, y, z, i): + return f"h_{x}_{y}_{z}_{i}" + + # If a nid→(x,y,z) CSV is supplied, place accordingly; else dense round-robin + # CSV format: nid,x,y,z[,i] + nid_placement = {} + if coords_csv: + p = Path(coords_csv) + with p.open("rt") as fh: + rd = csv.reader(fh) + for row in rd: + if not row: + continue + nid = int(row[0]) + x, y, z = map(int, row[1:4]) + i = int(row[4]) if len(row) > 4 else 0 + nid_placement[nid] = (x, y, z, i) + + # Build hosts + for x in range(X): + for y in range(Y): + for z in range(Z): + r = rname(x, y, z) + router_to_hosts[r] = [] + for i in range(hosts_per_router): + h = hname(x, y, z, i) + G.add_node(h, kind="host", coord=(x, y, z), local_index=i) + G.add_edge(h, r, capacity=link_bw) # host↔router edge; you can cap with NETWORK_MAX_BW instead + host_to_router[h] = r + router_to_hosts[r].append(h) + + meta = { + "dims": (X, Y, Z), + "wrap": wrap, + "routing": routing, + "host_to_router": host_to_router, + "router_to_hosts": router_to_hosts, + } + 
return G, meta + + +def _axis_steps(a, b, n, wrap=True): + """Return minimal step sequence along one axis from a to b with wrap-around.""" + if a == b: + return [] + fwd = (b - a) % n + back = (a - b) % n + if not wrap: + step = 1 if b > a else -1 + return [step] * abs(b - a) + if fwd <= back: + return [1] * fwd + else: + return [-1] * back + + +def torus_route_xyz(src_r, dst_r, dims, wrap=True): + """Router-level path (list of router names) using XYZ dimension-order routing.""" + X, Y, Z = dims + + def parse(r): + _, x, y, z = r.split("_") + return int(x), int(y), int(z) + + x1, y1, z1 = parse(src_r) + x2, y2, z2 = parse(dst_r) + + path = [src_r] + x, y, z = x1, y1, z1 + for step in _axis_steps(x, x2, X, wrap): + x = (x + step) % X + path.append(f"r_{x}_{y}_{z}") + for step in _axis_steps(y, y2, Y, wrap): + y = (y + step) % Y + path.append(f"r_{x}_{y}_{z}") + for step in _axis_steps(z, z2, Z, wrap): + z = (z + step) % Z + path.append(f"r_{x}_{y}_{z}") + return path + + +def torus_host_path(G, meta, h_src, h_dst): + r_src = meta["host_to_router"][h_src] + r_dst = meta["host_to_router"][h_dst] + routers = torus_route_xyz(r_src, r_dst, meta["dims"], meta["wrap"]) + # host->src_router + (router path) + dst_router->host + path = [h_src, r_src] + routers[1:] + [h_dst] + return path + + +def link_loads_for_job_torus(G, meta, host_list, traffic_bytes): + # all-to-all between hosts in host_list, route via torus_host_path, add traffic_bytes per pair + loads = {} + n = len(host_list) + for i in range(n): + for j in range(i + 1, n): + p = torus_host_path(G, meta, host_list[i], host_list[j]) + for u, v in zip(p, p[1:]): + e = tuple(sorted((u, v))) + loads[e] = loads.get(e, 0) + traffic_bytes + return loads -- GitLab From b0f1586b00f63983fe7f7986569e7c1a92ae546a Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Fri, 22 Aug 2025 16:20:55 -0400 Subject: [PATCH 4/8] Add some instructions on how to downsize cray_system_sampler data files --- raps/dataloaders/bluewaters.py | 12 
def dilate_trace(trace, factor):
    """
    Scale a trace in the time dimension by *factor*.

    Parameters:
    - trace: list/tuple/np.ndarray of numbers, a single numeric scalar, or None.
    - factor (float): > 1 stretches (slows down), < 1 compresses (speeds up).

    Returns:
    - None when trace is None;
    - value / factor for scalar input (a scalar trace is applied per
      timestep, so dividing keeps the total "area" constant);
    - a list of floats for sequence input, linearly interpolated onto
      round(len(trace) * factor) samples (at least one).

    Raises ValueError when factor is None or zero.
    """
    if trace is None:
        return trace

    if factor is None:
        raise ValueError("factor must be provided")
    if factor == 0:
        raise ValueError("factor must be non-zero")

    # Any numeric scalar (int/float/np.number) is treated as a scalar trace.
    if isinstance(trace, (int, float, np.integer, np.floating, np.number)):
        return trace / factor

    # Coerce any sequence (list/tuple/ndarray, or e.g. a pandas Series)
    # to a float array in one step.
    samples = np.asarray(trace, dtype=float)

    if samples.size == 0:
        # Preserve the container flavor: ndarray in -> ndarray out, else a list.
        return samples if isinstance(trace, np.ndarray) else []

    n_src = samples.size
    # Keep at least one sample after dilation.
    n_dst = max(1, int(np.round(n_src * float(factor))))

    # Linear interpolation onto the resampled index grid.
    # A length-1 input simply repeats its single value.
    src_idx = np.linspace(0, n_src - 1, num=n_src)
    dst_idx = np.linspace(0, n_src - 1, num=n_dst)
    return np.interp(dst_idx, src_idx, samples).tolist()


def debug_print_trace(job, label: str = ""):
    """Print either the length (if iterable) or the value of job.gpu_trace."""
    trace = job.gpu_trace
    if hasattr(trace, "__len__"):
        print(f"length of {len(trace)} {label}")
    else:
        print(f"gpu_trace value {trace} {label}")
uses [pytest](https://docs.pytest.org/) for its test suite. +Before running tests, ensure that you have a valid data directory available (e.g., `/opt/data`) and set the environment variable `RAPS_DATA_DIR` to point to it. + +### Run all tests +```bash +RAPS_DATA_DIR=/opt/data pytest -n auto -x +``` + +By default, tests are parallelized with `pytest-xdist` (`-n auto`) to speed up execution. +The `-x` flag stops execution after the first failure. Add `-v` to run in verbose mode. + +### Run only network-related tests + +```bash +RAPS_DATA_DIR=/opt/data pytest -n auto -x -m network +``` + +See `pytest.ini` for the different options for `-m`. + +### Run a specific test file + +```bash +RAPS_DATA_DIR=/opt/data pytest tests/systems/test_engine.py +``` + ### Contributing Code Install pre-commit hooks as set by the project: -- GitLab