Commit 961abd35 authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Add co-scheduling prototype and experiment plot

parent dc5a4b62
Loading
Loading
Loading
Loading
+115 −0
Original line number Diff line number Diff line
# Co-Scheduling Session Summary

Date: 2026-02-05
Branch: `coscheduling`

## What We Built

We added a first-pass co-scheduling mechanism to the federation metascheduler. Key elements:

- **Co-scheduling job fields** in `raps/metasched/types.py`:
  - `co_schedule`, `sites_required`, `reserve_ttl_s`, `start_window_s`.
- **Reservation protocol** (2‑phase reserve/commit) between MetaScheduler and site workers:
  - `RESERVE_REQUEST`, `RESERVE_OK`, `RESERVE_DENY`, `RESERVE_COMMIT`, `RESERVE_ABORT`, `RESERVE_EXPIRE`.
- **Policy hooks** in `MetaScheduler` to select sites for co‑scheduling:
  - `rr`, `least_queue`, `most_free`, `window` (retrying reserve).
- **STARTED events** emitted by site workers to observe actual start times per site.

## Experiment Script

A simple experiment script was added to compare co‑scheduling policies and generate a plot:

- Script: `experiments/coscheduling_experiment.py`
- Output PNG: `experiments/coscheduling_wait.png`

The script:
- Spins up 3 simulated sites (Frontier, Aurora, Perlmutter).
- Submits background (single-site) jobs to create load.
- Submits co‑scheduled jobs across 2 or 3 sites.
- Collects `STARTED` events to compute queue wait time.
- Plots CDFs of co‑scheduled wait time per policy.

## How to Run the Experiments

```bash
source /opt/venvs/exadigit/bin/activate
python experiments/coscheduling_experiment.py \
  --num-single 30 \
  --num-cosched 12 \
  --policies rr least_queue most_free window \
  --sites-required 2 3 \
  --output experiments/coscheduling_wait.png
```

Notes:
- The experiment uses multiprocessing; it must run outside the sandboxed environment (permissions prompt required).
- You can increase `--num-single` or `--num-cosched` for more stable statistics.
- The plot output is a PNG suitable for quick inclusion in the paper.

## What the Plot Shows

The plot is a **CDF of co‑scheduled queue wait times**.

Interpretation:
- **X‑axis**: queue wait time (simulation ticks).
- **Y‑axis (CDF)**: fraction of co‑scheduled jobs that started within that wait time.
- **Left‑shifted, steeper curves are better** (shorter waits for more jobs).

## Results and Discussion (Current Run)

### Co‑schedule across 2 sites
- `least_queue` and `most_free` are left‑shifted and steep, meaning most co‑scheduled jobs start sooner with tighter variance.
- `rr` is right‑shifted and flatter, showing longer waits and higher variability.
- `window` helps relative to `rr` but does not beat `least_queue`/`most_free`.

### Co‑schedule across 3 sites
- Ordering is clearer: `rr` is best, then `least_queue`, then `most_free`, then `window`.
- This suggests greedy capacity‑aware selection can backfire when synchronizing more sites; simple spreading (RR) can reduce synchronization bottlenecks.

### Why RR “breaks down” in 2‑site case
- RR is not load‑aware. It often pairs a congested site with a free one, so the job waits for the bottleneck.
- Load‑aware policies avoid that by selecting sites that are simultaneously less loaded.

### Caveats
- This is **one seed** and a modest job count; results are not statistically robust yet.
- The current `window` policy is a basic retry loop and **not** a true start‑time window search.
- Background load is randomized and can bias outcomes.

## Suggested Next Steps

1. **Add real start‑window search** (try multiple simulated time slots) instead of quick retries.
2. **Run multiple seeds** and plot mean + error bars to make results paper‑quality.
3. **Add bar chart** of median + 95th percentile wait alongside the CDF.

## TikZ Co‑Scheduling Schematic (Reserve/Commit + Synchronized Start)

