Commit 73bbe28e authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Add support for inter-job congestion synthetic simulations both outside

(scripts/run_interjob_congestion.py) and within RAPS using -w inter_job_congestion
parent 054fd6f3
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -93,6 +93,10 @@ To simulate synthetic network tests:

    raps run --system lassen -w network_test --net -t 15m

Run network congestion tests outside of RAPS:

    python scripts/run_inter_job_congestion.py --config config/lassen.yaml -v

## Snapshot of extracted workload data

To reduce the expense of extracting the needed data from the telemetry parquet files,
+11 −1
Original line number Diff line number Diff line
@@ -30,7 +30,8 @@ from raps.power import (
from raps.network import (
    NetworkModel,
    apply_job_slowdown,
    compute_system_network_stats
    compute_system_network_stats,
    simulate_inter_job_congestion
)
from raps.telemetry import Telemetry
from raps.cooling import ThermoFluidsModel
@@ -292,6 +293,7 @@ class Engine:
        self.avg_net_tx = []
        self.avg_net_rx = []
        self.net_util_history = []
        self.net_congestion_history = []
        self.avg_slowdown_history = []
        self.max_slowdown_history = []
        self.node_occupancy_history = []
@@ -619,6 +621,14 @@ class Engine:
        system_util = self.num_active_nodes / self.config['AVAILABLE_NODES'] * 100
        self.record_util_stats(system_util=system_util)

        # --- Inter-Job Network Congestion ---
        if self.simulate_network and self.network_model and self.running:
            total_congestion = simulate_inter_job_congestion(
                self.network_model, self.running, self.config, self.debug
            )
            self.net_congestion_history.append((self.current_timestep, total_congestion))
        # ---

        # System Power
        if self.power_manager:  # Power is always simulated
            power_df, rack_power, total_power_kw, total_loss_kw, jobs_power = \
+6 −0
Original line number Diff line number Diff line
@@ -9,6 +9,9 @@ from .base import (
    network_slowdown,
    network_utilization,
    worst_link_util,
    get_link_util_stats,
    simulate_inter_job_congestion,
    max_throughput_per_tick,
)

from .fat_tree import build_fattree, node_id_to_host_name, subsample_hosts
@@ -32,6 +35,9 @@ __all__ = [
    "build_torus3d",
    "build_dragonfly",
    "dragonfly_node_id_to_host_name",
    "simulate_inter_job_congestion",
    "max_throughput_per_tick",
    "get_link_util_stats",
]


+83 −1
Original line number Diff line number Diff line
import networkx as nx

import numpy as np
from raps.utils import get_current_utilization
from raps.network.fat_tree import node_id_to_host_name
from raps.network.torus3d import link_loads_for_job_torus, torus_host_from_real_index

def debug_print_trace(job, label: str = ""):
    """Print either the length (if iterable) or the value of job.gpu_trace."""
@@ -134,3 +137,82 @@ def worst_link_util(loads, throughput):
        if util > max_util:
            max_util = util
    return max_util

def get_link_util_stats(loads, throughput, top_n=10):
    """
    Summarize how heavily each link in `loads` is utilized.

    A link's utilization is its load multiplied by 8 and divided by
    `throughput`.

    Args:
        loads: Mapping of link (edge) -> load carried by that link.
        throughput: Per-link capacity used as the divisor for every link.
        top_n: Maximum number of most-utilized links to report.

    Returns:
        A dict with keys 'max', 'mean', 'min' and 'std_dev' (computed with
        NumPy over all link utilizations) plus 'top_links', a list of
        (edge, utilization) pairs sorted from highest to lowest utilization
        and truncated to `top_n` entries. When `loads` is empty, every
        statistic is 0 and 'top_links' is an empty list.
    """
    if not loads:
        return {'max': 0, 'mean': 0, 'min': 0, 'std_dev': 0, 'top_links': []}

    # Per-link utilization, keyed by the same edge objects as `loads`.
    per_link = {}
    for link, byte_load in loads.items():
        per_link[link] = (byte_load * 8) / throughput

    values = list(per_link.values())
    highest = np.max(values)
    average = np.mean(values)
    lowest = np.min(values)
    spread = np.std(values)

    # Most congested links first; only the utilization takes part in ordering.
    ranked = sorted(per_link.items(), key=lambda pair: pair[1], reverse=True)

    return {
        'max': highest,
        'mean': average,
        'min': lowest,
        'std_dev': spread,
        'top_links': ranked[:top_n],
    }

def max_throughput_per_tick(legacy_cfg: dict, trace_quanta: int) -> float:
    """Return the per-tick throughput of a single link.

    Reads "NETWORK_MAX_BW" from `legacy_cfg`; a missing or falsy value
    (None, 0, empty string) falls back to 12.5e9. The bandwidth is converted
    to float and scaled by `trace_quanta`.
    """
    bandwidth = legacy_cfg.get("NETWORK_MAX_BW")
    if not bandwidth:
        bandwidth = 12.5e9
    return float(bandwidth) * trace_quanta

def simulate_inter_job_congestion(network_model, jobs, legacy_cfg, debug=False):
    """
    Simulates network congestion from a list of concurrently running jobs.

    For every job, the current transmit utilization is looked up from its
    `ntx_trace`, routed over the network graph according to the model's
    topology ("fat-tree", "dragonfly" or "torus3d"), and the resulting
    per-link loads are summed across jobs. Loads on links that are not edges
    of the graph are ignored, and jobs on any other topology contribute no
    load.

    Args:
        network_model: Object exposing `net_graph` and `topology` (plus
            `real_to_fat_idx` for dragonfly and `meta` for torus3d).
        jobs: Jobs assumed to be running concurrently. The first job's
            `trace_quanta` is used for the per-tick link throughput.
        legacy_cfg: Config mapping; read for "FATTREE_K", "TORUS_X/Y/Z",
            "HOSTS_PER_ROUTER" and (via max_throughput_per_tick)
            "NETWORK_MAX_BW".
        debug: Accepted for interface compatibility; currently unused.

    Returns:
        The link-utilization statistics dict produced by
        get_link_util_stats for the accumulated loads. If `jobs` is empty,
        an all-zero statistics dict of the same shape is returned. If the
        network graph is missing/empty, a warning is printed and the float
        0.0 is returned instead of a dict.
    """
    graph = network_model.net_graph
    if not graph:
        print("[WARN] Network graph is not defined. Skipping congestion simulation.")
        return 0.0

    if not jobs:
        # Without jobs there is no trace quanta, so the link throughput would
        # be 0 and the utilization division would raise ZeroDivisionError.
        # No traffic means zero utilization; mirror the empty-stats shape.
        return {'max': 0, 'mean': 0, 'min': 0, 'std_dev': 0, 'top_links': []}

    topology = network_model.topology
    total_loads = {tuple(sorted(edge)): 0.0 for edge in graph.edges()}
    trace_quanta = jobs[0].trace_quanta

    # Topology parameters do not depend on the job, so read them only once.
    if topology == "fat-tree":
        k = int(legacy_cfg.get("FATTREE_K", 32))
    elif topology == "torus3d":
        X = int(legacy_cfg.get("TORUS_X", 12))
        Y = int(legacy_cfg.get("TORUS_Y", 12))
        Z = int(legacy_cfg.get("TORUS_Z", 12))
        hosts_per_router = int(legacy_cfg.get("HOSTS_PER_ROUTER", 1))

    for job in jobs:
        # Assuming job.running_time is 0 for this static simulation.
        # NOTE(review): this overwrites state on the caller's job objects;
        # confirm that is intended when this is invoked on live running jobs.
        job.running_time = 0
        job.trace_start_time = 0
        net_tx = get_current_utilization(job.ntx_trace, job)

        if topology == "fat-tree":
            host_list = [node_id_to_host_name(n, k) for n in job.scheduled_nodes]
            job_loads = link_loads_for_job(graph, host_list, net_tx)
        elif topology == "dragonfly":
            host_list = [network_model.real_to_fat_idx[real_n] for real_n in job.scheduled_nodes]
            job_loads = link_loads_for_job(graph, host_list, net_tx)
        elif topology == "torus3d":
            host_list = [
                torus_host_from_real_index(n, X, Y, Z, hosts_per_router)
                for n in job.scheduled_nodes
            ]
            job_loads = link_loads_for_job_torus(graph, network_model.meta, host_list, net_tx)
        else:
            # Unknown topology: this job contributes no link load.
            job_loads = {}

        for edge, load in job_loads.items():
            # Edges are stored direction-agnostic, so normalize before lookup.
            edge_key = tuple(sorted(edge))
            if edge_key in total_loads:
                total_loads[edge_key] += load

    max_throughput = max_throughput_per_tick(legacy_cfg, trace_quanta)
    return get_link_util_stats(total_loads, max_throughput)
+1 −1
Original line number Diff line number Diff line
@@ -135,7 +135,7 @@ class SimConfig(RAPSBaseModel, abc.ABC):

    # Workload arguments (TODO split into separate model)
    workload: Literal['random', 'benchmark', 'peak', 'idle', 'synthetic',
                      'multitenant', 'replay', 'randomAI', 'network_test'] = "random"
                      'multitenant', 'replay', 'randomAI', 'network_test', 'inter_job_congestion'] = "random"

    """ Type of synthetic workload """
    multimodal: list[float] = [1.0]
Loading