Commit 9c291d01 authored by Matthias Maiterth
Browse files

Merge branch 'net-dev' into 'develop'

Add support for network model verification testing, plotting, and inter-job congestion studies

See merge request !130
parents 91c2da82 f70a0931
Loading
Loading
Loading
Loading
+15 −5
Original line number Diff line number Diff line
@@ -29,7 +29,7 @@ Note: Requires python3.12 or greater.

    # Frontier
    DATEDIR="date=2024-01-18"
    DPATH=~/data/frontier-sample-2024-01-18
    DPATH=/opt/data/frontier
    raps run -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR

## Open Telemetry dataset
@@ -37,7 +37,7 @@ Note: Requires python3.12 or greater.
For Marconi supercomputer, download `job_table.parquet` from https://zenodo.org/records/10127767

    # Marconi100
    raps run --system marconi100 -f ~/data/marconi100/job_table.parquet
    raps run --system marconi100 -f /opt/data/marconi100/job_table.parquet

For Adastra MI250 supercomputer, download 'AdastaJobsMI250_15days.parquet' from https://zenodo.org/records/14007065

@@ -46,10 +46,10 @@ For Adastra MI250 supercomputer, download 'AdastaJobsMI250_15days.parquet' from

For Google cluster trace v2

    raps run --system gcloudv2 -f ~/data/gcloud/v2/google_cluster_data_2011_sample --start '2011-05-02T00:10:00Z'
    raps run --system gcloudv2 -f /opt/data/gcloud/v2/google_cluster_data_2011_sample --start '2011-05-02T00:10:00Z'

    # analyze dataset
    raps telemetry --system gcloudv2 -f ~/data/gcloud/v2/google_cluster_data_2011_sample -v
    raps telemetry --system gcloudv2 -f /opt/data/gcloud/v2/google_cluster_data_2011_sample -v

For MIT Supercloud

@@ -95,7 +95,17 @@ For Lumi
Lassen is one of the few datasets that has networking data. See `raps/dataloaders/lassen.py` for how to
get the datasets. To run a network simulation, use the following command:

    raps run -f ~/data/lassen/Lassen-Supercomputer-Job-Dataset --system lassen --policy fcfs --backfill firstfit --start '2019-08-22T00:00:00+00:00' -t 12h --arrival poisson --net 
    raps run -f /opt/data/lassen/Lassen-Supercomputer-Job-Dataset --system lassen --policy fcfs --backfill firstfit --start '2019-08-22T00:00:00+00:00' -t 12h --arrival poisson --net

To simulate synthetic network tests:

    raps run --system lassen -w network_test --net -t 15m

    raps run --system lassen -w inter_job_congestion --net -t 15m

Run network congestion tests outside of RAPS:

    python scripts/run_inter_job_congestion.py --config config/lassen.yaml -v

## Snapshot of extracted workload data

+8 −2
Original line number Diff line number Diff line
@@ -120,9 +120,15 @@ cooling:
  w_cts_key: "simulator[1].centralEnergyPlant[1].coolingTowerLoop[1].summary.W_flow_CT_kW"
network:
  topology: fat-tree
  network_max_bw: 1000000000.0
  fattree_k: 16
  network_max_bw: 12.5e9
  fattree_k: 32
  dragonfly_d: 11
  dragonfly_a: 9
  dragonfly_p: 8
  latency: 1
  torus_x: 17
  torus_y: 17
  torus_z: 8
  torus_wrap: true
  hosts_per_router: 2
  torus_routing: DOR_XYZ
