Commit 270368d2 authored by Brewer, Wes
Browse files

feat(network): add stall/packet ratio (Cassini counter analog)



Implements the STALL_FLIT_PLAN.md Stage 1+2 math linking RAPS link loads
and slowdown factor to the Cassini hardware counter ratio:
  (hni_tx_paused_0 + hni_tx_paused_1) / parbs_tarb_pi_posted_pkts

Key changes:
- base.py: add compute_stall_ratio(), compute_link_stall_packet_stats(),
  aggregate_link_stall_stats(); apply_job_slowdown now sets job.stall_ratio
- network/__init__.py: export new functions; add NetworkModel.compute_tick_stall_stats()
  using accumulated global_link_loads + mean packet size
- engine.py: collect per-job stall_ratios each tick; expose avg_stall_ratio,
  max_stall_ratio, total_posted_pkts, total_tx_paused in TickReturn/TickData;
  add stall_ratio_history time series
- job.py: JobStatistics records slowdown_factor and stall_ratio per completed job
- run_sim.py: write stall_ratio_history.parquet when network sim is active
- stats.py: avg_stall_ratio and max_stall_ratio added to Network Report
- config/frontier.yaml: add mean_packet_size_bytes=116, flit_size_bytes=64
- tests/unit/test_stall_ratio.py: 15 unit tests covering all new functions

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
parent 5ca3a45e
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -90,3 +90,6 @@ network:
  dragonfly_a: 48  # Global links per router (49 groups total)
  dragonfly_p: 4   # Compute nodes per router
  latency: 1
  # Cassini stall/flit model parameters (from counter data: 25 GB/s / 215M pkts/s ≈ 116 B)
  mean_packet_size_bytes: 116
  flit_size_bytes: 64            # Slingshot Rosetta flit unit
+45 −2
Original line number Diff line number Diff line
@@ -56,6 +56,10 @@ class TickReturn:
    avg_net_tx: Optional[float]
    avg_net_rx: Optional[float]
    avg_net_util: Optional[float]
    avg_stall_ratio: Optional[float]
    max_stall_ratio: Optional[float]
    total_posted_pkts: Optional[float]
    total_tx_paused: Optional[float]
    slowdown_per_job: float
    node_occupancy: dict[int, int]

@@ -80,6 +84,10 @@ class TickData:
    avg_net_tx: Optional[float]
    avg_net_rx: Optional[float]
    avg_net_util: Optional[float]
    avg_stall_ratio: Optional[float]
    max_stall_ratio: Optional[float]
    total_posted_pkts: Optional[float]
    total_tx_paused: Optional[float]
    slowdown_per_job: Optional[float]
    node_occupancy: Optional[dict[int, int]]
    time_delta: int
@@ -302,6 +310,7 @@ class Engine:
        self.avg_net_rx = []
        self.net_util_history = []
        self.net_congestion_history = []
        self.stall_ratio_history = []
        self.avg_slowdown_history = []
        self.max_slowdown_history = []
        self.node_occupancy_history = []
@@ -560,6 +569,7 @@ class Engine:
        net_rx_list = []

        slowdown_factors = []
        stall_ratios = []

        for job in self.running:

@@ -626,6 +636,7 @@ class Engine:
                                                     net_rx=net_rx,
                                                     debug=self.debug)
                slowdown_factors.append(slowdown_factor)
                stall_ratios.append(getattr(job, 'stall_ratio', 0.0))

        # All required values for each job have been collected.
        # Continue with calculations for the whole system:
@@ -685,12 +696,34 @@ class Engine:
                                                                   slowdown_factors=slowdown_factors
                                                                   )
            slowdown_per_job = sum(slowdown_factors) / len(slowdown_factors) if len(slowdown_factors) != 0 else 0
            avg_stall_ratio = sum(stall_ratios) / len(stall_ratios) if stall_ratios else 0.0
            max_stall_ratio = max(stall_ratios) if stall_ratios else 0.0

            # Per-link posted_pkts / tx_paused using accumulated global link loads
            mean_pkt_size = self.config.get('MEAN_PACKET_SIZE_BYTES', 116)
            if hasattr(self.network_model, 'compute_tick_stall_stats') and self.network_model.global_link_loads:
                tick_stall = self.network_model.compute_tick_stall_stats(
                    mean_pkt_size_bytes=mean_pkt_size,
                    dt=time_delta,
                    avg_slowdown=slowdown_per_job,
                )
                total_posted_pkts = tick_stall['total_posted_pkts']
                total_tx_paused = tick_stall['total_tx_paused']
            else:
                total_posted_pkts = None
                total_tx_paused = None

            self.record_network_stats(avg_tx=avg_tx,
                                      avg_rx=avg_rx,
                                      avg_net=avg_net)
                                      avg_net=avg_net,
                                      avg_stall_ratio=avg_stall_ratio)
        else:
            avg_tx, avg_rx, avg_net = None, None, None
            slowdown_per_job = 0
            avg_stall_ratio = None
            max_stall_ratio = None
            total_posted_pkts = None
            total_tx_paused = None

        # Continue with System Simulation

