Commit 0dfc13f0 authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Fix some of the issues related to communicating with the invididual machines of the federation

parent 4242b8cc
Loading
Loading
Loading
Loading
+129 −103
Original line number Diff line number Diff line
from dataclasses import asdict
from multiprocessing import Queue
from typing import Any, Dict, Optional

import subprocess
import os
import sys
from pathlib import Path

import time

class DummySim:
    def __init__(self, site_name: str, sim_config_path: str):
        self.site_name = site_name
        self.sim_config_path = sim_config_path
        self._t = 0

    def step(self):
        self._t += 1
        time.sleep(0.01)
def _initialize_raps_sim(site_name: str, sim_config_path: str):
    """
    Initialize a RAPS Engine directly in this process.
    Returns the engine instance for direct manipulation.

def _advance_one_step(sim):
    sim.step()
    sim_config_path can be:
    - A path to a SystemConfig YAML file (e.g., config/frontier.yaml)
    - A built-in system name (e.g., "frontier")
    """
    from raps.sim_config import SingleSimConfig
    from raps.engine import Engine

    # Create a SingleSimConfig that references the system config
    # Using sensible defaults for federation simulation
    sim_config = SingleSimConfig(
        system=sim_config_path,  # Path to SystemConfig or system name
        time="1h",               # Simulate 1 hour by default
        numjobs=1,               # Minimal jobs to initialize (more will be injected)
        noui=True,               # No UI in worker process
        output="none",           # No file output
        workload="random",       # Generates initial random jobs
        policy="fcfs",           # First-come-first-served for dynamic jobs
    )
    engine = Engine(sim_config)
    return engine

def _inject_job(sim, job_dict):
    # For now just accept; later map into RAPS scheduler.
    return