+16 −2
Original line number Diff line number Diff line
@@ -30,7 +30,8 @@ from raps.power import (
from raps.network import (
    NetworkModel,
    apply_job_slowdown,
    compute_system_network_stats
    compute_system_network_stats,
    simulate_inter_job_congestion
)
from raps.telemetry import Telemetry
from raps.cooling import ThermoFluidsModel
@@ -292,6 +293,7 @@ class Engine:
        self.avg_net_tx = []
        self.avg_net_rx = []
        self.net_util_history = []
        self.net_congestion_history = []
        self.avg_slowdown_history = []
        self.max_slowdown_history = []
        self.node_occupancy_history = []
@@ -328,7 +330,7 @@ class Engine:
            available_nodes = self.resource_manager.available_nodes
            self.network_model = NetworkModel(
                available_nodes=available_nodes,
                config=self.config,
                config=self.config
            )
        else:
            self.network_model = None
@@ -621,6 +623,18 @@ class Engine:
        system_util = self.num_active_nodes / self.config['AVAILABLE_NODES'] * 100
        self.record_util_stats(system_util=system_util)

        # --- Inter-Job Network Congestion ---
        if self.simulate_network and self.network_model and self.running:
            congestion_stats = simulate_inter_job_congestion(
                self.network_model, self.running, self.config, self.debug
            )
            if isinstance(congestion_stats, dict):
                total_congestion = congestion_stats['mean']
            else:
                total_congestion = congestion_stats
            self.net_congestion_history.append((self.current_timestep, total_congestion))
        # ---

        # System Power
        if self.power_manager:  # Power is always simulated
            power_df, rack_power, total_power_kw, total_loss_kw, jobs_power = \
+70 −15
Original line number Diff line number Diff line
import os
import warnings

from .base import (
    all_to_all_paths,
    apply_job_slowdown,
@@ -7,11 +10,16 @@ from .base import (
    network_slowdown,
    network_utilization,
    worst_link_util,
    get_link_util_stats,
    simulate_inter_job_congestion,
    max_throughput_per_tick,
)

from .fat_tree import build_fattree, node_id_to_host_name
from .torus3d import build_torus3d, link_loads_for_job_torus
from .dragonfly import build_dragonfly, dragonfly_node_id_to_host_name
from .fat_tree import build_fattree, node_id_to_host_name, subsample_hosts
from .torus3d import build_torus3d, link_loads_for_job_torus, torus_host_from_real_index
from .dragonfly import build_dragonfly, dragonfly_node_id_to_host_name, build_dragonfly_idx_map
from raps.plotting import plot_fattree_hierarchy, plot_dragonfly, plot_torus2d, plot_torus3d

from raps.utils import get_current_utilization

__all__ = [
@@ -28,6 +36,9 @@ __all__ = [
    "build_torus3d",
    "build_dragonfly",
    "dragonfly_node_id_to_host_name",
    "simulate_inter_job_congestion",
    "max_throughput_per_tick",
    "get_link_util_stats",
]


@@ -39,8 +50,11 @@ class NetworkModel:
        self.real_to_fat_idx = kwargs.get("real_to_fat_idx", {})

        if self.topology == "fat-tree":
            total_nodes = config['TOTAL_NODES'] - len(config['DOWN_NODES'])
            self.fattree_k = config.get("FATTREE_K")
            self.net_graph = build_fattree(self.fattree_k)
            self.net_graph = build_fattree(self.fattree_k, total_nodes)
            # TODO: future testing of subsampling feature
            #self.net_graph = subsample_hosts(self.net_graph, num_hosts=4626)

        elif self.topology == "torus3d":
            dims = (
@@ -67,11 +81,22 @@ class NetworkModel:
                            nid += 1

        elif self.topology == "dragonfly":
            self.net_graph = build_dragonfly(
                int(config["DRAGONFLY_D"]),
                int(config["DRAGONFLY_A"]),
                int(config.get("DRAGONFLY_P", 1))
            )
            D = self.config["DRAGONFLY_D"]
            A = self.config["DRAGONFLY_A"]
            P = self.config["DRAGONFLY_P"]
            self.net_graph = build_dragonfly(D, A, P)

            # total nodes seen by scheduler or job trace
            total_real_nodes = getattr(self, "available_nodes", None)
            if total_real_nodes is None:
                total_real_nodes = 4626  # fallback for Lassen

            # if available_nodes is a list, take its length
            if not isinstance(total_real_nodes, int):
                total_real_nodes = len(total_real_nodes)

            self.real_to_fat_idx = build_dragonfly_idx_map(D, A, P, total_real_nodes)
            print(f"[DEBUG] Dragonfly mapping: {len(self.real_to_fat_idx)} entries")

        elif self.topology == "capacity":
            # Capacity-only model: no explicit graph
@@ -100,18 +125,28 @@ class NetworkModel:
                print("  fat-tree hosts:", host_list)

        elif self.topology == "dragonfly":
            D, A, P = self.config["DRAGONFLY_D"], self.config["DRAGONFLY_A"], self.config["DRAGONFLY_P"]
            host_list = [
                dragonfly_node_id_to_host_name(self.real_to_fat_idx[real_n], D, A, P)
                for real_n in job.scheduled_nodes
            ]
            D = self.config["DRAGONFLY_D"]
            A = self.config["DRAGONFLY_A"]
            P = self.config["DRAGONFLY_P"]
            # Directly use mapped host names
            host_list = [self.real_to_fat_idx[real_n] for real_n in job.scheduled_nodes]
            if debug:
                print("  dragonfly hosts:", host_list)
            print("Example nodes in graph:", list(self.net_graph.nodes)[:10])
            print("Contains h_0_9_0?", "h_0_9_0" in self.net_graph)
            loads = link_loads_for_job(self.net_graph, host_list, net_tx)
            net_cong = worst_link_util(loads, max_throughput)

        elif self.topology == "torus3d":
            host_list = [self.id_to_host[n] for n in job.scheduled_nodes]
            X = self.config["TORUS_X"]
            Y = self.config["TORUS_Y"]
            Z = self.config["TORUS_Z"]
            hosts_per_router = self.config["HOSTS_PER_ROUTER"]
            #host_list = [self.id_to_host[n] for n in job.scheduled_nodes]
            host_list = [
                torus_host_from_real_index(n, X, Y, Z, hosts_per_router)
                for n in job.scheduled_nodes
            ]
            loads = link_loads_for_job_torus(self.net_graph, self.meta, host_list, net_tx)
            net_cong = worst_link_util(loads, max_throughput)
            if debug:
@@ -124,3 +159,23 @@ class NetworkModel:
            raise ValueError(f"Unsupported topology: {self.topology}")

        return net_util, net_cong, net_tx, net_rx, max_throughput

    def plot_topology(self, output_dir):
        """Plot network topology - save as png file in output_dir.

        One PNG per topology kind (torus3d gets both a 2D and a 3D view);
        does nothing when output_dir is falsy, warns on unsupported topologies.
        """
        if not output_dir:
            return
        if self.topology == "fat-tree":
            plot_fattree_hierarchy(
                self.net_graph,
                k=self.fattree_k,
                save_path=output_dir / "net-fat-tree.png",
            )
        elif self.topology == "dragonfly":
            plot_dragonfly(self.net_graph, save_path=output_dir / "net-dragonfly.png")
        elif self.topology == "torus3d":
            # Two complementary renderings of the same torus graph.
            plot_torus2d(self.net_graph, save_path=output_dir / "net-torus2d.png")
            plot_torus3d(self.net_graph, save_path=output_dir / "net-torus3d.png")
        else:
            warnings.warn(
                f"plotting not supported for {self.topology} topology",
                UserWarning
            )
+83 −1
Original line number Diff line number Diff line
import networkx as nx

import numpy as np
from raps.utils import get_current_utilization
from raps.network.fat_tree import node_id_to_host_name
from raps.network.torus3d import link_loads_for_job_torus, torus_host_from_real_index

def debug_print_trace(job, label: str = ""):
    """Print either the length (if iterable) or the value of job.gpu_trace."""
@@ -134,3 +137,82 @@ def worst_link_util(loads, throughput):
        if util > max_util:
            max_util = util
    return max_util

def get_link_util_stats(loads, throughput, top_n=10):
    """
    Calculate a distribution of link-utilization statistics.

    Parameters
    ----------
    loads : dict
        Mapping of edge -> accumulated byte load on that link.
    throughput : float
        Per-tick link capacity; utilization is (bytes * 8) / throughput,
        so this is expected in bits per tick.
    top_n : int, optional
        Number of most-congested links to report (default 10).

    Returns
    -------
    dict
        Keys 'max', 'mean', 'min', 'std_dev' as plain Python floats, and
        'top_links': list of (edge, utilization) pairs, highest first.
    """
    if not loads:
        # Same float types as the non-empty branch for consistent consumers.
        return {'max': 0.0, 'mean': 0.0, 'min': 0.0, 'std_dev': 0.0, 'top_links': []}

    # Utilization of every link; *8 converts byte loads to bits.
    utilizations = {edge: (byte_load * 8) / throughput
                    for edge, byte_load in loads.items()}
    util_values = list(utilizations.values())

    # Cast numpy scalars to built-in floats so the stats are plain data
    # for downstream history lists / serialization.
    stats = {
        'max': float(np.max(util_values)),
        'mean': float(np.mean(util_values)),
        'min': float(np.min(util_values)),
        'std_dev': float(np.std(util_values)),
    }

    # Top N most congested links, sorted by utilization descending.
    stats['top_links'] = sorted(utilizations.items(),
                                key=lambda item: item[1],
                                reverse=True)[:top_n]
    return stats

def max_throughput_per_tick(legacy_cfg: dict, trace_quanta: int) -> float:
    """Return bytes-per-tick throughput of a single link.

    Reads NETWORK_MAX_BW from the config and scales it by the tick length.
    A missing or falsy value (None / 0) falls back to 12.5e9, which also
    keeps the result nonzero for later division.
    """
    configured_bw = legacy_cfg.get("NETWORK_MAX_BW")
    if not configured_bw:
        configured_bw = 12.5e9
    return float(configured_bw) * trace_quanta

def simulate_inter_job_congestion(network_model, jobs, legacy_cfg, debug=False):
    """
    Simulates network congestion from a list of concurrently running jobs.

    Each job's per-link byte loads are accumulated onto one shared view of
    the topology, then summarized with get_link_util_stats.

    Parameters
    ----------
    network_model : NetworkModel
        Supplies the topology graph, and the index map for dragonfly.
    jobs : list
        Running jobs; each must expose scheduled_nodes, ntx_trace and
        trace_quanta.
    legacy_cfg : dict
        Topology parameters (FATTREE_K, TORUS_*, HOSTS_PER_ROUTER,
        NETWORK_MAX_BW, ...).
    debug : bool
        When True, print the aggregated link-utilization stats.

    Returns
    -------
    dict of link-utilization stats (see get_link_util_stats), or 0.0 when
    no network graph is available.
    """
    if not network_model.net_graph:
        print("[WARN] Network graph is not defined. Skipping congestion simulation.")
        return 0.0

    total_loads = {tuple(sorted(edge)): 0.0 for edge in network_model.net_graph.edges()}

    if not jobs:
        # No jobs means every link is idle. Use a nominal one-tick throughput
        # so get_link_util_stats never divides by a zero trace_quanta.
        return get_link_util_stats(total_loads, max_throughput_per_tick(legacy_cfg, 1))

    trace_quanta = jobs[0].trace_quanta

    for job in jobs:
        # Sample the tx trace at t=0 for this static snapshot, but save and
        # restore the timing fields so live scheduler state (the job's real
        # running_time) is not clobbered by this analysis pass.
        saved_running_time = getattr(job, "running_time", 0)
        saved_trace_start = getattr(job, "trace_start_time", 0)
        job.running_time = 0
        job.trace_start_time = 0
        try:
            net_tx = get_current_utilization(job.ntx_trace, job)
        finally:
            job.running_time = saved_running_time
            job.trace_start_time = saved_trace_start

        job_loads = {}
        if network_model.topology in ("fat-tree", "dragonfly"):
            if network_model.topology == "fat-tree":
                k = int(legacy_cfg.get("FATTREE_K", 32))
                host_list = [node_id_to_host_name(n, k) for n in job.scheduled_nodes]
            else:  # dragonfly: real_to_fat_idx already maps to host names
                host_list = [network_model.real_to_fat_idx[real_n] for real_n in job.scheduled_nodes]

            job_loads = link_loads_for_job(network_model.net_graph, host_list, net_tx)

        elif network_model.topology == "torus3d":
            X = int(legacy_cfg.get("TORUS_X", 12))
            Y = int(legacy_cfg.get("TORUS_Y", 12))
            Z = int(legacy_cfg.get("TORUS_Z", 12))
            hosts_per_router = int(legacy_cfg.get("HOSTS_PER_ROUTER", 1))
            host_list = [
                torus_host_from_real_index(n, X, Y, Z, hosts_per_router)
                for n in job.scheduled_nodes
            ]
            job_loads = link_loads_for_job_torus(network_model.net_graph, network_model.meta, host_list, net_tx)

        # Edge tuples may come back in either orientation; normalize before
        # accumulating onto the shared totals.
        for edge, load in job_loads.items():
            edge_key = tuple(sorted(edge))
            if edge_key in total_loads:
                total_loads[edge_key] += load

    max_throughput = max_throughput_per_tick(legacy_cfg, trace_quanta)
    net_stats = get_link_util_stats(total_loads, max_throughput)
    if debug:
        print(f"[DEBUG] inter-job congestion stats: {net_stats}")

    return net_stats
Loading