Commit 4d1a0f6f authored by Brewer, Wes
Browse files

Initial federated computing model with metascheduler

parent 82305a99
Loading
Loading
Loading
Loading
+0 −0

Empty file added.

+76 −0
Original line number Diff line number Diff line
import queue
from multiprocessing import Process, Queue
from typing import Dict, List

from .site_worker import site_worker_main
from .types import AmSCJob

class MetaScheduler:
    """
    Dispatch jobs to per-site worker processes.

    One worker process (running ``site_worker_main``) is started per
    configured site. Each site has its own job queue and stop queue; all
    workers report back on a single shared status queue.
    """

    # Seconds to block on the status queue between worker liveness checks
    # while start() waits for READY.
    _READY_POLL_S = 0.5

    # Seconds stop() waits for a worker to exit before terminating it.
    _JOIN_TIMEOUT_S = 5

    def __init__(self, sites: Dict[str, str]):
        """
        sites: {site_name: sim_config_path}

        Raises ValueError if no sites are configured.
        """
        # Explicit raise instead of assert: asserts are stripped under
        # ``python -O``, which would let an empty config through.
        if not sites:
            raise ValueError("No sites configured")
        self.sites = sites
        self.job_queues: Dict[str, Queue] = {s: Queue() for s in sites}
        self.stop_queues: Dict[str, Queue] = {s: Queue() for s in sites}
        self.status_q: Queue = Queue()
        self.procs: Dict[str, Process] = {}
        # Round-robin cursor and the site order it indexes into.
        self._rr = 0
        self._site_list = list(self.sites.keys())
        # Status messages consumed by start() that were not READY; handed
        # back to the caller by poll_status() so nothing is lost.
        self._backlog: List[Dict] = []

    def start(self):
        """
        Start one worker process per site and block until each reports READY.

        Non-READY status messages that arrive during the wait are kept and
        returned by the next poll_status() call.

        Raises RuntimeError if a worker exits before reporting READY; the
        remaining workers are asked to stop first.
        """
        for site, cfg_path in self.sites.items():
            p = Process(
                target=site_worker_main,
                args=(site, cfg_path, self.job_queues[site], self.status_q, self.stop_queues[site]),
                daemon=True,
            )
            p.start()
            self.procs[site] = p

        # Wait for READY from each site (simple barrier). The get() uses a
        # timeout so a worker that crashed during start-up is noticed instead
        # of blocking this process forever.
        ready = set()
        while len(ready) < len(self.sites):
            try:
                msg = self.status_q.get(timeout=self._READY_POLL_S)
            except queue.Empty:
                dead = sorted(
                    site for site, p in self.procs.items()
                    if site not in ready and not p.is_alive()
                )
                if dead:
                    self.stop()
                    raise RuntimeError(f"site worker(s) exited before READY: {dead}")
                continue
            if msg.get("event") == "READY":
                print(f"READY: {msg['site']}")
                ready.add(msg["site"])
            else:
                self._backlog.append(msg)

    def stop(self):
        """
        Ask every worker to stop, then wait for each to exit.

        A worker still alive after the join timeout is terminated as a last
        resort. Safe to call when no workers were started.
        """
        for site in self.sites:
            self.stop_queues[site].put(True)
        for p in self.procs.values():
            p.join(timeout=self._JOIN_TIMEOUT_S)
            if p.is_alive():
                # Last resort: terminating a process that is using a shared
                # queue can leave that queue unusable, so this is only done
                # after the cooperative stop has timed out.
                p.terminate()
                p.join(timeout=self._JOIN_TIMEOUT_S)

    # ---- policies ----
    def choose_site(self, job: "AmSCJob") -> str:
        """
        Simplest policy: round-robin over the configured sites, in the order
        they appear in ``sites``. ``job`` is currently not consulted.
        """
        site = self._site_list[self._rr % len(self._site_list)]
        self._rr += 1
        return site

    def submit(self, job: "AmSCJob") -> str:
        """
        Pick a site for ``job``, enqueue the job's fields as a dict on that
        site's job queue, and return the chosen site name.

        Raises RuntimeError if choose_site() returns None.
        """
        site = self.choose_site(job)
        if site is None:
            raise RuntimeError(f"choose_site() returned None; _site_list={getattr(self,'_site_list',None)}")
        # Enqueue a snapshot rather than the live attribute dict: the queue
        # pickles the object later in a background thread, so attributes
        # reassigned on ``job`` right after submit() could otherwise leak in.
        # (Shallow copy: nested containers are still shared until pickled.)
        self.job_queues[site].put(dict(vars(job)))
        return site

    def poll_status(self, max_items: int = 100) -> List[Dict]:
        """
        Return up to ``max_items`` pending status messages without blocking.

        Messages held back by start() are returned first, followed by
        whatever is currently available on the status queue.
        """
        if max_items <= 0:
            return []
        out = self._backlog[:max_items]
        del self._backlog[:max_items]
        while len(out) < max_items:
            try:
                out.append(self.status_q.get_nowait())
            except queue.Empty:
                break
        return out