def _initialize_raps_sim(site_name: str, sim_config_path: str):
def _inject_job(engine, job_dict: dict):
    """
    Start RAPS as a subprocess for this site.
    Assumes your CLI supports something like:
      python -m raps ... --config <path>
    Replace cmd with the exact command you run today for that site.
    Convert a FedJob dict to a RAPS Job and add it directly to the queue.
    Adding to engine.queue bypasses the batching mechanism and makes the
    job immediately available for scheduling.
    """
    #return DummySim(site_name, sim_config_path)

    cfg = Path(sim_config_path).resolve()

    # Example placeholder; replace with your actual RAPS command line.
    cmd = [
        sys.executable, "-m", "raps",
        "--config", str(cfg),
    ]

    env = os.environ.copy()
    env["RAPS_SITE"] = site_name

    p = subprocess.Popen(
        cmd,
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
    from raps.job import Job, job_dict as make_job_dict
    import numpy as np

    wall_time = job_dict['wall_time_s']
    trace_quanta = 15  # 15-second samples (matches RAPS convention)
    num_samples = max(1, wall_time // trace_quanta)

    # Generate random realistic traces
    cpu_trace = np.random.uniform(30, 90, num_samples).tolist()
    gpu_trace = np.random.uniform(40, 95, num_samples).tolist()

    # Convert FedJob fields to RAPS Job
    raps_job = Job(make_job_dict(
        nodes_required=job_dict['nodes_required'],
        name=job_dict.get('name', 'fed-job'),
        account=job_dict.get('meta', {}).get('account', 'federated'),
        id=job_dict.get('job_id'),
        submit_time=engine.current_timestep,  # Use simulation time
        expected_run_time=wall_time,
        time_limit=wall_time,
        cpu_trace=cpu_trace,
        gpu_trace=gpu_trace,
        ntx_trace=None,
        nrx_trace=None,
        trace_quanta=trace_quanta,
    ))

    # Add directly to queue for immediate scheduling consideration
    # (Adding to engine.jobs wouldn't work because the simulation loop
    # copies jobs at startup and doesn't see later additions)
    engine.queue.append(raps_job)

    # Trigger the scheduler to consider the new job immediately
    engine.scheduler.schedule(
        engine.queue,
        engine.running,
        engine.current_timestep,
        accounts=engine.accounts,
        sorted=False
    )
    return p

def _advance_one_step(sim):

def _get_sim_time(engine) -> Optional[int]:
    """Return the current simulation timestep."""
    return getattr(engine, 'current_timestep', None)


def _poll_site_metrics(engine, tick: int) -> Optional[Dict[str, Any]]:
    """
    If you run RAPS as a subprocess, you don't "step" it here.
    Instead, you can tail output or just sleep a little.
    Return current site metrics every N ticks.
    Returns None if not time to report yet.
    """
    import time
    time.sleep(0.05)
    # Report metrics every 100 ticks
    if tick % 100 != 0:
        return None

def _shutdown(sim):
    try:
        sim.terminate()
    except Exception:
    return {
        'running_jobs': len(engine.running),
        'queued_jobs': len(engine.queue),
        'active_nodes': engine.num_active_nodes,
        'free_nodes': engine.num_free_nodes,
    }


def _shutdown(engine):
    """Graceful shutdown of the engine (no-op for embedded engine)."""
    pass


def site_worker_main(site_name: str,
                     sim_config_path: str,
                     job_in_q: "Queue[Dict[str, Any]]",
                     status_out_q: "Queue[Dict[str, Any]]",
                     stop_q: "Queue[bool]"):
    """
    One process per site. Owns the RAPS simulator object and all mutable state.
    One process per site. Owns the RAPS Engine and all mutable state.
    Communicates via queues only.
    """
    # --- Import RAPS inside the process to avoid cross-process state leakage
    # You will adjust these imports to your branch structure.
    # Example idea (names may differ in your code):
    # from raps.simulator import Simulator
    # from raps.sim_config import SingleSimConfig
    #
    # cfg = SingleSimConfig.from_yaml(sim_config_path) or similar
    # sim = Simulator(cfg)
    #
    # For now, treat these as placeholders:
    sim = _initialize_raps_sim(site_name, sim_config_path)

    # Optional: prime sim, load traces, etc.
    _sim_prepare(sim)
    # Initialize the RAPS engine directly in this process
    engine = _initialize_raps_sim(site_name, sim_config_path)

    # Create the simulation generator
    sim_gen = engine.run_simulation()

    status_out_q.put({"site": site_name, "event": "READY"})

    tick = 0

    while True:
        # cooperative stop
        # Check for stop signal
        if not stop_q.empty():
            _ = stop_q.get()
            break

        # receive new jobs
        # Receive and inject new jobs
        while not job_in_q.empty():
            job = job_in_q.get()
            _inject_job(sim, job)
            status_out_q.put({"site": site_name, "event": "ENQUEUED", "job_id": job["job_id"]})
            _inject_job(engine, job)
            status_out_q.put({
                "site": site_name,
                "event": "ENQUEUED",
                "job_id": job["job_id"]
            })

        # advance simulation by one tick (or a small quantum)
        # This avoids blocking so metascheduler stays responsive.
        _advance_one_step(sim)
        # Advance simulation by one tick
        try:
            tick_data = next(sim_gen)
            tick += 1
        except StopIteration:
            # Simulation completed
            status_out_q.put({
                "site": site_name,
                "event": "SIMULATION_COMPLETE",
                "tick": tick
            })
            break

        # Periodic heartbeat
        if tick % 200 == 0:
            status_out_q.put({
                "site": site_name,
                "event": "HEARTBEAT",
                "tick": tick,
                "sim_time": _get_sim_time(sim),
                "sim_time": _get_sim_time(engine),
            })

        # optionally publish periodic metrics
        maybe = _poll_site_metrics(sim)
        if maybe is not None:
            status_out_q.put({"site": site_name, "event": "METRICS", **maybe})
        # Periodic metrics
        metrics = _poll_site_metrics(engine, tick)
        if metrics is not None:
            status_out_q.put({
                "site": site_name,
                "event": "METRICS",
                **metrics
            })

    # graceful shutdown
    _shutdown(sim)
    # Graceful shutdown
    _shutdown(engine)
    status_out_q.put({"site": site_name, "event": "STOPPED"})

def _get_sim_time(sim):
    # dummy dict case
    if isinstance(sim, dict):
        return sim.get("ticks", 0)

    # real simulator case (edit to match your class)
    return getattr(sim, "time", None) or getattr(sim, "now", None)

def _sim_prepare(sim):
    return

def _poll_site_metrics(sim) -> Optional[Dict[str, Any]]:
    return None

def _shutdown(sim):
    return