Commit 87056785 authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Add `-w allocation_test synthetic` workload and `scripts/run_allocation_test.py` test script

parent 189c8099
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -144,7 +144,7 @@ class SimConfig(RAPSBaseModel, abc.ABC):
    # Workload arguments (TODO split into separate model)
    workload: Literal['random', 'benchmark', 'peak', 'idle', 'synthetic',
                      'multitenant', 'replay', 'randomAI', 'network_test',
                      'inter_job_congestion', 'calculon', 'hpl'] = "random"
                      'inter_job_congestion', 'allocation_test', 'calculon', 'hpl'] = "random"

    """ Type of synthetic workload """
    multimodal: list[float] = [1.0]
+2 −0
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@ from .live import continuous_job_generation
from .multitenant import MultitenantWorkload
from .network import NetworkTestWorkload
from .inter_job_congestion import InterJobCongestionWorkload
from .allocation_test import AllocationTestWorkload
from .utils import plot_job_hist


@@ -58,6 +59,7 @@ class Workload(
    MultitenantWorkload,
    NetworkTestWorkload,
    InterJobCongestionWorkload,
    AllocationTestWorkload,
    Calculon,
    HPL
):
+220 −0
Original line number Diff line number Diff line
"""
Allocation Test Workload

Generates synthetic jobs with varying communication intensities to study the
impact of different node allocation strategies (contiguous, random, hybrid).

Based on the "Bully" phenomenon described in:
"Watch Out for the Bully! Job Interference Study on Dragonfly Network"
(Yang et al., SC16)

Key design:
- Jobs do NOT have pre-assigned scheduled_nodes, so allocation strategy is applied
- Jobs have network traces (ntx_trace, nrx_trace) to enable congestion measurement
- Mix of high-comm "bully" jobs and low-comm "victim" jobs
"""

import random
from typing import List, Optional

from raps.job import Job, job_dict
from raps.network import max_throughput_per_tick


class AllocationTestWorkload:
    """Workload generator for allocation strategy testing."""

    def allocation_test(self, args) -> List[Job]:
        """
        Generate a synthetic job mix with varying communication intensities.

        The mix is 30% high-communication "bullies", 40% medium-communication
        jobs, and 30% low-communication "victims". Every job carries
        nodes_required but no scheduled_nodes, so the simulator's allocation
        strategy decides node placement.
        """
        cfg = self.config_map[self.partitions[0]]
        quanta = cfg.get("TRACE_QUANTA", 15)

        # Mix fractions are fixed for now; CLI knobs could be added later.
        return generate_allocation_test_jobs(
            legacy_cfg=cfg,
            num_jobs=args.numjobs,
            trace_quanta=quanta,
            seed=args.seed,
            high_comm_fraction=0.3,
            med_comm_fraction=0.4,
            low_comm_fraction=0.3,
        )