@@ -713,6 +746,10 @@ class Engine:
            avg_net_tx=avg_tx,
            avg_net_rx=avg_rx,
            avg_net_util=avg_net,
            avg_stall_ratio=avg_stall_ratio,
            max_stall_ratio=max_stall_ratio,
            total_posted_pkts=total_posted_pkts,
            total_tx_paused=total_tx_paused,
            slowdown_per_job=slowdown_per_job,
            node_occupancy=node_occupancy,
        )
@@ -843,6 +880,10 @@ class Engine:
                avg_net_rx=tick_return.avg_net_rx,
                avg_net_tx=tick_return.avg_net_tx,
                avg_net_util=tick_return.avg_net_util,
                avg_stall_ratio=tick_return.avg_stall_ratio,
                max_stall_ratio=tick_return.max_stall_ratio,
                total_posted_pkts=tick_return.total_posted_pkts,
                total_tx_paused=tick_return.total_tx_paused,
                slowdown_per_job=tick_return.slowdown_per_job,
                node_occupancy=tick_return.node_occupancy,
                time_delta=self.time_delta
@@ -874,11 +915,13 @@ class Engine:
    def record_network_stats(self, *,
                             avg_tx,
                             avg_rx,
                             avg_net
                             avg_net,
                             avg_stall_ratio=None,
                             ):
        self.avg_net_tx.append(avg_tx)
        self.avg_net_rx.append(avg_rx)
        self.net_util_history.append(avg_net)
        self.stall_ratio_history.append((self.current_timestep, avg_stall_ratio))

    def record_power_stats(self, *, time_delta, total_power_kw, total_loss_kw, jobs_power):
        if (time_delta == 1 and self.current_timestep % self.config['POWER_UPDATE_FREQ'] == 0) or time_delta != 1:
+2 −0
Original line number Diff line number Diff line
@@ -405,6 +405,8 @@ class JobStatistics:
            self.avg_node_power = sum(job.power_history) / len(job.power_history) / self.num_nodes
            self.max_node_power = max(job.power_history) / self.num_nodes
        self.energy = self.run_time * self.avg_node_power * self.num_nodes
        self.slowdown_factor = getattr(job, 'slowdown_factor', 1.0)
        self.stall_ratio = getattr(job, 'stall_ratio', 0.0)


