Commit 7917dab7 authored by Wes Brewer
Browse files

Add OPAL agentic workflow for AMSC federation simulation



Models the OPAL Experiment Steering Vision across Aurora/Frontier/Perlmutter:
- Aurora: instrument driver, S3M data streaming, data preparation
- Frontier: ViT segmentation inference, plant analysis
- Perlmutter: data lakehouse ingest, provenance, optional ViT training
- Dynamic routing: LLM interpretation job goes to site with most free nodes

Key additions:
- FedJob: target_site and depends_on fields for DAG-based submission
- MetaScheduler: submit_workflow() + WorkflowHandle for dependency release
- site_worker: emits JOB_COMPLETED events; respects gpu_fraction in meta
- raps/workloads/opal.py: per-iteration DAG factory (8-9 jobs/iteration)
- raps/fed_config.py: OpalConfig, WorkloadConfig
- run_fed.py: _run_opal_workflow() with pipelined iteration and dashboard support
- experiments/opal.yaml: 6 iterations, pipeline_depth=2, training every 3rd iter

Run: raps run-fed experiments/opal.yaml

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
parent 9a17ba81
Loading
Loading
Loading
Loading

experiments/opal.yaml

0 → 100644
+39 −0
Original line number Diff line number Diff line
# OPAL agentic workflow across AMSC (Aurora, Frontier, Perlmutter)
#
# Models the OPAL Experiment Steering Vision:
#   Aurora      — instrument driver, S3M data streaming, data preparation
#   Frontier    — ViT segmentation inference, plant analysis
#   Perlmutter  — data lakehouse ingest, provenance, optional ViT training
#
# Run:  raps run-fed experiments/opal.yaml

# Per-site RAPS system configuration files (paths relative to this file).
sites:
  frontier: ../config/frontier.yaml
  aurora: ../config/aurora.yaml
  perlmutter: ../config/perlmutter.yaml

# Total simulated wall-clock time for the federation run.
sim_time: 24h

# IAM trust tiers per site; higher presumably grants broader access —
# NOTE(review): tier semantics are defined in the IAM policy engine, confirm there.
iam:
  site_trust_tiers:
    frontier: 3
    aurora: 2
    perlmutter: 2

# Workload generator selection and OPAL-specific knobs (see OpalConfig).
workload:
  type: opal
  opal:
    num_iterations: 6
    pipeline_depth: 2         # up to 2 iterations overlapping across sites
    training_every_n: 3       # fire ViT training on Perlmutter every 3rd iteration
    training_nodes: 32
    vit_infer_nodes: 8
    data_prep_nodes: 4

# Access token presented by the workflow submitter (see TokenConfig).
token:
  subject: opal@federation
  roles: [researcher]
  scopes: [federation:submit]
  allowed_sites: [frontier, aurora, perlmutter]
  max_nodes: 512
  max_wall_time_s: 28800      # 8 h (covers ViT training)
+15 −0
Original line number Diff line number Diff line
@@ -31,6 +31,20 @@ class TokenConfig(RAPSBaseModel):
    max_wall_time_s: int = 7200


class OpalConfig(RAPSBaseModel):
    """Knobs for the OPAL agentic workflow (per-iteration job DAG sizing/cadence)."""
    num_iterations: int = 5       # total experiment iterations to run
    pipeline_depth: int = 2       # max iterations in-flight simultaneously
    training_every_n: int = 3     # fire a ViT training job every N iterations (0 = never)
    training_nodes: int = 32      # node count for each ViT training job
    vit_infer_nodes: int = 8      # node count for ViT segmentation inference jobs
    data_prep_nodes: int = 4      # node count for data-preparation jobs


class WorkloadConfig(RAPSBaseModel):
    """Selects the federated workload generator and carries its options."""
    type: str = "random"          # "random" | "opal"
    opal: OpalConfig = Field(default_factory=OpalConfig)  # used only when type == "opal"


class DemoConfig(RAPSBaseModel):
    submit_interval: float = 0.2
    dispatch_interval: float = 1.0
@@ -48,6 +62,7 @@ class FederationConfig(RAPSBaseModel):
    submit: SubmitConfig = Field(default_factory=SubmitConfig)
    token: TokenConfig = Field(default_factory=TokenConfig)
    demo: DemoConfig = Field(default_factory=DemoConfig)
    workload: WorkloadConfig = Field(default_factory=WorkloadConfig)
    noui: bool = False
    listen_seconds: int = 60
    output_json: Path | None = None
+70 −2
Original line number Diff line number Diff line
from collections import defaultdict
from dataclasses import dataclass, field
from multiprocessing import Process, Queue
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional, Set
from .types import FedJob
from .site_worker import site_worker_main
from .iam import AccessToken, IAMPolicyEngine


