Commit 2f210163 authored by Wes Brewer
Browse files

Stagger OPAL pipeline iterations by Aurora phase completion



Previously all pipeline slots were filled at t=0, causing iterations to
march through Aurora/Frontier/Perlmutter in lockstep with no cross-site
concurrency. Now only one iteration starts immediately; subsequent
iterations are launched when the current one's data-prep job completes,
naturally staggering them so Aurora, Frontier, and Perlmutter are busy
with different iterations simultaneously.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
parent 7917dab7
Loading
Loading
Loading
Loading
+18 −8
Original line number Diff line number Diff line
@@ -100,9 +100,15 @@ def _run_opal_workflow(ms: MetaScheduler, token: AccessToken,
        next_iter += 1
        return handle

    def _maybe_stagger(name: str):
        """Start a follow-on iteration once a data-prep job finishes.

        Triggered from the event loop on every JOB_COMPLETED; only a job whose
        name ends in "-data-prep" (the Aurora phase) launches the next
        iteration, and only while both the pipeline-depth cap and the total
        iteration budget leave room.
        """
        # Ignore completions of non-data-prep jobs.
        if not name.endswith("-data-prep"):
            return
        # Respect the concurrency cap and the overall iteration budget.
        if len(active_handles) >= pipeline_depth or next_iter >= total_iters:
            return
        active_handles.append(_launch_next())

    if noui:
        # Fill initial pipeline
        while len(active_handles) < pipeline_depth and next_iter < total_iters:
        # Seed with one iteration; stagger subsequent ones on data-prep completion
        active_handles.append(_launch_next())

        done_sites = set()
@@ -113,9 +119,11 @@ def _run_opal_workflow(ms: MetaScheduler, token: AccessToken,

                if ev == "JOB_COMPLETED":
                    job_id = msg["job_id"]
                    print(f"  [opal] completed {msg.get('name', job_id)} on {site}")
                    name = msg.get("name", "")
                    print(f"  [opal] completed {name or job_id} on {site}")
                    for handle in active_handles:
                        handle.notify_completed(job_id)
                    _maybe_stagger(name)

                elif ev in ("HEARTBEAT", "METRICS", "IAM_DENY", "ERROR"):
                    print(msg)
@@ -126,7 +134,7 @@ def _run_opal_workflow(ms: MetaScheduler, token: AccessToken,
                if json_out and json_out["mode"] in ("events", "both"):
                    json_out["write"]({"type": "event", "ts": time.time(), "msg": msg})

            # Retire completed handles and launch new iterations
            # Retire completed handles and backfill any remaining slots
            active_handles = [h for h in active_handles if not h.is_complete]
            while len(active_handles) < pipeline_depth and next_iter < total_iters:
                active_handles.append(_launch_next())
@@ -146,8 +154,7 @@ def _run_opal_workflow(ms: MetaScheduler, token: AccessToken,
        dashboard = FederationDashboard(site_names)
        dashboard.update_layout()

        # Fill initial pipeline
        while len(active_handles) < pipeline_depth and next_iter < total_iters:
        # Seed with one iteration; stagger subsequent ones on data-prep completion
        active_handles.append(_launch_next())

        with Live(dashboard.layout, refresh_per_second=4, screen=True):
@@ -156,12 +163,15 @@ def _run_opal_workflow(ms: MetaScheduler, token: AccessToken,
                    ev = msg.get("event")
                    if ev == "JOB_COMPLETED":
                        job_id = msg["job_id"]
                        name = msg.get("name", "")
                        for handle in active_handles:
                            handle.notify_completed(job_id)
                        _maybe_stagger(name)
                    dashboard.process_event(msg)
                    if json_out and json_out["mode"] in ("events", "both"):
                        json_out["write"]({"type": "event", "ts": time.time(), "msg": msg})

                # Retire completed handles and backfill any remaining slots
                active_handles = [h for h in active_handles if not h.is_complete]
                while len(active_handles) < pipeline_depth and next_iter < total_iters:
                    active_handles.append(_launch_next())