Loading raps/metasched/site_worker.py +41 −25 Original line number Diff line number Diff line Loading @@ -19,9 +19,6 @@ class DummySim: self._t += 1 time.sleep(0.01) def _initialize_raps_sim(site_name: str, sim_config_path: str): return DummySim(site_name, sim_config_path) def _advance_one_step(sim): sim.step() Loading @@ -36,28 +33,28 @@ def _initialize_raps_sim(site_name: str, sim_config_path: str): python -m raps ... --config <path> Replace cmd with the exact command you run today for that site. """ return DummySim(site_name, sim_config_path) #return DummySim(site_name, sim_config_path) #cfg = Path(sim_config_path).resolve() # # # Example placeholder; replace with your actual RAPS command line. # cmd = [ # sys.executable, "-m", "raps", # "--config", str(cfg), # ] # # env = os.environ.copy() # env["RAPS_SITE"] = site_name # # p = subprocess.Popen( # cmd, # env=env, # stdout=subprocess.PIPE, # stderr=subprocess.STDOUT, # text=True, # bufsize=1, # ) # return p cfg = Path(sim_config_path).resolve() # Example placeholder; replace with your actual RAPS command line. cmd = [ sys.executable, "-m", "raps", "--config", str(cfg), ] env = os.environ.copy() env["RAPS_SITE"] = site_name p = subprocess.Popen( cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, ) return p def _advance_one_step(sim): """ Loading Loading @@ -99,6 +96,8 @@ def site_worker_main(site_name: str, status_out_q.put({"site": site_name, "event": "READY"}) tick = 0 while True: # cooperative stop if not stop_q.empty(): Loading @@ -114,6 +113,15 @@ def site_worker_main(site_name: str, # advance simulation by one tick (or a small quantum) # This avoids blocking so metascheduler stays responsive. _advance_one_step(sim) tick += 1 if tick % 200 == 0: status_out_q.put({ "site": site_name, "event": "HEARTBEAT", "tick": tick, "sim_time": _get_sim_time(sim), }) # optionally publish periodic metrics maybe = _poll_site_metrics(sim) Loading @@ -124,6 +132,14 @@ def site_worker_main(site_name: str, _shutdown(sim) status_out_q.put({"site": site_name, "event": "STOPPED"}) def _get_sim_time(sim): # dummy dict case if isinstance(sim, dict): return sim.get("ticks", 0) # real simulator case (edit to match your class) return getattr(sim, "time", None) or getattr(sim, "now", None) def _sim_prepare(sim): return Loading scripts/run_federation.py +23 −6 Original line number Diff line number Diff line #!/usr/bin/env python3 import time from raps.metasched.metascheduler import MetaScheduler from raps.metasched.types import AmSCJob def main(): sites = { "frontier": "configs/frontier/sim.yaml", Loading @@ -10,23 +13,37 @@ def main(): } ms = MetaScheduler(sites) # Start all site workers; MetaScheduler.start() blocks until all are READY ms.start() # Example: submit a few jobs # Submit a few jobs (arrival stream) for i in range(10): job = AmSCJob(nodes_required=64, wall_time_s=3600, name=f"amsc-{i}") job = AmSCJob(nodes_required=64, wall_time_s=3600, name=f"fed-{i}") site = ms.submit(job) print(f"submitted {job.job_id} -> {site}") time.sleep(0.5) time.sleep(0.2) # Stream status for a bit # Listen for events for a while. # This is necessary; a single ms.poll_status() call often misses heartbeats. listen_seconds = 10 t0 = time.time() while time.time() - t0 < 10: while time.time() - t0 < listen_seconds: for msg in ms.poll_status(): ev = msg.get("event") # Print the signals that prove each site is alive / progressing if ev in ("HEARTBEAT", "METRICS", "ERROR"): print(msg) # Optional: show enqueue events too (can be noisy) # elif ev == "ENQUEUED": # print(msg) time.sleep(0.2) ms.stop() if __name__ == "__main__": main() Loading
raps/metasched/site_worker.py +41 −25 Original line number Diff line number Diff line Loading @@ -19,9 +19,6 @@ class DummySim: self._t += 1 time.sleep(0.01) def _initialize_raps_sim(site_name: str, sim_config_path: str): return DummySim(site_name, sim_config_path) def _advance_one_step(sim): sim.step() Loading @@ -36,28 +33,28 @@ def _initialize_raps_sim(site_name: str, sim_config_path: str): python -m raps ... --config <path> Replace cmd with the exact command you run today for that site. """ return DummySim(site_name, sim_config_path) #return DummySim(site_name, sim_config_path) #cfg = Path(sim_config_path).resolve() # # # Example placeholder; replace with your actual RAPS command line. # cmd = [ # sys.executable, "-m", "raps", # "--config", str(cfg), # ] # # env = os.environ.copy() # env["RAPS_SITE"] = site_name # # p = subprocess.Popen( # cmd, # env=env, # stdout=subprocess.PIPE, # stderr=subprocess.STDOUT, # text=True, # bufsize=1, # ) # return p cfg = Path(sim_config_path).resolve() # Example placeholder; replace with your actual RAPS command line. cmd = [ sys.executable, "-m", "raps", "--config", str(cfg), ] env = os.environ.copy() env["RAPS_SITE"] = site_name p = subprocess.Popen( cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, ) return p def _advance_one_step(sim): """ Loading Loading @@ -99,6 +96,8 @@ def site_worker_main(site_name: str, status_out_q.put({"site": site_name, "event": "READY"}) tick = 0 while True: # cooperative stop if not stop_q.empty(): Loading @@ -114,6 +113,15 @@ def site_worker_main(site_name: str, # advance simulation by one tick (or a small quantum) # This avoids blocking so metascheduler stays responsive. _advance_one_step(sim) tick += 1 if tick % 200 == 0: status_out_q.put({ "site": site_name, "event": "HEARTBEAT", "tick": tick, "sim_time": _get_sim_time(sim), }) # optionally publish periodic metrics maybe = _poll_site_metrics(sim) Loading @@ -124,6 +132,14 @@ def site_worker_main(site_name: str, _shutdown(sim) status_out_q.put({"site": site_name, "event": "STOPPED"}) def _get_sim_time(sim): # dummy dict case if isinstance(sim, dict): return sim.get("ticks", 0) # real simulator case (edit to match your class) return getattr(sim, "time", None) or getattr(sim, "now", None) def _sim_prepare(sim): return Loading
scripts/run_federation.py +23 −6 Original line number Diff line number Diff line #!/usr/bin/env python3 import time from raps.metasched.metascheduler import MetaScheduler from raps.metasched.types import AmSCJob def main(): sites = { "frontier": "configs/frontier/sim.yaml", Loading @@ -10,23 +13,37 @@ def main(): } ms = MetaScheduler(sites) # Start all site workers; MetaScheduler.start() blocks until all are READY ms.start() # Example: submit a few jobs # Submit a few jobs (arrival stream) for i in range(10): job = AmSCJob(nodes_required=64, wall_time_s=3600, name=f"amsc-{i}") job = AmSCJob(nodes_required=64, wall_time_s=3600, name=f"fed-{i}") site = ms.submit(job) print(f"submitted {job.job_id} -> {site}") time.sleep(0.5) time.sleep(0.2) # Stream status for a bit # Listen for events for a while. # This is necessary; a single ms.poll_status() call often misses heartbeats. listen_seconds = 10 t0 = time.time() while time.time() - t0 < 10: while time.time() - t0 < listen_seconds: for msg in ms.poll_status(): ev = msg.get("event") # Print the signals that prove each site is alive / progressing if ev in ("HEARTBEAT", "METRICS", "ERROR"): print(msg) # Optional: show enqueue events too (can be noisy) # elif ev == "ENQUEUED": # print(msg) time.sleep(0.2) ms.stop() if __name__ == "__main__": main()