```tex
\begin{tikzpicture}[
    font=\sffamily,
    node/.style={rectangle, rounded corners=3pt, draw=black!60, fill=gray!5, inner sep=4pt},
    arrow/.style={-{Stealth[length=3mm]}, thick, color=black!70},
    timeline/.style={thick, color=black!60}
]
\node (meta) [node] {Meta-Scheduler};
\node (siteA) [node, right=5.2cm of meta] {Site A};
\node (siteB) [node, below=1.2cm of siteA] {Site B};

\draw[timeline] (siteA.east) -- ++(4.5,0);
\draw[timeline] (siteB.east) -- ++(4.5,0);

\draw[arrow] (meta.east) -- node[above, sloped]{RESERVE\_REQ} (siteA.west);
\draw[arrow] (meta.east) -- node[above, sloped]{RESERVE\_REQ} (siteB.west);

\draw[arrow] (siteA.west) -- ++(-1.7,0.3) node[left]{RESERVE\_OK};
\draw[arrow] (siteB.west) -- ++(-1.7,-0.3) node[left]{RESERVE\_OK};

\draw[arrow] (meta.east) -- node[above, sloped]{COMMIT} (siteA.west);
\draw[arrow] (meta.east) -- node[above, sloped]{COMMIT} (siteB.west);

\draw[thick, color=blue!70] ($(siteA.east)+(2.0,0)$) -- ++(0,0.25)
    node[above, color=blue!70]{Start};
\draw[thick, color=blue!70] ($(siteB.east)+(2.0,0)$) -- ++(0,0.25)
    node[above, color=blue!70]{Start};
\draw[thick, color=blue!70, dashed] ($(siteA.east)+(2.0,0)$) -- ($(siteB.east)+(2.0,0)$);
\end{tikzpicture}
```
+203 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
import argparse
import random
import time
from typing import Dict, List

import numpy as np
import matplotlib.pyplot as plt

from raps.metasched.iam import AccessToken, IAMPolicyEngine
from raps.metasched.metascheduler import MetaScheduler
from raps.metasched.types import FedJob


DEFAULT_POLICIES = ["rr", "least_queue", "most_free", "window"]
DEFAULT_SITES_REQUIRED = [2, 3]


def _build_token():
    return AccessToken(
        subject="paper@federation",
        roles={"researcher"},
        scopes={"federation:submit"},
        allowed_sites={"frontier", "aurora", "perlmutter"},
        max_nodes=256,
        max_wall_time_s=7200,
        expiry_epoch_s=int(time.time()) + 3600,
    )


def _sites_config():
    return {
        "frontier": "config/frontier.yaml",
        "aurora": "config/aurora.yaml",
        "perlmutter": "config/perlmutter.yaml",
    }


def _iam_policy():
    return IAMPolicyEngine(
        site_trust_tiers={
            "frontier": 3,
            "aurora": 2,
            "perlmutter": 1,
        }
    )


def _submit_background_jobs(ms: MetaScheduler, token: AccessToken, num_single: int):
    for i in range(num_single):
        nodes = random.choice([32, 64, 96, 128])
        wall = random.choice([1200, 1800, 2400, 3600])
        job = FedJob(
            nodes_required=nodes,
            wall_time_s=wall,
            name=f"bg-{i}",
            meta={
                "account": "federated",
                "required_trust_tier": 1,
            },
        )
        ms.submit(job, token=token)
        time.sleep(0.01)


def _submit_cosched_jobs(
    ms: MetaScheduler,
    token: AccessToken,
    num_cosched: int,
    sites_required: int,
) -> Dict[str, Dict]:
    cosched_jobs: Dict[str, Dict] = {}
    for i in range(num_cosched):
        submit_sim = ms.estimate_sim_time()
        job = FedJob(
            nodes_required=32,
            wall_time_s=1800,
            name=f"cosched-{sites_required}-{i}",
            co_schedule=True,
            sites_required=sites_required,
            reserve_ttl_s=30,
            meta={
                "account": "federated",
                "required_trust_tier": 1,
                "submit_sim_time": submit_sim,
            },
        )
        try:
            ms.submit_coscheduled(job, token=token, max_attempts=3)
            cosched_jobs[job.job_id] = {
                "submit_sim_time": submit_sim,
                "started": {},
                "failed": False,
            }
        except RuntimeError:
            cosched_jobs[job.job_id] = {
                "submit_sim_time": submit_sim,
                "started": {},
                "failed": True,
            }
        time.sleep(0.02)
    return cosched_jobs