+134 −0
Original line number Diff line number Diff line
from dataclasses import asdict
from multiprocessing import Queue
from typing import Any, Dict, Optional

import subprocess
import os
import sys
from pathlib import Path

import time

class DummySim:
    """
    Stand-in simulator object: remembers the site name and config path and
    counts how many times step() has been called.
    """

    def __init__(self, site_name: str, sim_config_path: str):
        self.site_name = site_name
        self.sim_config_path = sim_config_path
        # Number of step() calls so far.
        self._t = 0

    def step(self):
        """Bump the tick counter by one, then sleep for 10 ms."""
        self._t = self._t + 1
        time.sleep(0.01)

def _initialize_raps_sim(site_name: str, sim_config_path: str):
    """
    Create the simulator handle for one site.

    For now this returns a DummySim placeholder.

    TODO: start RAPS as a subprocess for this site instead, e.g.
      python -m raps ... --config <path>
    (replace with the exact command used today for that site), resolving
    ``sim_config_path`` to an absolute path, passing the site name through
    the RAPS_SITE environment variable, and capturing line-buffered text
    output with stderr merged into stdout.
    """
    return DummySim(site_name, sim_config_path)

def _advance_one_step(sim):
    """
    Yield for one worker-loop quantum by sleeping 50 ms.

    ``sim`` is not touched here: if RAPS runs as a subprocess there is
    nothing to "step", so the worker can tail output or just sleep a little.
    """
    time.sleep(0.05)

def _inject_job(sim, job_dict):
    """
    Hand a submitted job to the simulator.

    For now just accept; later map into RAPS scheduler.
    """
    return None

def site_worker_main(site_name: str,
                     sim_config_path: str,
                     job_in_q: "Queue[Dict[str, Any]]",
                     status_out_q: "Queue[Dict[str, Any]]",
                     stop_q: "Queue[bool]"):
    """
    One process per site. Owns the RAPS simulator object and all mutable state.
    Communicates via queues only.

    Every message put on ``status_out_q`` is a dict with "site" and "event"
    keys. Events emitted here:
      READY    - once, after the simulator is initialized and prepared
      ENQUEUED - per job taken from ``job_in_q`` (adds "job_id")
      METRICS  - whenever _poll_site_metrics() returns a dict (merged in)
      ERROR    - if the worker fails with an exception (adds "error")
      STOPPED  - after a clean shutdown requested through ``stop_q``

    Any exception is reported as ERROR and then re-raised; the simulator is
    shut down on every exit path.
    """
    # --- Import RAPS inside the process to avoid cross-process state leakage
    # You will adjust these imports to your branch structure.
    # Example idea (names may differ in your code):
    # from raps.simulator import Simulator
    # from raps.sim_config import SingleSimConfig
    #
    # cfg = SingleSimConfig.from_yaml(sim_config_path) or similar
    # sim = Simulator(cfg)
    #
    # For now, treat these as placeholders:
    sim = None
    try:
        sim = _initialize_raps_sim(site_name, sim_config_path)

        # Optional: prime sim, load traces, etc.
        _sim_prepare(sim)

        status_out_q.put({"site": site_name, "event": "READY"})

        while True:
            # cooperative stop
            if not stop_q.empty():
                stop_q.get()
                break

            # receive new jobs
            while not job_in_q.empty():
                job = job_in_q.get()
                _inject_job(sim, job)
                # .get() so one malformed job dict cannot take down the
                # whole site worker; a missing id is reported as None.
                status_out_q.put({"site": site_name, "event": "ENQUEUED", "job_id": job.get("job_id")})

            # advance simulation by one tick (or a small quantum)
            # This avoids blocking so metascheduler stays responsive.
            _advance_one_step(sim)

            # optionally publish periodic metrics
            metrics = _poll_site_metrics(sim)
            if metrics is not None:
                status_out_q.put({"site": site_name, "event": "METRICS", **metrics})
    except Exception as exc:
        # Tell the parent why this site went away instead of dying silently.
        status_out_q.put({"site": site_name, "event": "ERROR", "error": repr(exc)})
        raise
    finally:
        # Runs on clean stop and on failure, so the simulator is never left
        # behind. Skipped only when initialization itself failed.
        if sim is not None:
            _shutdown(sim)

    # graceful shutdown completed
    status_out_q.put({"site": site_name, "event": "STOPPED"})