if __name__ == "__main__":
+30 −0
Original line number Diff line number Diff line
@@ -6,6 +6,9 @@ from raps.job import CommunicationPattern
from .base import (
    all_to_all_paths,
    apply_job_slowdown,
    compute_link_stall_packet_stats,
    aggregate_link_stall_stats,
    compute_stall_ratio,
    compute_system_network_stats,
    link_loads_for_job,
    link_loads_for_job_stencil_3d,
@@ -33,6 +36,9 @@ from raps.utils import get_current_utilization
__all__ = [
    "NetworkModel",
    "apply_job_slowdown",
    "compute_link_stall_packet_stats",
    "aggregate_link_stall_stats",
    "compute_stall_ratio",
    "compute_system_network_stats",
    "network_congestion",
    "network_utilization",
@@ -261,6 +267,30 @@ class NetworkModel:

        return net_util, net_cong, net_tx, net_rx, max_throughput

    def compute_tick_stall_stats(self, *, mean_pkt_size_bytes, dt, avg_slowdown=1.0):
        """
        Aggregate system-level stall/packet stats for the current tick.

        Feeds the byte loads accumulated in ``self.global_link_loads`` through
        the per-link stall model, then rolls the results up to system totals.

        Args:
            mean_pkt_size_bytes: Mean packet size in bytes (from config)
            dt: Tick duration in seconds
            avg_slowdown: System-average slowdown factor for the tick

        Returns:
            dict with 'total_posted_pkts', 'total_tx_paused', 'system_stall_ratio'
        """
        # No traffic accumulated this tick: report all-zero totals.
        if not self.global_link_loads:
            return {
                'total_posted_pkts': 0.0,
                'total_tx_paused': 0.0,
                'system_stall_ratio': 0.0,
            }
        # max_link_bw is held in bytes/s; the per-link helper expects bits/s,
        # hence the factor of 8.
        per_link = compute_link_stall_packet_stats(
            self.global_link_loads,
            self.max_link_bw * 8,
            mean_pkt_size_bytes,
            dt,
            avg_slowdown,
        )
        return aggregate_link_stall_stats(per_link)

    def reset_link_loads(self):
        """Reset global link loads at the start of each simulation tick."""
        if self.net_graph is not None:
+72 −0
Original line number Diff line number Diff line
@@ -18,6 +18,77 @@ def debug_print_trace(job, label: str = ""):
        print(f"gpu_trace value {job.gpu_trace} {label}")


def compute_link_stall_packet_stats(loads, link_bw_bps, mean_pkt_size_bytes, dt, slowdown_factor):
    """
    Derive per-link stall/packet statistics from RAPS link byte loads.

    Mapping onto the Cassini hardware counters:
      posted_pkts  ~ parbs_tarb_pi_posted_pkts
      tx_paused    ~ hni_tx_paused_0 + hni_tx_paused_1
      stall_ratio  = tx_paused / posted_pkts  = slowdown_factor - 1

    Args:
        loads: dict {edge: byte_load} per-link byte load for the tick
        link_bw_bps: link bandwidth in bits/s (e.g. 25e9 * 8 for 25 GB/s)
        mean_pkt_size_bytes: mean packet size in bytes (116 for Frontier Slingshot)
        dt: timestep duration in seconds
        slowdown_factor: network slowdown factor (≥1.0); may be per-job average

    Returns:
        dict {edge: {'posted_pkts', 'tx_paused', 'stall_ratio', 'utilization'}}
    """
    # Packet rate a fully-utilized link could sustain, in pkts/s.
    full_util_pkt_rate = link_bw_bps / (mean_pkt_size_bytes * 8)
    # Slowdown of 1.0 means no congestion; clamp so s < 1 never goes negative.
    pause_fraction = max(0.0, float(slowdown_factor) - 1.0)
    per_link = {}
    for edge, bytes_on_link in loads.items():
        # Offered load as a fraction of link capacity over the tick.
        if dt > 0:
            rho = (bytes_on_link * 8) / (link_bw_bps * dt)
        else:
            rho = 0.0
        pkts = rho * full_util_pkt_rate * dt
        per_link[edge] = {
            'posted_pkts': pkts,
            'tx_paused': pause_fraction * pkts,
            'stall_ratio': pause_fraction,
            # Reported utilization is capped at 1.0 even if offered load exceeds capacity.
            'utilization': min(rho, 1.0),
        }
    return per_link


def aggregate_link_stall_stats(link_stats):
    """
    Roll per-link stall stats up into system-level totals.

    Args:
        link_stats: dict {edge: {'posted_pkts', 'tx_paused', ...}} as produced
            by compute_link_stall_packet_stats()

    Returns:
        dict with 'total_posted_pkts', 'total_tx_paused', 'system_stall_ratio'
    """
    posted = 0.0
    paused = 0.0
    for entry in link_stats.values():
        posted += entry['posted_pkts']
        paused += entry['tx_paused']
    # Guard against division by zero on an idle system.
    ratio = paused / posted if posted > 0 else 0.0
    return {
        'total_posted_pkts': posted,
        'total_tx_paused': paused,
        'system_stall_ratio': ratio,
    }


def compute_stall_ratio(slowdown_factor):
    """
    Convert a network slowdown factor into the stall/packet ratio.

    This is the RAPS analog of the Cassini counter ratio:
        (hni_tx_paused_0 + hni_tx_paused_1) / parbs_tarb_pi_posted_pkts

    Derivation: tx_paused = (s - 1) * posted_pkts, so stall_ratio = s - 1.
    - No congestion (s=1): stall_ratio = 0
    - Frontier-level load (s≈6.7): stall_ratio ≈ 5.7

    Args:
        slowdown_factor: Network slowdown factor (≥1.0)

    Returns:
        Dimensionless stall/packet ratio (≥0.0)
    """
    # Clamp at zero so a slowdown below 1.0 never yields a negative ratio.
    excess = float(slowdown_factor) - 1.0
    return excess if excess > 0.0 else 0.0


def apply_job_slowdown(*, job, max_throughput, net_util, net_cong, net_tx, net_rx, debug: bool = False):
    # Get the maximum allowed bandwidth from the configuration.
    if net_cong > 1:
@@ -42,6 +113,7 @@ def apply_job_slowdown(*, job, max_throughput, net_util, net_cong, net_tx, net_r
    else:
        slowdown_factor = 1
    job.slowdown_factor = slowdown_factor
    job.stall_ratio = compute_stall_ratio(slowdown_factor)

    return slowdown_factor

Loading