Commit dd00519b authored by Brewer, Wes
Browse files

Add adaptive routing support for Dragonfly topology

Implement UGAL, Valiant, and minimal routing algorithms for Dragonfly
networks. UGAL selects between minimal and non-minimal paths based on
link congestion. Valiant supports configurable bias for probabilistic
path selection.

- Add RoutingAlgorithm enum (minimal, valiant, ugal) to policy.py
- Add routing config fields to SystemNetworkConfig
- Implement path computation functions in dragonfly.py
- Update NetworkModel to track link loads for adaptive routing
- Configure Frontier with Slingshot dragonfly topology and UGAL routing
- Add comprehensive unit tests for routing functions

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
parent 20c966c4
Loading
Loading
Loading
Loading
+9 −0
Original line number Diff line number Diff line
@@ -81,3 +81,12 @@ cooling:
  w_htwps_key: "simulator[1].centralEnergyPlant[1].hotWaterLoop[1].summary.W_flow_HTWP_kW"
  w_ctwps_key: "simulator[1].centralEnergyPlant[1].coolingTowerLoop[1].summary.W_flow_CTWP_kW"
  w_cts_key: "simulator[1].centralEnergyPlant[1].coolingTowerLoop[1].summary.W_flow_CT_kW"
network:
  # Interconnect model: dragonfly topology with a selectable routing algorithm.
  topology: dragonfly
  network_max_bw: 25e9  # Slingshot 200 Gbps = 25 GB/s
  routing_algorithm: ugal  # One of: minimal, valiant, ugal
  # UGAL keeps the minimal path while
  # minimal_latency < ugal_threshold * best_nonminimal_latency.
  ugal_threshold: 2.0
  dragonfly_d: 48  # Routers per group
  dragonfly_a: 48  # Global links per router (49 groups total)
  dragonfly_p: 4   # Compute nodes per router
  latency: 1  # NOTE(review): unit is not visible from this section — confirm
+56 −3
Original line number Diff line number Diff line
@@ -63,6 +63,14 @@ class NetworkModel:
        self.max_link_bw = config.get("NETWORK_MAX_BW", 1e9)  # default safeguard
        self.real_to_fat_idx = kwargs.get("real_to_fat_idx", {})

        # Routing algorithm configuration
        self.routing_algorithm = config.get("ROUTING_ALGORITHM", "minimal")
        self.ugal_threshold = config.get("UGAL_THRESHOLD", 2.0)
        self.valiant_bias = config.get("VALIANT_BIAS", 0.0)

        # Global link loads for adaptive routing (reset each tick)
        self.global_link_loads = {}

        if self.topology == "fat-tree":
            total_nodes = config['TOTAL_NODES'] - len(config['DOWN_NODES'])
            self.fattree_k = config.get("FATTREE_K")
@@ -100,6 +108,11 @@ class NetworkModel:
            P = self.config["DRAGONFLY_P"]
            self.net_graph = build_dragonfly(D, A, P)

            # Store dragonfly params for routing
            self.dragonfly_d = D
            self.dragonfly_a = A
            self.dragonfly_p = P

            # total nodes seen by scheduler or job trace
            total_real_nodes = getattr(self, "available_nodes", None)
            if total_real_nodes is None:
@@ -110,7 +123,16 @@ class NetworkModel:
                total_real_nodes = len(total_real_nodes)

            self.real_to_fat_idx = build_dragonfly_idx_map(D, A, P, total_real_nodes)
            print(f"[DEBUG] Dragonfly mapping: {len(self.real_to_fat_idx)} entries")

            # Initialize global link loads for adaptive routing
            self.global_link_loads = {tuple(sorted(edge)): 0.0 for edge in self.net_graph.edges()}

            routing_info = f"routing={self.routing_algorithm}"
            if self.routing_algorithm == 'ugal':
                routing_info += f", threshold={self.ugal_threshold}"
            elif self.routing_algorithm == 'valiant':
                routing_info += f", bias={self.valiant_bias}"
            print(f"[DEBUG] Dragonfly: {len(self.real_to_fat_idx)} nodes, {routing_info}")

        elif self.topology == "capacity":
            # Capacity-only model: no explicit graph