def _sim_prepare(sim):
    """Hook for one-time simulator preparation; currently does nothing."""
    return None

def _poll_site_metrics(sim) -> Optional[Dict[str, Any]]:
    """
    Hook for collecting site metrics from the simulator.

    Returns a dict of metrics to publish, or None when there is nothing to
    report. Currently always returns None.
    """
    metrics: Optional[Dict[str, Any]] = None
    return metrics

def _shutdown(sim):
    """Hook for releasing the simulator at worker exit; currently does nothing."""
    return None
+13 −0
Original line number Diff line number Diff line
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
import time
import uuid

def _current_epoch_s() -> int:
    """Return the current wall-clock time as whole seconds since the epoch."""
    return int(time.time())

def _new_job_id() -> str:
    """Return a fresh random UUID (version 4) in string form."""
    return str(uuid.uuid4())

@dataclass
class AmSCJob:
    """
    Description of one job handed to the metascheduler.

    ``nodes_required`` and ``wall_time_s`` must be supplied; every other
    field has a default computed per instance.
    """
    nodes_required: int
    wall_time_s: int
    # Defaults to the wall-clock time at construction, truncated to an int.
    submit_time_s: int = field(default_factory=_current_epoch_s)
    name: str = "job"
    # Free-form extra data; each instance gets its own empty dict.
    meta: Dict[str, Any] = field(default_factory=dict)
    # Defaults to a newly generated UUID4 string.
    job_id: str = field(default_factory=_new_job_id)
+32 −0
Original line number Diff line number Diff line
import time
from raps.metasched.metascheduler import MetaScheduler
from raps.metasched.types import AmSCJob

def main():
    """
    Demo driver: start a metascheduler over three sites, submit ten jobs
    half a second apart, print status messages for ten seconds, then stop.

    The workers are stopped even if the demo is interrupted or fails.
    """
    sites = {
        "frontier": "configs/frontier/sim.yaml",
        "aurora": "configs/aurora/sim.yaml",
        "perlmutter": "configs/perlmutter/sim.yaml",
    }

    ms = MetaScheduler(sites)
    try:
        ms.start()

        # Example: submit a few jobs
        for i in range(10):
            job = AmSCJob(nodes_required=64, wall_time_s=3600, name=f"amsc-{i}")
            site = ms.submit(job)
            print(f"submitted {job.job_id} -> {site}")
            time.sleep(0.5)

        # Stream status for a bit. monotonic() rather than time(): the loop
        # measures elapsed time and must not be affected by clock adjustments.
        deadline = time.monotonic() + 10
        while time.monotonic() < deadline:
            for msg in ms.poll_status():
                print(msg)
            time.sleep(0.2)
    finally:
        # Always ask the workers to stop, including on Ctrl-C or an error.
        ms.stop()

if __name__ == "__main__":
    main()