def run_trial(policy: str, sites_required: int, num_single: int, num_cosched: int, seed: int) -> List[int]:
    random.seed(seed)
    np.random.seed(seed)

    ms = MetaScheduler(
        _sites_config(),
        iam_policy=_iam_policy(),
        policy=policy,
        cosched_policy=policy,
    )
    token = _build_token()

    ms.start()
    t0 = time.time()
    while time.time() - t0 < 0.5:
        ms.poll_status()
        time.sleep(0.05)

    _submit_background_jobs(ms, token, num_single)
    cosched_jobs = _submit_cosched_jobs(ms, token, num_cosched, sites_required)

    deadline = time.time() + 12.0
    while time.time() < deadline:
        for msg in ms.poll_status():
            if msg.get("event") != "STARTED":
                continue
            job_id = msg.get("job_id")
            if job_id not in cosched_jobs:
                continue
            cosched_jobs[job_id]["started"][msg.get("site")] = msg.get("sim_time", 0)
        done = True
        for rec in cosched_jobs.values():
            if rec.get("failed"):
                continue
            if len(rec.get("started", {})) < sites_required:
                done = False
                break
        if done:
            break
        time.sleep(0.05)

    ms.stop()

    waits = []
    for rec in cosched_jobs.values():
        if rec.get("failed"):
            continue
        if len(rec.get("started", {})) >= sites_required:
            start_time = max(rec["started"].values())
            wait = max(0, int(start_time) - int(rec["submit_sim_time"]))
            waits.append(wait)
    return waits


def plot_cdfs(results: Dict[int, Dict[str, List[int]]], output_path: str):
    fig, axes = plt.subplots(1, len(results), figsize=(11, 4), sharey=True)
    if len(results) == 1:
        axes = [axes]

    for ax, (sites_required, policy_data) in zip(axes, results.items()):
        for policy, waits in policy_data.items():
            if not waits:
                continue
            data = np.sort(np.array(waits))
            y = np.arange(1, len(data) + 1) / len(data)
            ax.plot(data, y, label=policy)
        ax.set_title(f"Co-schedule across {sites_required} sites")
        ax.set_xlabel("Queue wait (sim ticks)")
        ax.grid(True, alpha=0.3)

    axes[0].set_ylabel("CDF")
    axes[-1].legend(frameon=False)
    fig.tight_layout()
    fig.savefig(output_path, dpi=200)


def main():
    parser = argparse.ArgumentParser(description="Co-scheduling policy experiment")
    parser.add_argument("--policies", nargs="+", default=DEFAULT_POLICIES)
    parser.add_argument("--sites-required", nargs="+", type=int, default=DEFAULT_SITES_REQUIRED)
    parser.add_argument("--num-single", type=int, default=30)
    parser.add_argument("--num-cosched", type=int, default=12)
    parser.add_argument("--seed", type=int, default=7)
    parser.add_argument("--output", default="experiments/coscheduling_wait.png")
    args = parser.parse_args()

    results: Dict[int, Dict[str, List[int]]] = {}
    for sites_required in args.sites_required:
        results[sites_required] = {}
        for policy in args.policies:
            waits = run_trial(policy, sites_required, args.num_single, args.num_cosched, args.seed)
            results[sites_required][policy] = waits

    plot_cdfs(results, args.output)
    print(f"wrote {args.output}")


if __name__ == "__main__":
    main()