def generate_allocation_test_jobs(
    legacy_cfg: dict,
    num_jobs: int = 60,
    trace_quanta: int = 15,
    seed: Optional[int] = None,
    high_comm_fraction: float = 0.3,
    med_comm_fraction: float = 0.4,
    low_comm_fraction: float = 0.3,
) -> List[Job]:
    """
    Generate synthetic jobs with varying communication intensities.

    Parameters:
    - legacy_cfg: System configuration dictionary
    - num_jobs: Total number of jobs to generate
    - trace_quanta: Time quantum for traces (seconds)
    - seed: Random seed for reproducibility
    - high_comm_fraction: Fraction of high-communication "bully" jobs
    - med_comm_fraction: Fraction of medium-communication jobs
    - low_comm_fraction: Fraction of low-communication "victim" jobs

    NOTE: low_comm_fraction is accepted for interface symmetry, but the low
    count is actually derived as the remainder (num_jobs - high - med) so
    the three categories always sum to exactly num_jobs despite int()
    truncation of the other two.
    """
    if seed is not None:
        random.seed(seed)

    total_nodes = int(legacy_cfg["TOTAL_NODES"])
    max_nodes_per_job = min(
        int(legacy_cfg.get("MAX_NODES_PER_JOB", 128)),
        total_nodes // 4  # Don't let single job take > 25% of system
    )
    min_nodes_per_job = max(1, int(legacy_cfg.get("MIN_NODES_PER_JOB", 2)))

    # Compute bandwidth reference for network traces
    per_tick_bw = max_throughput_per_tick(legacy_cfg, trace_quanta)

    # Communication intensity levels (fraction of max bandwidth)
    HIGH_COMM_FACTOR = 0.7   # Bullies use 70% of available bandwidth
    MED_COMM_FACTOR = 0.3    # Medium jobs use 30%
    LOW_COMM_FACTOR = 0.05   # Victims use 5%

    # Job counts by category (low is the remainder; see docstring note)
    n_high = int(num_jobs * high_comm_fraction)
    n_med = int(num_jobs * med_comm_fraction)
    n_low = num_jobs - n_high - n_med

    print(f"[allocation_test] Generating {num_jobs} jobs:")
    print(f"  - High-comm (bullies): {n_high} jobs @ {HIGH_COMM_FACTOR*100:.0f}% bandwidth")
    print(f"  - Medium-comm: {n_med} jobs @ {MED_COMM_FACTOR*100:.0f}% bandwidth")
    print(f"  - Low-comm (victims): {n_low} jobs @ {LOW_COMM_FACTOR*100:.0f}% bandwidth")
    print(f"  - Max throughput/tick: {per_tick_bw:.2e} bytes")

    # Per-category generation spec: (label, count, bandwidth factor,
    # min nodes, max nodes). Bullies skew large, victims skew small.
    # The iteration order (high, med, low) matches the original loop order
    # so the random-call sequence — and thus seeded output — is unchanged.
    categories = [
        ("high_comm", n_high, HIGH_COMM_FACTOR,
         max(min_nodes_per_job, max_nodes_per_job // 4), max_nodes_per_job),
        ("med_comm", n_med, MED_COMM_FACTOR,
         min_nodes_per_job, max_nodes_per_job // 2),
        ("low_comm", n_low, LOW_COMM_FACTOR,
         min_nodes_per_job, max(min_nodes_per_job + 1, max_nodes_per_job // 4)),
    ]

    jobs: List[Job] = []
    jid = 1
    submit_time = 0

    for category, count, factor, lo_nodes, hi_nodes in categories:
        for _ in range(count):
            nodes = random.randint(lo_nodes, hi_nodes)
            job = make_allocation_test_job(
                jid=jid,
                nodes_required=nodes,
                net_traffic=factor * per_tick_bw,
                trace_quanta=trace_quanta,
                submit_time=submit_time,
                job_category=category,
                legacy_cfg=legacy_cfg,
            )
            jobs.append(job)
            jid += 1
            submit_time += random.randint(1, 30)  # Stagger arrivals

    # Shuffle to interleave job types (more realistic)
    random.shuffle(jobs)

    # Re-assign submit times after shuffle
    submit_time = 0
    for job in jobs:
        job.submit_time = submit_time
        submit_time += random.randint(5, 60)

    return jobs


def make_allocation_test_job(
    jid: int,
    nodes_required: int,
    net_traffic: float,
    trace_quanta: int,
    submit_time: int,
    job_category: str,
    legacy_cfg: dict,
) -> Job:
    """
    Build one synthetic job for allocation testing.

    The job deliberately carries an empty scheduled_nodes list so the
    allocation strategy under test decides node placement at schedule time.
    """
    # Duration drawn from 5-15 minutes.
    duration = random.randint(300, 900)
    n_ticks = max(1, duration // trace_quanta)

    # Moderate compute load; the experiment's focus is network behavior.
    gpus = legacy_cfg.get("GPUS_PER_NODE", 4)
    cpu_load = random.uniform(0.3, 0.7)
    gpu_load = random.uniform(0.4, 0.8) * gpus

    payload = job_dict(
        id=jid,
        name=f"{job_category}_{jid}",
        account="allocation_test",
        nodes_required=nodes_required,
        # Left empty on purpose — this is the key difference from other
        # workloads: the allocator assigns nodes when the job is scheduled.
        scheduled_nodes=[],
        cpu_trace=[cpu_load] * n_ticks,
        gpu_trace=[gpu_load] * n_ticks,
        # Constant-rate network traces for simplicity (time-varying traces
        # would add realism).
        ntx_trace=[net_traffic] * n_ticks,
        nrx_trace=[net_traffic] * n_ticks,
        trace_quanta=trace_quanta,
        submit_time=submit_time,
        expected_run_time=duration,
        time_limit=duration * 2,
        end_state="COMPLETED",
    )
    return Job(payload)
+287 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
"""
RAPS Allocation Strategy Test (Bully Effect Study)
===================================================

This script tests different node allocation strategies and their impact on
network congestion, based on the "Bully" phenomenon from:
"Watch Out for the Bully! Job Interference Study on Dragonfly Network"
(Yang et al., SC16)

It bypasses the full RAPS simulation for fast iteration, directly computing
network congestion for jobs placed using different allocation strategies.

Usage:
    python scripts/run_allocation_test.py --config config/lassen.yaml

Example comparing all strategies:
    python scripts/run_allocation_test.py --config config/lassen.yaml --compare -v

Example single strategy:
    python scripts/run_allocation_test.py --config config/lassen.yaml --allocation random -v
"""

from __future__ import annotations
import argparse
import random
from pathlib import Path
from typing import List

from raps.job import Job
from raps.system_config import get_system_config
from raps.policy import AllocationStrategy
from raps.network import (
    NetworkModel,
    simulate_inter_job_congestion,
)
from raps.workloads.allocation_test import generate_allocation_test_jobs


def allocate_nodes_to_jobs(
    jobs: List[Job],
    available_nodes: List[int],
    strategy: AllocationStrategy,
    hybrid_threshold: float = 0.5,
) -> List[Job]:
    """
    Apply an allocation strategy to assign nodes to jobs (in place).

    Simulates what the resource manager would do for a static snapshot in
    which all jobs run concurrently: each job either receives a populated
    ``scheduled_nodes`` list or, if not enough nodes remain, an empty one.

    Parameters:
    - jobs: jobs to place (mutated: scheduled_nodes is set on each)
    - available_nodes: node ids eligible for allocation
    - strategy: CONTIGUOUS, RANDOM, or HYBRID (per-job choice by intensity)
    - hybrid_threshold: intensity cutoff for HYBRID (>= threshold -> RANDOM)

    Returns the same jobs list for convenience.
    """
    # Sort available nodes so CONTIGUOUS takes the lowest run of ids.
    # sorted() already returns a new list, so no extra copy is needed.
    available = sorted(available_nodes)

    for job in jobs:
        n = job.nodes_required

        if n > len(available):
            print(f"[WARN] Job {job.id} needs {n} nodes but only {len(available)} available, skipping")
            job.scheduled_nodes = []
            continue

        # Determine effective strategy for this job
        if strategy == AllocationStrategy.HYBRID:
            # High-communication jobs get RANDOM placement (spreads traffic);
            # low-communication jobs get CONTIGUOUS placement (locality).
            intensity = _compute_intensity(job)
            effective_strategy = (
                AllocationStrategy.RANDOM if intensity >= hybrid_threshold
                else AllocationStrategy.CONTIGUOUS
            )
        else:
            effective_strategy = strategy

        # Apply allocation
        if effective_strategy == AllocationStrategy.CONTIGUOUS:
            job.scheduled_nodes = available[:n]
        else:  # RANDOM
            job.scheduled_nodes = random.sample(available, n)

        # Remove allocated nodes from the pool. Membership test against a
        # set keeps this pass O(len(available)) instead of the original
        # O(n * len(available)) list scan; relative order is preserved.
        taken = set(job.scheduled_nodes)
        available = [node for node in available if node not in taken]

    return jobs


def _compute_intensity(job: Job) -> float:
    """Compute normalized communication intensity for hybrid strategy."""
    import numpy as np

    ntx = getattr(job, 'ntx_trace', None)
    nrx = getattr(job, 'nrx_trace', None)

    total = 0.0
    count = 0

    for trace in [ntx, nrx]:
        if trace is not None:
            if isinstance(trace, (list, np.ndarray)) and len(trace) > 0:
                total += np.mean(trace)
                count += 1
            elif isinstance(trace, (int, float)):
                total += trace
                count += 1

    if count == 0:
        return 0.0

    avg_network = total / count
    # Normalize based on typical bandwidth values
    # This threshold should be tuned based on your workload
    intensity = min(1.0, avg_network / 1e11)  # Normalize to ~100GB/s reference
    return intensity


def run_allocation_test(
    legacy_cfg: dict,
    strategy: AllocationStrategy,
    num_jobs: int = 60,
    seed: int = 42,
    hybrid_threshold: float = 0.5,
    verbose: bool = False,
    debug: bool = False,
) -> dict:
    """
    Run the allocation test for a single strategy.

    Generates synthetic jobs, places them with the given strategy, and
    returns the resulting network congestion statistics.
    """
    random.seed(seed)

    # Synthetic jobs arrive with nodes_required set but no node assignment.
    test_jobs = generate_allocation_test_jobs(
        legacy_cfg=legacy_cfg,
        num_jobs=num_jobs,
        seed=seed,
    )

    # Every node not marked down is a placement candidate.
    node_count = legacy_cfg["TOTAL_NODES"]
    unavailable = set(legacy_cfg.get("DOWN_NODES", []))
    candidate_nodes = [i for i in range(node_count) if i not in unavailable]

    # Let the strategy under test assign nodes.
    test_jobs = allocate_nodes_to_jobs(
        jobs=test_jobs,
        available_nodes=candidate_nodes,
        strategy=strategy,
        hybrid_threshold=hybrid_threshold,
    )

    # Drop anything the allocator could not place.
    test_jobs = [job for job in test_jobs if job.scheduled_nodes]

    if verbose:
        print(f"\n[{strategy.value.upper()}] Allocated {len(test_jobs)} jobs")
        # Show some example allocations
        for job in test_jobs[:5]:
            nodes_preview = job.scheduled_nodes[:5]
            suffix = "..." if len(job.scheduled_nodes) > 5 else ""
            print(f"  Job {job.id} ({job.name}): {len(job.scheduled_nodes)} nodes -> {nodes_preview}{suffix}")

    # Build the network model and measure inter-job congestion.
    net = NetworkModel(
        config=legacy_cfg,
        available_nodes=candidate_nodes,
        debug=debug,
    )
    return simulate_inter_job_congestion(net, test_jobs, legacy_cfg, debug=debug)


def print_comparison_table(results: dict):
    """
    Print a comparison table of all strategies.

    Parameters:
    - results: mapping of strategy name -> stats dict containing at least
      the float keys 'max', 'mean', and 'std_dev'.
    """
    print("\n" + "=" * 70)
    print("ALLOCATION STRATEGY COMPARISON")
    print("=" * 70)
    print(f"{'Strategy':<15} {'Max Cong':>12} {'Mean Cong':>12} {'Std Dev':>12}")
    print("-" * 70)

    for strategy, stats in results.items():
        print(f"{strategy:<15} {stats['max']:>12.2f} {stats['mean']:>12.2f} {stats['std_dev']:>12.2f}")

    print("=" * 70)

    # Analysis: identify best/worst by max congestion
    strategies = list(results.keys())
    if len(strategies) >= 2:
        max_congs = {s: results[s]['max'] for s in strategies}
        best = min(max_congs, key=max_congs.get)
        worst = max(max_congs, key=max_congs.get)

        print(f"\nBest strategy: {best} (lowest max congestion)")
        # Guard against division by zero when no congestion was measured at
        # all (every strategy reports max == 0); the unguarded computation
        # would raise ZeroDivisionError.
        if max_congs[worst] > 0:
            improvement = (max_congs[worst] - max_congs[best]) / max_congs[worst] * 100
            print(f"Improvement over {worst}: {improvement:.1f}%")
        else:
            print(f"Improvement over {worst}: n/a (no congestion measured)")


def main():
    """CLI entry point: parse arguments, load the system configuration, and
    run the allocation test for one strategy (or compare all of them with
    --compare), printing congestion results to stdout."""
    parser = argparse.ArgumentParser(
        description="Test allocation strategies and their impact on network congestion.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__
    )
    parser.add_argument("--config", required=True, help="Path to system YAML (e.g., config/lassen.yaml)")
    parser.add_argument("--allocation", choices=["contiguous", "random", "hybrid"],
                        default="contiguous", help="Allocation strategy to test")
    parser.add_argument("--compare", action="store_true",
                        help="Compare all allocation strategies")
    parser.add_argument("--jobs", type=int, default=60, help="Number of synthetic jobs")
    parser.add_argument("--seed", type=int, default=42, help="Random seed for reproducibility")
    parser.add_argument("--hybrid-threshold", type=float, default=0.5,
                        help="Threshold for hybrid strategy (0-1)")
    parser.add_argument("--debug", action="store_true", help="Enable network debug output")
    parser.add_argument("--verbose", "-v", action="store_true", help="Print detailed statistics")
    args = parser.parse_args()

    # Load config and convert to the legacy dict form the test helpers expect
    sys_cfg = get_system_config(args.config)
    legacy = sys_cfg.get_legacy()

    # The congestion model needs a topology; fail early if the system YAML
    # does not declare one.
    topology = legacy.get("TOPOLOGY", "").lower()
    if not topology:
        raise ValueError(f"Could not infer topology from {args.config}")

    print(f"[INFO] System: {args.config}")
    print(f"[INFO] Topology: {topology}")
    print(f"[INFO] Total nodes: {legacy['TOTAL_NODES']}")
    print(f"[INFO] Jobs: {args.jobs}")

    if args.compare:
        # Compare all strategies (every member of the AllocationStrategy enum)
        results = {}
        for strategy in AllocationStrategy:
            print(f"\n{'='*50}")
            print(f"Testing {strategy.value.upper()} allocation...")
            print('='*50)

            stats = run_allocation_test(
                legacy_cfg=legacy,
                strategy=strategy,
                num_jobs=args.jobs,
                seed=args.seed,
                hybrid_threshold=args.hybrid_threshold,
                verbose=args.verbose,
                debug=args.debug,
            )
            results[strategy.value] = stats

            print(f"[RESULT] {strategy.value}: max_congestion={stats['max']:.2f}, "
                  f"mean={stats['mean']:.2f}")

        # Summary table plus best/worst analysis across strategies
        print_comparison_table(results)
    else:
        # Single strategy test (the enum value matches the CLI choice string)
        strategy = AllocationStrategy(args.allocation)

        stats = run_allocation_test(
            legacy_cfg=legacy,
            strategy=strategy,
            num_jobs=args.jobs,
            seed=args.seed,
            hybrid_threshold=args.hybrid_threshold,
            verbose=args.verbose,
            debug=args.debug,
        )

        print(f"\n[RESULT] strategy={args.allocation}, max_congestion={stats['max']:.2f}, "
              f"mean={stats['mean']:.2f}")

        if args.verbose:
            print("\n--- Detailed Network Congestion Stats ---")
            print(f"  Max Congestion (Worst Link): {stats['max']:.2f}")
            print(f"  Mean Link Congestion:        {stats['mean']:.2f}")
            print(f"  Min Link Congestion:         {stats['min']:.2f}")
            print(f"  Std Dev of Congestion:       {stats['std_dev']:.2f}")
            # 'top_links' is optional in the stats dict; only print if present
            if 'top_links' in stats:
                print("\n  Top 10 Most Congested Links:")
                for (link, congestion) in stats['top_links'][:10]:
                    print(f"    - Link {link}: {congestion:.2f}")


# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()