@dataclass
class WorkflowHandle:
    """Tracks a job DAG: which jobs are waiting, in flight, or finished.

    Created by MetaScheduler.submit_workflow(); the caller feeds it
    JOB_COMPLETED events via notify_completed(), which releases any
    dependents whose prerequisites are now all satisfied.
    """
    _pending: Dict[str, FedJob]            # job_id -> job not yet submitted
    _dependents: Dict[str, List[FedJob]]   # job_id -> jobs waiting on it
    _submitted: Set[str]                   # job_ids sent to a site
    _completed: Set[str]                   # job_ids that reported JOB_COMPLETED
    _ms: Any                               # MetaScheduler ref
    _token: Any

    def notify_completed(self, job_id: str) -> List[str]:
        """Record a JOB_COMPLETED event and release any jobs it unblocks.

        Returns the IDs of jobs newly submitted as a result. Events for
        job_ids this workflow never submitted are ignored.
        """
        if job_id not in self._submitted:
            # Not one of ours (or a duplicate before submission): ignore.
            return []
        self._completed.add(job_id)
        released: List[str] = []
        for waiter in self._dependents.get(job_id, []):
            if waiter.job_id in self._submitted:
                # Already released by an earlier completion event.
                continue
            unmet = [d for d in waiter.depends_on if d not in self._completed]
            if unmet:
                continue
            self._ms.submit(waiter, token=self._token)
            self._submitted.add(waiter.job_id)
            self._pending.pop(waiter.job_id, None)
            released.append(waiter.job_id)
        return released

    @property
    def is_complete(self) -> bool:
        """True once nothing is pending and every submitted job completed."""
        return not self._pending and self._submitted == self._completed

    @property
    def pending_count(self) -> int:
        """Number of jobs still waiting on unmet dependencies."""
        return len(self._pending)

class MetaScheduler:
    def __init__(self, sites: Dict[str, str], iam_policy: Optional[IAMPolicyEngine] = None,
                 sim_time: str = "24h"):
@@ -71,8 +108,10 @@ class MetaScheduler:

    def submit(self, job: FedJob, token: Optional[AccessToken] = None) -> str:
        candidate_sites = self._site_list
        if job.target_site is not None:
            candidate_sites = [job.target_site]
        if self.iam_policy is not None:
            eligible = sorted(self.iam_policy.eligible_sites(job, token, self._site_list))
            eligible = sorted(self.iam_policy.eligible_sites(job, token, candidate_sites))
            if not eligible:
                reason = "no authorized site for job"
                self.status_q.put({
@@ -110,3 +149,32 @@ class MetaScheduler:
                break
            out.append(self.status_q.get())
        return out

    def submit_workflow(self, jobs: List[FedJob], token: Optional[AccessToken] = None) -> WorkflowHandle:
        """
        Submit a DAG of FedJobs respecting depends_on dependencies.

        Root jobs (no dependencies) are submitted immediately; every other
        job is held in the returned WorkflowHandle and released by
        WorkflowHandle.notify_completed() as its prerequisites finish.
        """
        # Invert the dependency edges: upstream job_id -> jobs blocked on it.
        waiting_on: Dict[str, List[FedJob]] = defaultdict(list)
        for job in jobs:
            for upstream in job.depends_on:
                waiting_on[upstream].append(job)

        # Kick off every root job right away.
        released: Set[str] = set()
        for job in jobs:
            if not job.depends_on:
                self.submit(job, token=token)
                released.add(job.job_id)

        # Everything not yet submitted stays pending, in original order.
        blocked = {job.job_id: job for job in jobs if job.job_id not in released}

        return WorkflowHandle(
            _pending=blocked,
            _dependents=dict(waiting_on),
            _submitted=released,
            _completed=set(),
            _ms=self,
            _token=token,
        )
+15 −1
Original line number Diff line number Diff line
@@ -53,6 +53,10 @@ def _inject_job(engine, job_dict: dict):
    # Generate traces consistent with internal random workloads:
    # utilizations are in [0, CPUS_PER_NODE] / [0, GPUS_PER_NODE]
    cpu_util = np.random.random() * engine.config.get('CPUS_PER_NODE', 1)
    gpu_fraction = job_dict.get('meta', {}).get('gpu_fraction', None)
    if gpu_fraction is not None:
        gpu_util = float(gpu_fraction) * engine.config.get('GPUS_PER_NODE', 1)
    else:
        gpu_util = np.random.random() * engine.config.get('GPUS_PER_NODE', 1)
    cpu_trace = (cpu_util * np.ones(num_samples)).tolist()
    gpu_trace = (gpu_util * np.ones(num_samples)).tolist()
@@ -250,6 +254,16 @@ def site_worker_main(site_name: str,
                "sim_time": _get_sim_time(engine),
            })

        # Emit JOB_COMPLETED events for jobs that finished this tick
        for job in tick_data.completed:
            status_out_q.put({
                "site": site_name,
                "event": "JOB_COMPLETED",
                "job_id": str(job.id),
                "name": str(job.name),
                "tick": tick,
            })

        # Periodic metrics
        metrics = _poll_site_metrics(engine, tick, tick_data)
        if metrics is not None:
+3 −1
Original line number Diff line number Diff line
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional
import time
import uuid

@@ -11,3 +11,5 @@ class FedJob:
    name: str = "job"
    meta: Dict[str, Any] = field(default_factory=dict)
    job_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    target_site: Optional[str] = None          # pin to a specific site; None = metasched chooses
    depends_on: List[str] = field(default_factory=list)  # job_ids that must complete first
Loading