+145 KiB
Loading image diff...
+179 −14
Original line number Diff line number Diff line
from multiprocessing import Process, Queue
from typing import Dict, List, Optional
import time
import uuid
from .types import FedJob
from .site_worker import site_worker_main
from .iam import AccessToken, IAMPolicyEngine

class MetaScheduler:
    def __init__(self, sites: Dict[str, str], iam_policy: Optional[IAMPolicyEngine] = None):
    def __init__(
        self,
        sites: Dict[str, str],
        iam_policy: Optional[IAMPolicyEngine] = None,
        policy: str = "rr",
        cosched_policy: Optional[str] = None,
    ):
        """
        sites: {site_name: sim_config_path}
        """
@@ -17,6 +25,10 @@ class MetaScheduler:
        self._rr = 0
        self._site_list = list(self.sites.keys())
        self.iam_policy = iam_policy
        self.policy = policy
        self.cosched_policy = cosched_policy or policy
        self._status_backlog: List[Dict] = []
        self.site_metrics: Dict[str, Dict] = {}
        assert len(self._site_list) > 0, "No sites configured"

    def start(self):
@@ -49,21 +61,15 @@ class MetaScheduler:
        Simplest policy: choose the site with the shortest inbound queue length.
        Queue.qsize() is approximate on some platforms, but adequate for a first pass.
        """
        #best_site = None
        #best_q = None
        #for site, q in self.job_queues.items():
        #    qs = q.qsize()
        #    if best_q is None or qs < best_q:
        #        best_q = qs
        #        best_site = site
        #assert best_site is not None
        #return best_site
        candidate_sites = candidates if candidates is not None else self._site_list
        if not candidate_sites:
            raise RuntimeError(f"No candidate sites available for job {job.job_id}")
        if self.policy == "rr":
            site = candidate_sites[self._rr % len(candidate_sites)]
            self._rr += 1
            return site
        ranked = self._rank_sites(candidate_sites, self.policy)
        return ranked[0] if ranked else candidate_sites[0]

    def submit(self, job: FedJob, token: Optional[AccessToken] = None) -> str:
        candidate_sites = self._site_list
@@ -99,10 +105,169 @@ class MetaScheduler:
        self.job_queues[site].put(job.__dict__)
        return site

    def _rank_sites(self, candidates: List[str], policy: str) -> List[str]:
        if policy == "least_queue":
            return sorted(candidates, key=lambda s: self.job_queues[s].qsize())
        if policy == "most_free":
            return sorted(
                candidates,
                key=lambda s: self.site_metrics.get(s, {}).get("free_nodes", 0),
                reverse=True,
            )
        if policy == "random":
            ranked = list(candidates)
            import random
            random.shuffle(ranked)
            return ranked
        return list(candidates)

    def _select_cosched_sites(self, candidates: List[str], count: int, policy: str) -> List[str]:
        if count <= 0:
            raise ValueError("sites_required must be > 0")
        if len(candidates) < count:
            raise RuntimeError(f"Not enough candidate sites for co-scheduling: need {count}, have {len(candidates)}")
        if policy == "rr":
            start = self._rr % len(candidates)
            chosen = []
            for i in range(count):
                chosen.append(candidates[(start + i) % len(candidates)])
            self._rr += count
            return chosen
        ranked = self._rank_sites(candidates, policy)
        return ranked[:count]

    def _drain_status_until(self, deadline_s: float, request_id: str, expected_sites: List[str]) -> Dict[str, Dict]:
        results: Dict[str, Dict] = {}
        while time.time() < deadline_s and len(results) < len(expected_sites):
            timeout = max(0.0, deadline_s - time.time())
            if timeout == 0.0:
                break
            try:
                msg = self.status_q.get(timeout=timeout)
            except Exception:
                break
            self._ingest_status(msg)
            if msg.get("request_id") == request_id and msg.get("site") in expected_sites:
                results[msg["site"]] = msg
            else:
                self._status_backlog.append(msg)
        return results

    def submit_coscheduled(
        self,
        job: FedJob,
        token: Optional[AccessToken] = None,
        max_attempts: int = 3,
        policy: Optional[str] = None,
    ) -> List[str]:
        if not job.co_schedule or job.sites_required <= 1:
            return [self.submit(job, token=token)]

        candidate_sites = self._site_list
        if self.iam_policy is not None:
            eligible = sorted(self.iam_policy.eligible_sites(job, token, self._site_list))
            if not eligible:
                reason = "no authorized site for co-scheduled job"
                self.status_q.put({
                    "event": "IAM_DENY",
                    "job_id": job.job_id,
                    "subject": getattr(token, "subject", "unknown"),
                    "reason": reason,
                })
                raise PermissionError(f"IAM authorization denied: {reason}")
            candidate_sites = eligible

        if len(candidate_sites) < job.sites_required:
            raise RuntimeError(f"Not enough eligible sites for co-scheduling: need {job.sites_required}, have {len(candidate_sites)}")

        chosen_policy = policy or self.cosched_policy
        if chosen_policy == "window":
            max_attempts = max(max_attempts, 6)

        for attempt in range(max_attempts):
            target_sites = self._select_cosched_sites(candidate_sites, job.sites_required, chosen_policy if chosen_policy != "window" else "most_free")
            request_id = str(uuid.uuid4())
            ttl_s = max(5, int(job.reserve_ttl_s))
            deadline_s = time.time() + ttl_s

            for site in target_sites:
                self.job_queues[site].put({
                    "type": "RESERVE_REQUEST",
                    "request_id": request_id,
                    "job": job.__dict__,
                    "ttl_s": ttl_s,
                })

            responses = self._drain_status_until(deadline_s, request_id, target_sites)
            if len(responses) < len(target_sites):
                # Timeout: abort any partial reservations
                for site in target_sites:
                    self.job_queues[site].put({
                        "type": "RESERVE_ABORT",
                        "request_id": request_id,
                        "job_id": job.job_id,
                    })
                if chosen_policy == "window":
                    time.sleep(0.1)
                continue

            denied = [s for s, msg in responses.items() if msg.get("event") != "RESERVE_OK"]
            if denied:
                for site in target_sites:
                    self.job_queues[site].put({
                        "type": "RESERVE_ABORT",
                        "request_id": request_id,
                        "job_id": job.job_id,
                    })
                if chosen_policy == "window":
                    time.sleep(0.1)
                continue

            # Commit all sites
            for site in target_sites:
                self.job_queues[site].put({
                    "type": "RESERVE_COMMIT",
                    "request_id": request_id,
                    "job_id": job.job_id,
                })
            return target_sites

        raise RuntimeError(f"Co-scheduling failed after {max_attempts} attempts for job {job.job_id}")

    def poll_status(self, max_items: int = 100) -> List[Dict]:
        out = []
        while self._status_backlog and len(out) < max_items:
            msg = self._status_backlog.pop(0)
            self._ingest_status(msg)
            out.append(msg)
        for _ in range(max_items):
            if self.status_q.empty():
                break
            out.append(self.status_q.get())
            msg = self.status_q.get()
            self._ingest_status(msg)
            out.append(msg)
        return out

    def _ingest_status(self, msg: Dict):
        site = msg.get("site")
        if not site:
            return
        ev = msg.get("event")
        if ev == "METRICS":
            self.site_metrics[site] = msg
        elif ev == "HEARTBEAT":
            m = self.site_metrics.get(site, {})
            if "sim_time" in msg:
                m["sim_time"] = msg.get("sim_time")
            self.site_metrics[site] = m

    def estimate_sim_time(self) -> int:
        sim_times = []
        for m in self.site_metrics.values():
            if m is None:
                continue
            if "sim_time" in m and m["sim_time"] is not None:
                sim_times.append(int(m["sim_time"]))
        if not sim_times:
            return 0
        return min(sim_times)
+119 −7

File changed.

Preview size limit exceeded, changes collapsed.

Loading