Commit fb501c08 authored by Brewer, Wes
Browse files

Merge comm pattern improvements

parent 0edd9410
Loading
Loading
Loading
Loading
+29 −5
Original line number Diff line number Diff line
@@ -35,6 +35,21 @@ MESSAGE_SIZE_64K = 64 * 1024 # 64 KiB
MESSAGE_SIZE_1M = 1024 * 1024     # 1 MiB


def normalize_comm_pattern(value):
    """Coerce *value* into a CommunicationPattern member.

    Accepted inputs:
      * ``None``                        -> ``CommunicationPattern.ALL_TO_ALL``
      * a ``CommunicationPattern``      -> returned unchanged
      * a string alias (surrounding whitespace and letter case are ignored)

    Raises:
        ValueError: if *value* is a string that matches no known alias, or is
            of any other type.
    """
    if isinstance(value, CommunicationPattern):
        return value
    if value is None:
        return CommunicationPattern.ALL_TO_ALL

    all_to_all_aliases = ("all-to-all", "all_to_all", "alltoall")
    stencil_aliases = (
        "stencil", "stencil-3d", "stencil_3d",
        "nearest-neighbor", "nearest_neighbor", "nn",
    )

    if isinstance(value, str):
        alias = value.strip().lower()
        if alias in all_to_all_aliases:
            return CommunicationPattern.ALL_TO_ALL
        if alias in stencil_aliases:
            return CommunicationPattern.STENCIL_3D

    # Unknown string or unsupported type: report the value as it was passed in.
    raise ValueError(f"Unsupported comm_pattern: {value}")


