Commit 189c8099 authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Implement --allocation options supporting contiguous, random, and hybrid

parent f87cb7a5
Loading
Loading
Loading
Loading
+43 −0
Original line number Diff line number Diff line
@@ -107,6 +107,49 @@ Run network congestion tests outside of RAPS:

    python scripts/run_inter_job_congestion.py --config config/lassen.yaml -v

## Node Allocation Strategies

RAPS supports different node allocation strategies for studying the impact of job placement on network performance and energy consumption. These strategies are based on the research presented in ["Watch Out for the Bully! Job Interference Study on Dragonfly Network" (Yang et al., SC16)](https://doi.org/10.1109/SC.2016.63).

### Available Strategies

| Strategy | Description | Use Case |
|----------|-------------|----------|
| `contiguous` | Assigns nodes sequentially (first N available) | Maintains network locality, reduces inter-job interference |
| `random` | Randomly samples nodes from available pool | Distributes traffic for load balancing, may cause "bully" effect |
| `hybrid` | High-communication jobs get random, low-communication get contiguous | Balances benefits of both approaches |

### Usage Examples

```bash
# Contiguous allocation (default behavior)
raps run --system frontier --allocation contiguous

# Random allocation
raps run --system frontier --allocation random

# Hybrid allocation with custom threshold
raps run --system frontier --allocation hybrid --hybrid-threshold 0.5
```

### Hybrid Strategy Details

The hybrid strategy determines allocation based on a job's communication intensity, computed from its network TX/RX traces:
- Jobs with intensity >= threshold: assigned using **random** allocation
- Jobs with intensity < threshold: assigned using **contiguous** allocation

The `--hybrid-threshold` parameter (default: 0.5) controls this cutoff. A lower threshold means more jobs get random allocation.

### Combining with Network Simulation

To study the full impact of allocation strategies on network congestion and energy:

```bash
# Compare contiguous vs random allocation with network simulation
raps run --system lassen --allocation contiguous --net -t 1h -o output_contiguous
raps run --system lassen --allocation random --net -t 1h -o output_random
```

## Snapshot of extracted workload data

To reduce the expense of extracting the needed data from the telemetry parquet files,
+12 −2
Original line number Diff line number Diff line
@@ -11,7 +11,7 @@ import select
import time
import random
from raps.job import Job, JobState
from raps.policy import PolicyType
from raps.policy import PolicyType, AllocationStrategy
from raps.utils import (
    summarize_ranges,
    get_current_utilization,
@@ -260,11 +260,21 @@ class Engine:
        self.time_delta = time_delta

        self.down_nodes = summarize_ranges(self.config['DOWN_NODES'])

        # Parse allocation strategy from sim_config
        allocation_str = getattr(sim_config, 'allocation', 'contiguous')
        allocation_strategy = AllocationStrategy(allocation_str)
        hybrid_threshold = getattr(sim_config, 'hybrid_threshold', 0.5)

        self.resource_manager = ResourceManager(
            total_nodes=self.config['TOTAL_NODES'],
            down_nodes=self.config['DOWN_NODES'],
            config=self.config
            config=self.config,
            allocation_strategy=allocation_strategy,
            hybrid_threshold=hybrid_threshold
        )
        print(f"Using allocation strategy: {allocation_strategy.value}"
              + (f" (threshold={hybrid_threshold})" if allocation_strategy == AllocationStrategy.HYBRID else ""))
        # Initialize running and queue, etc.
        self.running = []
        self.queue = []
+19 −0
Original line number Diff line number Diff line
@@ -18,3 +18,22 @@ class BackfillType(ValueComparableEnum):
    GREEDY = 'greedy'
    EASY = 'easy'  # Earliest Available Start Time Yielding
    CONSERVATIVE = 'conservative'


class AllocationStrategy(ValueComparableEnum):
    """Supported node allocation strategies.

    Based on job placement policies from:
    "Watch Out for the Bully! Job Interference Study on Dragonfly Network"
    (Yang et al., SC16)

    CONTIGUOUS: Nodes assigned consecutively, filling groups/racks first.
                Minimizes network resource sharing between jobs.
    RANDOM: Nodes randomly selected from available pool.
            Distributes traffic uniformly, enables load balancing.
    HYBRID: Communication-intensive jobs get random allocation,
            less intensive jobs get contiguous allocation.
    """
    CONTIGUOUS = 'contiguous'
    RANDOM = 'random'
    HYBRID = 'hybrid'
+16 −2
Original line number Diff line number Diff line
@@ -4,15 +4,29 @@ Exports a factory that returns the appropriate manager based on config.
"""
from .default import ExclusiveNodeResourceManager
from .multitenant import MultiTenantResourceManager
from raps.policy import AllocationStrategy


def make_resource_manager(total_nodes, down_nodes, config):
def make_resource_manager(total_nodes, down_nodes, config,
                          allocation_strategy=AllocationStrategy.CONTIGUOUS,
                          hybrid_threshold=0.5):
    """
    Factory to choose between exclusive-node and multitenant managers.

    Parameters:
    - total_nodes: Total number of nodes in the system
    - down_nodes: Set of node IDs that are down/unavailable
    - config: Configuration dictionary
    - allocation_strategy: Node allocation strategy (CONTIGUOUS, RANDOM, HYBRID)
    - hybrid_threshold: For HYBRID strategy, communication intensity threshold
    """
    if config.get("multitenant", False):
        return MultiTenantResourceManager(total_nodes, down_nodes, config)
    return ExclusiveNodeResourceManager(total_nodes, down_nodes, config)
    return ExclusiveNodeResourceManager(
        total_nodes, down_nodes, config,
        allocation_strategy=allocation_strategy,
        hybrid_threshold=hybrid_threshold
    )


# Alias for backward compatibility
+108 −8
Original line number Diff line number Diff line
import random

from raps.job import JobState
from raps.policy import PolicyType
from raps.policy import PolicyType, AllocationStrategy


class ExclusiveNodeResourceManager:
    """
    Legacy exclusive-node resource manager: allocates and frees full nodes.
    Exclusive-node resource manager: allocates and frees full nodes.

    Supports different allocation strategies based on:
    "Watch Out for the Bully! Job Interference Study on Dragonfly Network"
    (Yang et al., SC16)
    """

    def __init__(self, total_nodes, down_nodes, config=None,
                 allocation_strategy=AllocationStrategy.CONTIGUOUS,
                 hybrid_threshold=None):
        """
        Initialize the resource manager.

    def __init__(self, total_nodes, down_nodes, config=None):
        Parameters:
        - total_nodes: Total number of nodes in the system
        - down_nodes: Set of node IDs that are down/unavailable
        - config: Configuration dictionary
        - allocation_strategy: Node allocation strategy (CONTIGUOUS, RANDOM, HYBRID)
        - hybrid_threshold: For HYBRID strategy, jobs with communication intensity
                           above this threshold get RANDOM allocation, below get CONTIGUOUS.
                           If None, defaults to 0.5 (median).
        """
        self.total_nodes = total_nodes
        self.down_nodes = set(down_nodes)
        self.config = config or {}
        self.allocation_strategy = allocation_strategy
        self.hybrid_threshold = hybrid_threshold if hybrid_threshold is not None else 0.5

        # Determine per-node capacities
        cfg = self.config
@@ -39,19 +61,25 @@ class ExclusiveNodeResourceManager:
        self.sys_util_history = []

    def assign_nodes_to_job(self, job, current_time, policy, node_id=None):
        """Assigns full nodes to a job (replay or count-based)."""
        """Assigns full nodes to a job using the configured allocation strategy.

        Allocation strategies:
        - CONTIGUOUS: Take first N available nodes (sequential allocation)
        - RANDOM: Randomly sample N nodes from available pool
        - HYBRID: Use job's communication intensity to decide strategy
        """
        # Ensure enough free nodes
        if len(self.available_nodes) < job.nodes_required:
            raise ValueError(f"Not enough available nodes to schedule job {job.id}",
                             f"{len(self.available_nodes)} < {job.nodes_required}")

        if policy == PolicyType.REPLAY and job.scheduled_nodes:
            # Telemetry replay: use the exact nodes
            # Telemetry replay: use the exact nodes from trace
            self.available_nodes = [n for n in self.available_nodes if n not in job.scheduled_nodes]
        else:
            # Count-based allocation: take the first N free nodes
            job.scheduled_nodes = self.available_nodes[:job.nodes_required]
            self.available_nodes = self.available_nodes[job.nodes_required:]
            # Apply allocation strategy
            job.scheduled_nodes = self._allocate_nodes(job)
            self.available_nodes = [n for n in self.available_nodes if n not in job.scheduled_nodes]

        # Mark job running
        job.start_time = current_time
@@ -59,6 +87,78 @@ class ExclusiveNodeResourceManager:
            job.end_time = current_time + job.expected_run_time  # This may be an assumption!
        job.current_state = JobState.RUNNING

    def _allocate_nodes(self, job):
        """Select nodes based on allocation strategy.

        Returns:
            List of node IDs allocated to the job.
        """
        n = job.nodes_required
        strategy = self._get_effective_strategy(job)

        if strategy == AllocationStrategy.CONTIGUOUS:
            # Take first N available nodes (maintains locality)
            return self.available_nodes[:n]
        elif strategy == AllocationStrategy.RANDOM:
            # Randomly sample N nodes (distributes traffic)
            return random.sample(self.available_nodes, n)
        else:
            raise ValueError(f"Unknown allocation strategy: {strategy}")

    def _get_effective_strategy(self, job):
        """Determine effective strategy, handling HYBRID logic.

        For HYBRID: high communication intensity -> RANDOM, low -> CONTIGUOUS
        """
        if self.allocation_strategy != AllocationStrategy.HYBRID:
            return self.allocation_strategy

        # HYBRID: decide based on job's communication intensity
        intensity = self._compute_communication_intensity(job)
        if intensity >= self.hybrid_threshold:
            return AllocationStrategy.RANDOM
        else:
            return AllocationStrategy.CONTIGUOUS

    def _compute_communication_intensity(self, job):
        """Compute normalized communication intensity for a job.

        Uses network TX/RX traces to estimate how communication-intensive
        the job is. Returns value in [0, 1] range.
        """
        import numpy as np

        # Get network traces
        ntx = getattr(job, 'ntx_trace', None)
        nrx = getattr(job, 'nrx_trace', None)

        # Compute average network activity
        total = 0.0
        count = 0

        for trace in [ntx, nrx]:
            if trace is not None:
                if isinstance(trace, (list, np.ndarray)) and len(trace) > 0:
                    total += np.mean(trace)
                    count += 1
                elif isinstance(trace, (int, float)):
                    total += trace
                    count += 1

        if count == 0:
            # No network data available, default to contiguous (conservative)
            return 0.0

        avg_network = total / count

        # Normalize: this is a simple heuristic, can be tuned
        # Assumes network values are in some reasonable range
        # For now, use a simple sigmoid-like normalization
        # Values > 1.0 are considered "high intensity"
        intensity = min(1.0, avg_network / 1.0) if avg_network > 0 else 0.0

        return intensity

    def free_nodes_from_job(self, job):
        """Frees the full nodes previously allocated to a job."""
        if getattr(job, 'scheduled_nodes', None):
Loading