@@ -160,11 +182,35 @@ class NetworkModel:
            host_list = [self.real_to_fat_idx[real_n] for real_n in job.scheduled_nodes]
            if debug:
                print("  dragonfly hosts:", host_list)
                print(f"  routing: {self.routing_algorithm}")
                print("Example nodes in graph:", list(self.net_graph.nodes)[:10])
                print("Contains h_0_9_0?", "h_0_9_0" in self.net_graph)
            loads = link_loads_for_pattern(self.net_graph, host_list, effective_tx, comm_pattern)

            # Build dragonfly params for adaptive routing
            dragonfly_params = {
                'd': D,
                'a': A,
                'ugal_threshold': self.ugal_threshold,
                'valiant_bias': self.valiant_bias,
            }

            loads = link_loads_for_pattern(
                self.net_graph,
                host_list,
                effective_tx,
                comm_pattern,
                routing_algorithm=self.routing_algorithm,
                dragonfly_params=dragonfly_params,
                link_loads=self.global_link_loads,
            )
            net_cong = worst_link_util(loads, max_throughput)

            # Update global link loads for UGAL decisions
            if self.routing_algorithm in ('ugal', 'valiant'):
                for edge, load in loads.items():
                    edge_key = tuple(sorted(edge))
                    if edge_key in self.global_link_loads:
                        self.global_link_loads[edge_key] += load

        elif self.topology == "torus3d":
            X = self.config["TORUS_X"]
            Y = self.config["TORUS_Y"]
@@ -193,6 +239,13 @@ class NetworkModel:

        return net_util, net_cong, net_tx, net_rx, max_throughput

    def reset_link_loads(self):
        """Zero the global link-load table.

        Intended to be called at the start of each simulation tick. Every edge
        of ``self.net_graph`` gets a fresh ``0.0`` entry keyed by its sorted
        endpoint tuple. When no graph is present the existing table is left
        untouched.
        """
        graph = self.net_graph
        if graph is None:
            return

        # Sorted endpoint tuples make (u, v) and (v, u) share a single key.
        canonical_links = (tuple(sorted(link)) for link in graph.edges())
        self.global_link_loads = dict.fromkeys(canonical_links, 0.0)

    def plot_topology(self, output_dir):
        """Plot network topology - save as png file in output_dir."""
        if output_dir:
+33 −2
Original line number Diff line number Diff line
@@ -346,20 +346,51 @@ def get_effective_traffic(tx_volume_bytes, job, num_hosts):
    )


def link_loads_for_pattern(G, job_hosts, tx_volume_bytes, comm_pattern):
def link_loads_for_pattern(
    G,
    job_hosts,
    tx_volume_bytes,
    comm_pattern,
    *,
    routing_algorithm: str | None = None,
    dragonfly_params: dict | None = None,
    link_loads: dict | None = None,
):
    """
    Dispatch to appropriate link load calculation based on communication pattern.
    Dispatch to appropriate link load calculation based on communication pattern
    and routing algorithm.

    Args:
        G: NetworkX graph
        job_hosts: List of host names
        tx_volume_bytes: Total transmit volume per host
        comm_pattern: CommunicationPattern enum value
        routing_algorithm: Routing algorithm for Dragonfly ('minimal', 'ugal', 'valiant')
        dragonfly_params: Dict with 'd', 'a', 'ugal_threshold', 'valiant_bias' for Dragonfly
        link_loads: Current global link loads (for adaptive routing decisions)

    Returns:
        dict {(u,v): bytes, ...} of link loads
    """
    from raps.network.dragonfly import link_loads_for_job_dragonfly_adaptive

    comm_pattern = normalize_comm_pattern(comm_pattern)

    # Handle adaptive routing for Dragonfly
    if routing_algorithm and dragonfly_params and routing_algorithm != 'minimal':
        return link_loads_for_job_dragonfly_adaptive(
            G,
            job_hosts,
            tx_volume_bytes,
            algorithm=routing_algorithm,
            d=dragonfly_params['d'],
            a=dragonfly_params['a'],
            link_loads=link_loads,
            ugal_threshold=dragonfly_params.get('ugal_threshold', 2.0),
            valiant_bias=dragonfly_params.get('valiant_bias', 0.0),
        )

    # Standard routing (shortest path)
    if comm_pattern == CommunicationPattern.STENCIL_3D:
        return link_loads_for_job_stencil_3d(G, job_hosts, tx_volume_bytes)
    else:
+415 −3
Original line number Diff line number Diff line
import random
import networkx as nx
from itertools import combinations


import networkx as nx

def build_dragonfly(d, a, p):
    """
    Build a Dragonfly network graph.
@@ -142,3 +140,417 @@ def build_dragonfly_idx_map(d: int, a: int, p: int, total_real_nodes: int) -> di
        host = fat_idx % p
        mapping[i] = f"h_{group}_{router}_{host}"
    return mapping


# =============================================================================
# Adaptive Routing Functions for Dragonfly
# =============================================================================

def parse_dragonfly_host(host_name: str) -> tuple[int, int, int]:
    """
    Split a Dragonfly host name into its numeric fields.

    The name is split on underscores; the fields at positions 1, 2 and 3 are
    converted to integers. The leading prefix (position 0) and any fields
    after position 3 are ignored.

    Args:
        host_name: Host name in format 'h_{group}_{router}_{port}'

    Returns:
        Tuple of (group, router, port)
    """
    fields = host_name.split("_")
    group_idx = int(fields[1])
    router_idx = int(fields[2])
    port_idx = int(fields[3])
    return group_idx, router_idx, port_idx


def parse_dragonfly_router(router_name: str) -> tuple[int, int]:
    """
    Split a Dragonfly router name into its numeric fields.

    The name is split on underscores; the fields at positions 1 and 2 are
    converted to integers. The leading prefix and any later fields are ignored.

    Args:
        router_name: Router name in format 'r_{group}_{router}'

    Returns:
        Tuple of (group, router_index)
    """
    fields = router_name.split("_")
    group_idx = int(fields[1])
    router_idx = int(fields[2])
    return group_idx, router_idx


def get_host_router(host_name: str) -> str:
    """
    Return the name of the router a host is attached to.

    The host name is parsed with parse_dragonfly_host(); the port field is
    discarded and the group/router indices are reassembled as a router name.

    Args:
        host_name: Host name in format 'h_{group}_{router}_{port}'

    Returns:
        Router name in format 'r_{group}_{router}'
    """
    group_idx, router_idx, _port = parse_dragonfly_host(host_name)
    return "r_" + str(group_idx) + "_" + str(router_idx)


def dragonfly_minimal_path(src_host: str, dst_host: str, d: int, a: int) -> list[str]:
    """
    Build the minimal path between two hosts in a Dragonfly network.

    Path shapes produced:
    - Same host:    [src_host]
    - Same router:  host → router → host
    - Same group:   host → src router → dst router → host (one local hop)
    - Other group:  host → src router → landing router in the destination
                    group → [dst router] → host. The inter-group hop is taken
                    directly from the source router; a local hop follows only
                    when the landing router is not the destination router.

    The landing router index is ``src_router % d``.
    NOTE(review): this assumes build_dragonfly() links router r to router
    (r % d) in every other group — confirm against build_dragonfly().

    Args:
        src_host: Source host name (h_g_r_p format)
        dst_host: Destination host name (h_g_r_p format)
        d: Number of routers per group
        a: Number of global links per router (not used by this function)

    Returns:
        List of node names forming the minimal path
    """
    s_group, s_router, _ = parse_dragonfly_host(src_host)
    t_group, t_router, _ = parse_dragonfly_host(dst_host)

    # Both names are parsed before this check, so malformed names still raise.
    if src_host == dst_host:
        return [src_host]

    first_router = f"r_{s_group}_{s_router}"
    last_router = f"r_{t_group}_{t_router}"

    if s_group == t_group:
        if s_router == t_router:
            # Both hosts hang off the same router.
            return [src_host, first_router, dst_host]
        # Routers in the same group are treated as directly connected.
        return [src_host, first_router, last_router, dst_host]

    # Inter-group: hop straight from the source router into the destination group.
    landing_idx = s_router % d
    landing_router = f"r_{t_group}_{landing_idx}"

    if landing_idx == t_router:
        middle = [landing_router]
    else:
        # Landed on a different router: one local hop to reach the destination.
        middle = [landing_router, last_router]

    return [src_host, first_router, *middle, dst_host]


def dragonfly_nonminimal_path(
    src_host: str,
    dst_host: str,
    intermediate_group: int,
    d: int,
    a: int
) -> list[str]:
    """
    Build a non-minimal (Valiant-style) path that detours via another group.

    The path is assembled in two phases:
      1. source group → intermediate group (skipped when they are the same)
      2. intermediate group → destination group (skipped when they are the same)
    Each inter-group hop lands on router index ``current_router % d`` of the
    target group. A final local hop to the destination router is added only
    when the walk did not already end on it.

    Args:
        src_host: Source host name
        dst_host: Destination host name
        intermediate_group: Group index to route through
        d: Number of routers per group
        a: Number of global links per router (not used by this function)

    Returns:
        List of node names forming the non-minimal path
    """
    s_group, s_router, _ = parse_dragonfly_host(src_host)
    t_group, t_router, _ = parse_dragonfly_host(dst_host)

    hops = [src_host, f"r_{s_group}_{s_router}"]

    # Router index the walk is currently on; updated after every inter-group hop.
    position = s_router

    # Phase 1: leave the source group, unless it already is the intermediate.
    if intermediate_group != s_group:
        position = s_router % d
        hops.append(f"r_{intermediate_group}_{position}")

    # Phase 2: enter the destination group, unless the intermediate already is it.
    if intermediate_group != t_group:
        position = position % d
        hops.append(f"r_{t_group}_{position}")

    # Final local hop when the walk ended on a different router index.
    if position != t_router:
        hops.append(f"r_{t_group}_{t_router}")

    hops.append(dst_host)
    return hops


def estimate_path_latency(
    path: list[str],
    link_loads: dict,
    hop_latency: float = 1.0,
    congestion_weight: float = 1.0
) -> float:
    """
    Score a path by hop count plus the load on each traversed link.

    Every consecutive node pair contributes ``hop_latency`` plus
    ``congestion_weight * load``. The load is looked up under (u, v) when that
    key exists, otherwise under (v, u); links with no entry count as 0.0.
    Paths with fewer than two nodes score 0.0.

    Used by UGAL to compare minimal vs non-minimal paths.

    Args:
        path: List of node names in the path
        link_loads: Dict {(u, v): load_bytes} of current link loads
        hop_latency: Base latency per hop (default 1.0)
        congestion_weight: Weight for congestion term (default 1.0)

    Returns:
        Estimated latency (higher = more congested/longer path)
    """
    total = 0.0

    # Walk consecutive node pairs; an empty or single-node path yields no pairs.
    for u, v in zip(path, path[1:]):
        # Loads are stored undirected, so fall back to the reversed key.
        key = (v, u) if (u, v) not in link_loads else (u, v)

        total += hop_latency
        total += congestion_weight * link_loads.get(key, 0.0)

    return total


def ugal_select_path(
    src_host: str,
    dst_host: str,
    link_loads: dict,
    d: int,
    a: int,
    threshold: float = 2.0
) -> list[str]:
    """
    UGAL (Universal Globally-Adaptive Load-balanced) path selection.

    Compares the estimated latency of the minimal path with that of the best
    non-minimal path (one candidate per intermediate group, source and
    destination groups excluded).

    Decision rule: if minimal_latency < threshold * best_nonminimal_latency,
    use the minimal path; otherwise use the best non-minimal path. With a
    threshold above 1.0 the rule is biased toward the minimal path.

    Intra-group traffic always uses the minimal path; no latency is estimated
    for it because the estimate could not affect the outcome.

    Args:
        src_host: Source host name
        dst_host: Destination host name
        link_loads: Current link load state {(u, v): bytes}
        d: Number of routers per group
        a: Number of global links per router (number of groups is a + 1)
        threshold: Decision threshold (default 2.0)

    Returns:
        Selected path as list of node names
    """
    src_group, _, _ = parse_dragonfly_host(src_host)
    dst_group, _, _ = parse_dragonfly_host(dst_host)

    minimal_path = dragonfly_minimal_path(src_host, dst_host, d, a)

    # For intra-group traffic, always use minimal (no benefit from non-minimal).
    # Return before estimating latency so the work is not wasted.
    if src_group == dst_group:
        return minimal_path

    minimal_latency = estimate_path_latency(minimal_path, link_loads)

    # Evaluate one non-minimal candidate per valid intermediate group.
    num_groups = a + 1
    best_nonminimal_path = None
    best_nonminimal_latency = float('inf')

    for inter_group in range(num_groups):
        # Source and destination groups are not valid intermediates.
        if inter_group in (src_group, dst_group):
            continue

        candidate = dragonfly_nonminimal_path(
            src_host, dst_host, inter_group, d, a
        )
        candidate_latency = estimate_path_latency(candidate, link_loads)

        # Strict comparison: on ties the lowest-numbered group wins.
        if candidate_latency < best_nonminimal_latency:
            best_nonminimal_latency = candidate_latency
            best_nonminimal_path = candidate

    # No usable detour (e.g. only two groups exist): stay minimal.
    if best_nonminimal_path is None:
        return minimal_path

    # UGAL decision
    if minimal_latency < threshold * best_nonminimal_latency:
        return minimal_path
    return best_nonminimal_path


def valiant_select_path(
    src_host: str,
    dst_host: str,
    d: int,
    a: int,
    bias: float = 0.0
) -> list[str]:
    """
    Valiant routing with a configurable probability of detouring.

    Intra-group traffic always takes the minimal path and draws no random
    number. For inter-group traffic one uniform draw from the module-level
    ``random`` generator decides between the minimal path and a detour through
    a uniformly chosen intermediate group (source and destination groups are
    excluded). If no intermediate group is available the minimal path is used.

    Args:
        src_host: Source host name
        dst_host: Destination host name
        d: Number of routers per group
        a: Number of global links per router (number of groups is a + 1)
        bias: Fraction of traffic to route non-minimally (0.0-1.0)
              0.0 = always minimal, 1.0 = always non-minimal
              0.05 = 5% non-minimal, 95% minimal

    Returns:
        Selected path as list of node names
    """
    group_count = a + 1
    src_group = parse_dragonfly_host(src_host)[0]
    dst_group = parse_dragonfly_host(dst_host)[0]

    # Short-circuit keeps intra-group traffic from consuming a random draw.
    if src_group == dst_group or random.random() >= bias:
        return dragonfly_minimal_path(src_host, dst_host, d, a)

    # Detour: any group other than the two endpoints may act as intermediate.
    endpoints = (src_group, dst_group)
    candidates = [g for g in range(group_count) if g not in endpoints]
    if not candidates:
        return dragonfly_minimal_path(src_host, dst_host, d, a)

    chosen_group = random.choice(candidates)
    return dragonfly_nonminimal_path(src_host, dst_host, chosen_group, d, a)


def dragonfly_route(
    src_host: str,
    dst_host: str,
    algorithm: str,
    d: int,
    a: int,
    link_loads: dict | None = None,
    ugal_threshold: float = 2.0,
    valiant_bias: float = 0.0
) -> list[str]:
    """
    Pick a path between two Dragonfly hosts using the requested algorithm.

    'ugal' and 'valiant' are forwarded to their selectors; 'minimal' and any
    unrecognised algorithm name fall back to minimal routing.

    Args:
        src_host: Source host name
        dst_host: Destination host name
        algorithm: Routing algorithm ('minimal', 'ugal', 'valiant')
        d: Number of routers per group
        a: Number of global links per router
        link_loads: Current link loads (used by UGAL; None is treated as empty)
        ugal_threshold: UGAL decision threshold
        valiant_bias: Valiant non-minimal bias (0.0-1.0)

    Returns:
        Path as list of node names
    """
    if algorithm == 'ugal':
        # UGAL needs a load table; with none supplied every link counts as idle.
        current_loads = {} if link_loads is None else link_loads
        return ugal_select_path(
            src_host, dst_host, current_loads, d, a, ugal_threshold
        )

    if algorithm == 'valiant':
        return valiant_select_path(src_host, dst_host, d, a, valiant_bias)

    # 'minimal' and anything unrecognised: default to minimal
    return dragonfly_minimal_path(src_host, dst_host, d, a)


def link_loads_for_job_dragonfly_adaptive(
    G: nx.Graph,
    job_hosts: list[str],
    tx_volume_bytes: float,
    algorithm: str,
    d: int,
    a: int,
    link_loads: dict | None = None,
    ugal_threshold: float = 2.0,
    valiant_bias: float = 0.0
) -> dict:
    """
    Accumulate per-link traffic for one job routed with dragonfly_route().

    Traffic is modelled as all-to-all: every host sends
    ``tx_volume_bytes / (len(job_hosts) - 1)`` to each other host. Each
    ordered (src, dst) pair is routed independently and its share is added to
    every link on the chosen path. Path hops that are not edges of ``G`` are
    skipped without error.

    ``link_loads`` is only passed through to the router; this function does
    not modify it.

    Args:
        G: NetworkX graph (for edge initialization)
        job_hosts: List of host names for this job
        tx_volume_bytes: Traffic volume per host
        algorithm: Routing algorithm ('minimal', 'ugal', 'valiant')
        d: Number of routers per group
        a: Number of global links per router
        link_loads: Global link loads (for UGAL decisions)
        ugal_threshold: UGAL decision threshold
        valiant_bias: Valiant non-minimal bias

    Returns:
        Dict {(u, v): bytes} of link loads from this job, with one entry per
        edge of G keyed by the sorted endpoint tuple
    """
    # Start every graph edge at zero, keyed by its sorted endpoints.
    job_loads = dict.fromkeys((tuple(sorted(link)) for link in G.edges()), 0.0)

    host_count = len(job_hosts)
    if host_count < 2:
        return job_loads

    # All-to-all traffic: each host splits its volume evenly across its peers.
    share = tx_volume_bytes / (host_count - 1)

    for src in job_hosts:
        for dst in job_hosts:
            if src != dst:
                route = dragonfly_route(
                    src, dst, algorithm, d, a,
                    link_loads=link_loads,
                    ugal_threshold=ugal_threshold,
                    valiant_bias=valiant_bias
                )

                for hop in zip(route, route[1:]):
                    link = tuple(sorted(hop))
                    # Hops that do not correspond to a graph edge are ignored.
                    if link in job_loads:
                        job_loads[link] += share

    return job_loads
+22 −0
Original line number Diff line number Diff line
@@ -37,3 +37,25 @@ class AllocationStrategy(ValueComparableEnum):
    CONTIGUOUS = 'contiguous'
    RANDOM = 'random'
    HYBRID = 'hybrid'


class RoutingAlgorithm(ValueComparableEnum):
    """Supported network routing algorithms for HPC topologies.

    Based on routing algorithms from:
    "Study of Workload Interference with Intelligent Routing on Dragonfly"
    (Kang et al., SC22)

    Each member's value is the lowercase name used to select the algorithm
    in configuration (e.g. ``routing_algorithm: ugal``).

    MINIMAL: Always use shortest/minimal path routing.
             For Dragonfly: at most 3 hops (local-global-local).
    VALIANT: Valiant load balancing - route via random intermediate group.
             Configurable bias parameter controls minimal vs non-minimal ratio.
             valiant_bias=0.05 means 5% non-minimal, 95% minimal.
    UGAL: Universal Globally-Adaptive Load-balanced routing.
          Dynamically chooses minimal or non-minimal based on congestion.
          Uses threshold comparison: if min_latency < threshold * nonmin_latency,
          use minimal path; otherwise use non-minimal.
    """
    # Shortest-path routing only.
    MINIMAL = 'minimal'
    # Probabilistic detour through a random intermediate group.
    VALIANT = 'valiant'
    # Congestion-based choice between minimal and non-minimal paths.
    UGAL = 'ugal'
Loading