def job_dict(*,
             nodes_required,
             name,
@@ -70,13 +85,14 @@ def job_dict(*,
             trace_missing_values: bool | None = False,
             downscale: int = 1,
             # Communication parameters
             comm_pattern: CommunicationPattern | str = CommunicationPattern.ALL_TO_ALL,
             comm_pattern: CommunicationPattern | str | None = CommunicationPattern.ALL_TO_ALL,
             message_size: int | None = None,  # bytes per message (None = raw bandwidth model)
             message_size_bytes: int | None = MESSAGE_SIZE_64K,
             ):
    """ Return job info dictionary """
    # Normalize comm_pattern to enum
    if isinstance(comm_pattern, str):
        comm_pattern = CommunicationPattern(comm_pattern)
    comm_pattern = normalize_comm_pattern(comm_pattern)
    if message_size is None:
        message_size = message_size_bytes

    return {
        'nodes_required': nodes_required,
@@ -116,6 +132,7 @@ def job_dict(*,
        # Communication parameters:
        'comm_pattern': comm_pattern,
        'message_size': message_size,
        'message_size_bytes': message_size,
    }


@@ -204,11 +221,18 @@ class Job:
        self.current_run_time = 0     # Current running time updated when simulating
        # Communication parameters:
        self.comm_pattern = CommunicationPattern.ALL_TO_ALL
        self.message_size = None  # None = raw bandwidth model (no message overhead)
        self.message_size = MESSAGE_SIZE_64K
        self.message_size_bytes = MESSAGE_SIZE_64K

        # If a job dict was given, override the values from the job_dict:
        for key, value in job_dict.items():
            setattr(self, key, value)
        # Normalize communication attributes after load
        self.comm_pattern = normalize_comm_pattern(self.comm_pattern)
        if getattr(self, "message_size", None) is None:
            self.message_size = getattr(self, "message_size_bytes", MESSAGE_SIZE_64K)
        if getattr(self, "message_size_bytes", None) is None:
            self.message_size_bytes = self.message_size
        # In any case: provide a job_id!
        if self.id is None:  # This is wrong
            self.id = Job._get_next_id()
+7 −8
Original line number Diff line number Diff line
@@ -174,14 +174,13 @@ class NetworkModel:
                torus_host_from_real_index(n, X, Y, Z, hosts_per_router)
                for n in job.scheduled_nodes
            ]
            # For torus3d, use the specialized torus routing
            # but still apply the communication pattern for traffic distribution
            if comm_pattern == CommunicationPattern.STENCIL_3D:
                # Use pattern-aware loading for stencil on torus
                loads = link_loads_for_pattern(self.net_graph, host_list, effective_tx, comm_pattern)
            else:
                # Use torus-specific routing for all-to-all
                loads = link_loads_for_job_torus(self.net_graph, self.meta, host_list, effective_tx)
            loads = link_loads_for_job_torus(
                self.net_graph,
                self.meta,
                host_list,
                effective_tx,
                comm_pattern=comm_pattern,
            )
            net_cong = worst_link_util(loads, max_throughput)
            if debug:
                print("  torus3d hosts:", host_list)
+13 −5
Original line number Diff line number Diff line
@@ -3,7 +3,7 @@ import numpy as np
from raps.utils import get_current_utilization
from raps.network.fat_tree import node_id_to_host_name
from raps.network.torus3d import link_loads_for_job_torus, torus_host_from_real_index
from raps.job import CommunicationPattern
from raps.job import CommunicationPattern, normalize_comm_pattern


# Message overhead constants (typical HPC network headers)
@@ -275,7 +275,7 @@ def link_loads_for_job_stencil_3d(G, job_hosts, tx_volume_bytes):
    return loads


def apply_message_size_overhead(tx_volume_bytes, message_size, num_peers):
def apply_message_size_overhead(tx_volume_bytes, message_size, num_peers, *, overhead_bytes=None):
    """
    Apply message size overhead to the traffic volume.

@@ -308,7 +308,8 @@ def apply_message_size_overhead(tx_volume_bytes, message_size, num_peers):
    total_messages = messages_per_peer * num_peers

    # Add header overhead for each message
    overhead_bytes = total_messages * MESSAGE_HEADER_OVERHEAD
    per_msg_overhead = MESSAGE_HEADER_OVERHEAD if overhead_bytes is None else overhead_bytes
    overhead_bytes = total_messages * per_msg_overhead

    return tx_volume_bytes + overhead_bytes

@@ -326,7 +327,8 @@ def get_effective_traffic(tx_volume_bytes, job, num_hosts):
        Effective transmit volume with overhead applied
    """
    message_size = getattr(job, 'message_size', None)
    comm_pattern = getattr(job, 'comm_pattern', CommunicationPattern.ALL_TO_ALL)
    comm_pattern = normalize_comm_pattern(getattr(job, 'comm_pattern', CommunicationPattern.ALL_TO_ALL))
    overhead_bytes = getattr(job, 'message_overhead_bytes', None)

    # Calculate number of peers based on pattern
    if comm_pattern == CommunicationPattern.STENCIL_3D:
@@ -336,7 +338,12 @@ def get_effective_traffic(tx_volume_bytes, job, num_hosts):
        # All-to-all: everyone talks to everyone
        num_peers = max(1, num_hosts - 1)

    return apply_message_size_overhead(tx_volume_bytes, message_size, num_peers)
    return apply_message_size_overhead(
        tx_volume_bytes,
        message_size,
        num_peers,
        overhead_bytes=overhead_bytes,
    )


def link_loads_for_pattern(G, job_hosts, tx_volume_bytes, comm_pattern):
@@ -352,6 +359,7 @@ def link_loads_for_pattern(G, job_hosts, tx_volume_bytes, comm_pattern):
    Returns:
        dict {(u,v): bytes, ...} of link loads
    """
    comm_pattern = normalize_comm_pattern(comm_pattern)
    if comm_pattern == CommunicationPattern.STENCIL_3D:
        return link_loads_for_job_stencil_3d(G, job_hosts, tx_volume_bytes)
    else:
+103 −9
Original line number Diff line number Diff line
import csv
import math
import networkx as nx
from pathlib import Path
from raps.job import CommunicationPattern, normalize_comm_pattern


def build_torus3d(
@@ -169,18 +171,111 @@ def torus_host_path(G, meta, h_src, h_dst):
    return path


def link_loads_for_job_torus(G, meta, host_list, traffic_bytes):
    # all-to-all between hosts in host_list, route via torus_host_path, add traffic_bytes per pair
    loads = {}
def factorize_3d(n):
    """
    Choose a virtual 3D grid (x, y, z) large enough to hold n nodes.

    Candidates are scored first by spread (largest minus smallest dimension)
    and then by volume; the lowest score wins, and on a full tie the candidate
    met first (x outer loop, y inner loop) is kept. The grid may therefore
    have more than n cells (e.g. 5 -> (2, 2, 2)).

    Returns (1, 1, 1) for n <= 0, otherwise (x, y, z) with x * y * z >= n.
    """
    if n <= 0:
        return (1, 1, 1)

    # x and y are searched slightly past the cube root; z is derived from them.
    upper = int(math.ceil(n ** (1/3))) + 2

    best = (n, 1, 1)
    # (spread, volume) of the starting candidate; spread is seeded with n so
    # that the first real candidate always compares on the same terms.
    best_score = (n, n)

    for x in range(1, upper):
        for y in range(1, upper):
            z = int(math.ceil(n / (x * y)))
            volume = x * y * z
            if volume < n:
                continue
            score = (max(x, y, z) - min(x, y, z), volume)
            # Tuple comparison is lexicographic: smaller spread, then volume.
            if score < best_score:
                best, best_score = (x, y, z), score

    return best


def get_stencil_3d_neighbors(node_idx, dims, num_nodes):
    """
    Return the stencil neighbours of node_idx on a periodic 3D grid.

    node_idx is a linear index with x varying fastest, then y, then z.
    The six +/-1 steps are visited in the order +x, -x, +y, -y, +z, -z and
    wrapped around each dimension. A neighbour is dropped when its index is
    not below num_nodes or when it wraps back onto node_idx itself, so fewer
    than six entries may be returned. Duplicates are kept (a dimension of
    size 2 yields the same neighbour for both directions).
    """
    x_dim, y_dim, z_dim = dims
    plane = x_dim * y_dim

    # Decode the linear index into grid coordinates.
    layer, in_plane = divmod(node_idx, plane)
    row, col = divmod(in_plane, x_dim)

    steps = (
        (1, 0, 0), (-1, 0, 0),
        (0, 1, 0), (0, -1, 0),
        (0, 0, 1), (0, 0, -1),
    )

    # Apply each step with wrap-around, then re-encode as a linear index.
    wrapped = (
        ((col + dc) % x_dim, (row + dr) % y_dim, (layer + dl) % z_dim)
        for dc, dr, dl in steps
    )
    linear = (k * plane + j * x_dim + i for i, j, k in wrapped)

    return [idx for idx in linear if idx < num_nodes and idx != node_idx]


def stencil_3d_pairs(host_list):
    """
    Build the directed (src, dst) host pairs of a 3D stencil exchange.

    The position of a host in host_list is used as its index on the virtual
    grid returned by factorize_3d(len(host_list)); each host is paired with
    every neighbour index reported by get_stencil_3d_neighbors, in that order.
    """
    count = len(host_list)
    grid = factorize_3d(count)
    return [
        (src, host_list[peer])
        for pos, src in enumerate(host_list)
        for peer in get_stencil_3d_neighbors(pos, grid, count)
    ]


def link_loads_for_job_torus(G, meta, host_list, traffic_bytes, *, comm_pattern=None):
    """
    Distribute traffic_bytes using torus routing, respecting comm_pattern.

    Every host in host_list sends traffic_bytes in total, split evenly over
    its peers: all other hosts for ALL_TO_ALL, or the pairs produced by
    stencil_3d_pairs for STENCIL_3D. Each share is added to every link on the
    torus_host_path between the source and the destination.

    Args:
        G: network graph; every edge of G appears in the result.
        meta: torus metadata, passed through to torus_host_path.
        host_list: hosts taking part in the job.
        traffic_bytes: total bytes sent by each host.
        comm_pattern: CommunicationPattern, a string alias, or None
            (treated as ALL_TO_ALL).

    Returns:
        dict {(u, v): bytes}. Links that carry no traffic map to 0.0. With
        zero or one host the all-zero dict is returned.

    Raises:
        ValueError: if comm_pattern is not a supported pattern.
    """
    loads = {edge: 0.0 for edge in G.edges()}
    comm_pattern = normalize_comm_pattern(comm_pattern or CommunicationPattern.ALL_TO_ALL)
    n = len(host_list)

    if n <= 1:
        return loads

    if comm_pattern == CommunicationPattern.ALL_TO_ALL:
        # The share is the same for every pair, so compute it once.
        per_peer = traffic_bytes / (n - 1)
        flows = (
            (src, dst, per_peer)
            for src in host_list
            for dst in host_list
            if dst != src
        )
    elif comm_pattern == CommunicationPattern.STENCIL_3D:
        pairs = stencil_3d_pairs(host_list)
        neighbor_count = {}
        for src, _ in pairs:
            neighbor_count[src] = neighbor_count.get(src, 0) + 1
        flows = (
            (src, dst, traffic_bytes / neighbor_count.get(src, 1))
            for src, dst in pairs
        )
    else:
        raise ValueError(f"Unsupported comm_pattern: {comm_pattern}")

    for src, dst, amount in flows:
        p = torus_host_path(G, meta, src, dst)
        for u, v in zip(p, p[1:]):
            e = tuple(sorted((u, v)))
            # G.edges() does not promise sorted endpoints, so the pre-filled
            # key for this link may be the reverse of the sorted pair. Reuse
            # that key instead of failing on a missing one.
            if e not in loads and e[::-1] in loads:
                e = e[::-1]
            loads[e] = loads.get(e, 0.0) + amount

    return loads


def torus_host_from_real_index(real_n, X, Y, Z, hosts_per_router):
    total_hosts = X * Y * Z * hosts_per_router
@@ -191,4 +286,3 @@ def torus_host_from_real_index(real_n, X, Y, Z, hosts_per_router):
    y = (r // Z) % Y
    x = (r // (Y * Z)) % X
    return f"h_{x}_{y}_{z}_{h}"
+23 −1
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@ import argparse
from pathlib import Path

from raps.system_config import get_system_config
from raps.job import normalize_comm_pattern
from raps.network import (
    NetworkModel,
    simulate_inter_job_congestion,
@@ -49,6 +50,13 @@ def main():
    parser.add_argument("--config", required=True, help="Path to system YAML (e.g., config/lassen.yaml)")
    parser.add_argument("--jobs", type=int, default=60, help="Number of synthetic jobs")
    parser.add_argument("--txfrac", type=float, default=0.35, help="Fraction of per-link bandwidth per job")
    parser.add_argument("--comm-pattern", default="all-to-all",
                        choices=("all-to-all", "stencil", "stencil-3d"),
                        help="Communication pattern for all jobs")
    parser.add_argument("--message-size-bytes", type=float, default=65536,
                        help="Average message size in bytes")
    parser.add_argument("--message-overhead-bytes", type=float, default=64,
                        help="Per-message overhead in bytes")
    parser.add_argument("--debug", action="store_true", help="Enable network debug output")
    parser.add_argument("--verbose", "-v", action="store_true", help="Print detailed statistics")
    args = parser.parse_args()
@@ -72,8 +80,22 @@ def main():
    workload_data = workload_generator.generate_jobs()
    jobs = workload_data.jobs

    comm_pattern = normalize_comm_pattern(args.comm_pattern)
    message_size_bytes = float(args.message_size_bytes)
    if message_size_bytes <= 0:
        raise ValueError("--message-size-bytes must be > 0")
    message_overhead_bytes = max(0.0, float(args.message_overhead_bytes))

    for job in jobs:
        job.comm_pattern = comm_pattern
        job.message_size = message_size_bytes
        job.message_size_bytes = message_size_bytes
        job.message_overhead_bytes = message_overhead_bytes

    print(f"[INFO] Detected topology: {topology}")
    print(f"[INFO] Generated {len(jobs)} jobs for congestion test.")
    print(f"[INFO] comm_pattern={comm_pattern.value}, message_size_bytes={message_size_bytes:.0f}, "
          f"message_overhead_bytes={message_overhead_bytes:.0f}")

    # --- Initialize network model ---
    net = NetworkModel(