From 25ce7f35eb68caca0cdff52a527185fcd99aa7a3 Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Fri, 12 Sep 2025 17:29:36 -0400 Subject: [PATCH 1/6] Add length check --- raps/sim_config.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/raps/sim_config.py b/raps/sim_config.py index 867dac0..c27a2ab 100644 --- a/raps/sim_config.py +++ b/raps/sim_config.py @@ -4,6 +4,7 @@ from pathlib import Path from functools import cached_property from datetime import timedelta from typing import Literal, Annotated as A +from annotated_types import Len import importlib from raps.schedulers.default import PolicyType, BackfillType from raps.utils import ( @@ -435,7 +436,7 @@ class SingleSimConfig(SimConfig, abc.ABC): class MultiPartSimConfig(SimConfig): - partitions: list[SystemConfig | str] + partitions: A[list[SystemConfig | str], Len(min_length=1)] """ List of multiple systems/partitions to run. Can be names of preconfigured systems, or paths to custom SystemConfig yaml files. -- GitLab From 449e480fde3fdca6f0a0d51afcd298967534b83b Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Fri, 12 Sep 2025 15:21:40 -0400 Subject: [PATCH 2/6] remove job param from run_simulation --- raps/engine.py | 15 ++++++++------- raps/envs/raps_env.py | 2 +- raps/multi_part_engine.py | 12 ++++-------- raps/run_sim.py | 5 +---- raps/ui.py | 5 ++--- tests/util.py | 6 ++---- 6 files changed, 18 insertions(+), 27 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index 8d7e231..c652336 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -130,7 +130,7 @@ class Engine: flops_manager: FLOPSManager, telemetry: Telemetry, cooling_model: ThermoFluidsModel | None = None, - jobs=None, + jobs: list[Job], total_initial_jobs=0, # Workload class to generate from for continuous generation continuous_workload: Workload | None = None, @@ -153,6 +153,7 @@ class Engine: self.job_history_dict = [] self.jobs_completed = 0 self.jobs_killed = 0 + self.jobs = jobs self.total_initial_jobs = total_initial_jobs self.current_timestep = 0 self.cooling_model = cooling_model @@ -713,7 +714,7 @@ class Engine: self.scheduler.policy = target_policy self.scheduler.bfpolicy = target_bfpolicy - def run_simulation(self, jobs, timestep_start, timestep_end, time_delta=1, autoshutdown=False): + def run_simulation(self, timestep_start, timestep_end, time_delta=1, autoshutdown=False): """Generator that yields after each simulation tick.""" if self.scheduler.policy == PolicyType.REPLAY: @@ -722,16 +723,16 @@ class Engine: replay = False if self.debug: - print(f"[DEBUG] run_simulation: Initial jobs count: {len(jobs)}") - if jobs: + print(f"[DEBUG] run_simulation: Initial jobs count: {len(self.jobs)}") + if self.jobs: print("[DEBUG] run_simulation: First job submit_time: " - f"{jobs[0].submit_time}, start_time: {jobs[0].start_time}") + f"{self.jobs[0].submit_time}, start_time: {self.jobs[0].start_time}") # Set times and place jobs that are currently running, onto the system. - self.prepare_system_state(all_jobs=jobs, timestep_start=timestep_start, timestep_end=timestep_end) + self.prepare_system_state(all_jobs=self.jobs, timestep_start=timestep_start, timestep_end=timestep_end) # Process jobs in batches for better performance of timestep loop - all_jobs = jobs.copy() + all_jobs = self.jobs.copy() submit_times = [j.submit_time for j in all_jobs] cursor = 0 diff --git a/raps/envs/raps_env.py b/raps/envs/raps_env.py index ec9fcad..bb435d8 100644 --- a/raps/envs/raps_env.py +++ b/raps/envs/raps_env.py @@ -64,7 +64,7 @@ class RAPSEnv(gym.Env): self.jobs = workload_data.jobs timestep_start = workload_data.telemetry_start timestep_end = workload_data.telemetry_end - self.generator = engine.run_simulation(self.jobs, timestep_start, timestep_end, time_delta) + self.generator = engine.run_simulation(timestep_start, timestep_end, time_delta) return engine def reset(self, **kwargs): diff --git a/raps/multi_part_engine.py b/raps/multi_part_engine.py index 57e3e27..7d2d8f2 100644 --- a/raps/multi_part_engine.py +++ b/raps/multi_part_engine.py @@ -5,10 +5,9 @@ from raps.utils import WorkloadData class MultiPartEngine: - def __init__(self, engines: dict[str, Engine], jobs: dict[str, list]): + def __init__(self, *, engines: dict[str, Engine]): self.partition_names = sorted(engines.keys()) self.engines = engines - self.jobs = jobs @staticmethod def from_sim_config(sim_config: MultiPartSimConfig): @@ -38,19 +37,16 @@ class MultiPartEngine: for engine in engines.values(): engine.total_initial_jobs = total_initial_jobs - multi_engine = MultiPartEngine( - engines=engines, - jobs={p: w.jobs for p, w in workloads_by_partition.items()}, - ) + multi_engine = MultiPartEngine(engines=engines) return multi_engine, workloads_by_partition, timestep_start, timestep_end, time_delta - def run_simulation(self, jobs: dict, timestep_start, timestep_end, time_delta=1 + def run_simulation(self, timestep_start, timestep_end, time_delta=1 ) -> Iterable[dict[str, TickData | None]]: generators = [] for part in self.partition_names: generators.append(self.engines[part].run_simulation( - jobs[part], timestep_start, timestep_end, time_delta, + timestep_start, timestep_end, time_delta, )) for tick_datas in zip(*generators, strict=True): yield dict(zip(self.partition_names, tick_datas)) diff --git a/raps/run_sim.py b/raps/run_sim.py index 74ea87a..df9d44b 100644 --- a/raps/run_sim.py +++ b/raps/run_sim.py @@ -77,7 +77,6 @@ def run_sim(sim_config: SingleSimConfig): args_dict=sim_config.get_legacy_args_dict(), **sim_config.system_configs[0].get_legacy(), ) layout_manager.run( - jobs, timestep_start=timestep_start, timestep_end=timestep_end, time_delta=time_delta, ) @@ -237,10 +236,8 @@ def run_parts_sim(sim_config: MultiPartSimConfig): ) (out / 'sim_config.yaml').write_text(sim_config.dump_yaml()) - jobs = {p: w.jobs for p, w in workload_results.items()} - ui_update_freq = sim_config.system_configs[0].scheduler.ui_update_freq - gen = multi_engine.run_simulation(jobs, timestep_start, timestep_end, time_delta) + gen = multi_engine.run_simulation(timestep_start, timestep_end, time_delta) for tick_datas in gen: sys_power = 0 diff --git a/raps/ui.py b/raps/ui.py index 3965935..b9a4694 100644 --- a/raps/ui.py +++ b/raps/ui.py @@ -556,7 +556,7 @@ class LayoutManager: data.system_util, uncertainties=uncertainties, ) - def run(self, jobs, timestep_start, timestep_end, time_delta): + def run(self, timestep_start, timestep_end, time_delta): """ Runs the UI, blocking until the simulation is complete """ if not self.debug and not self.noui: context = Live(self.layout, auto_refresh=True, refresh_per_second=3) @@ -565,8 +565,7 @@ class LayoutManager: try: with context: # last_i = 0 - for i, data in enumerate(self.engine.run_simulation(jobs, - timestep_start, + for i, data in enumerate(self.engine.run_simulation(timestep_start, timestep_end, time_delta, autoshutdown=True)): diff --git a/tests/util.py b/tests/util.py index b5ba495..b9e8005 100644 --- a/tests/util.py +++ b/tests/util.py @@ -61,10 +61,9 @@ def run_engine(sim_config, include_ticks=False) -> tuple[Engine, dict[str, Any]] sim_config = SingleSimConfig.model_validate(sim_config) engine, workload_data, time_delta = Engine.from_sim_config(sim_config) - jobs = workload_data.jobs timestep_start = workload_data.telemetry_start timestep_end = workload_data.telemetry_end - gen = engine.run_simulation(jobs, timestep_start, timestep_end, time_delta) + gen = engine.run_simulation(timestep_start, timestep_end, time_delta) stats = { "tick_count": 0, @@ -93,8 +92,7 @@ def run_multi_part_engine(sim_config, include_ticks=False) -> tuple[MultiPartEng sim_config = MultiPartSimConfig.model_validate(sim_config) multi_engine, workload_results, timestep_start, timestep_end, time_delta = \ MultiPartEngine.from_sim_config(sim_config) - jobs = {p: w.jobs for p, w in workload_results.items()} - gen = multi_engine.run_simulation(jobs, timestep_start, timestep_end, time_delta) + gen = multi_engine.run_simulation(timestep_start, timestep_end, time_delta) stats = { "tick_count": 0, -- GitLab From c7fd5fc0984af39ffe1aec3b1c81b0669aae3716 Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Fri, 12 Sep 2025 15:42:08 -0400 Subject: [PATCH 3/6] remove timestep params from run_simulation --- raps/cooling.py | 4 ++-- raps/engine.py | 27 ++++++++++++++++++++++----- raps/envs/raps_env.py | 4 +--- raps/multi_part_engine.py | 7 ++----- raps/run_sim.py | 6 ++---- raps/ui.py | 11 +++++------ tests/util.py | 6 ++---- 7 files changed, 36 insertions(+), 29 deletions(-) diff --git a/raps/cooling.py b/raps/cooling.py index 066c12c..68a8a30 100644 --- a/raps/cooling.py +++ b/raps/cooling.py @@ -155,8 +155,8 @@ class ThermoFluidsModel: # If replay mode is on and weather data is available if self.weather and self.weather.has_coords: # Convert total seconds to timedelta object - delta = timedelta(seconds=engine.current_timestep) - target_datetime = self.weather.start + delta + delta = timedelta(seconds=engine.current_timestep - engine.timestep_start) + target_datetime = engine.start + delta # Get temperature from weather data temperature = self.weather.get_temperature(target_datetime) or self.config['WET_BULB_TEMP'] diff --git a/raps/engine.py b/raps/engine.py index c652336..7a12040 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -10,6 +10,7 @@ import os import select import time import random +from datetime import datetime from raps.job import Job, JobState from raps.policy import PolicyType from raps.utils import ( @@ -126,6 +127,10 @@ class Engine: """Job scheduling simulation engine.""" def __init__(self, *, + start: datetime, + timestep_start: int, + timestep_end: int, + time_delta: int, power_manager: PowerManager, flops_manager: FLOPSManager, telemetry: Telemetry, @@ -138,6 +143,12 @@ class Engine: sim_config: SimConfig, system_config: SystemConfig, ): + self.start = start + self.timestep_start = timestep_start + self.timestep_end = timestep_end + self.time_delta = time_delta + self.sim_config = sim_config + self.system_config = system_config self.config = system_config.get_legacy() self.down_nodes = summarize_ranges(self.config['DOWN_NODES']) self.resource_manager = ResourceManager( @@ -321,6 +332,10 @@ class Engine: accounts = job_accounts engine = Engine( + start=start, + timestep_start=wd.telemetry_start, + timestep_end=wd.telemetry_end, + time_delta=time_delta, power_manager=power_manager, flops_manager=flops_manager, cooling_model=cooling_model, @@ -714,7 +729,7 @@ class Engine: self.scheduler.policy = target_policy self.scheduler.bfpolicy = target_bfpolicy - def run_simulation(self, timestep_start, timestep_end, time_delta=1, autoshutdown=False): + def run_simulation(self, autoshutdown=False): """Generator that yields after each simulation tick.""" if self.scheduler.policy == PolicyType.REPLAY: @@ -729,7 +744,9 @@ class Engine: f"{self.jobs[0].submit_time}, start_time: {self.jobs[0].start_time}") # Set times and place jobs that are currently running, onto the system. - self.prepare_system_state(all_jobs=self.jobs, timestep_start=timestep_start, timestep_end=timestep_end) + self.prepare_system_state(all_jobs=self.jobs, + timestep_start=self.timestep_start, timestep_end=self.timestep_end, + ) # Process jobs in batches for better performance of timestep loop all_jobs = self.jobs.copy() @@ -738,9 +755,9 @@ class Engine: jobs = [] # Batch Jobs into 6h windows based on submit_time or twice the time_delta if larger - batch_window = max(60 * 60 * 6, 2 * time_delta) # at least 6h + batch_window = max(60 * 60 * 6, 2 * self.time_delta) # at least 6h - sim_state = SimulationState(time_delta) + sim_state = SimulationState(self.time_delta) # listener_thread = threading.Thread(target=keyboard_listener, args=(sim_state,), daemon=True) # listener_thread.start() @@ -752,7 +769,7 @@ class Engine: current_time_delta = sim_state.get_time_delta() - if (self.current_timestep % batch_window == 0) or (self.current_timestep == timestep_start): + if (self.current_timestep % batch_window == 0) or (self.current_timestep == self.timestep_start): # Add jobs that are within the batching window and remove them from all jobs # jobs += [job for job in all_jobs if job.submit_time <= self.current_timestep + batch_window] # all_jobs[:] = [job for job in all_jobs if job.submit_time > self.current_timestep + batch_window] diff --git a/raps/envs/raps_env.py b/raps/envs/raps_env.py index bb435d8..aa2e945 100644 --- a/raps/envs/raps_env.py +++ b/raps/envs/raps_env.py @@ -62,9 +62,7 @@ class RAPSEnv(gym.Env): engine, workload_data, time_delta = Engine.from_sim_config(self.sim_config) engine.scheduler.env = self self.jobs = workload_data.jobs - timestep_start = workload_data.telemetry_start - timestep_end = workload_data.telemetry_end - self.generator = engine.run_simulation(timestep_start, timestep_end, time_delta) + self.generator = engine.run_simulation() return engine def reset(self, **kwargs): diff --git a/raps/multi_part_engine.py b/raps/multi_part_engine.py index 7d2d8f2..15521c8 100644 --- a/raps/multi_part_engine.py +++ b/raps/multi_part_engine.py @@ -41,13 +41,10 @@ class MultiPartEngine: return multi_engine, workloads_by_partition, timestep_start, timestep_end, time_delta - def run_simulation(self, timestep_start, timestep_end, time_delta=1 - ) -> Iterable[dict[str, TickData | None]]: + def run_simulation(self) -> Iterable[dict[str, TickData | None]]: generators = [] for part in self.partition_names: - generators.append(self.engines[part].run_simulation( - timestep_start, timestep_end, time_delta, - )) + generators.append(self.engines[part].run_simulation()) for tick_datas in zip(*generators, strict=True): yield dict(zip(self.partition_names, tick_datas)) diff --git a/raps/run_sim.py b/raps/run_sim.py index df9d44b..0d95ca0 100644 --- a/raps/run_sim.py +++ b/raps/run_sim.py @@ -76,9 +76,7 @@ def run_sim(sim_config: SingleSimConfig): debug=sim_config.debug, total_timesteps=total_timesteps, args_dict=sim_config.get_legacy_args_dict(), **sim_config.system_configs[0].get_legacy(), ) - layout_manager.run( - timestep_start=timestep_start, timestep_end=timestep_end, time_delta=time_delta, - ) + layout_manager.run() engine_stats = get_engine_stats(engine) job_stats = get_job_stats(engine) @@ -237,7 +235,7 @@ def run_parts_sim(sim_config: MultiPartSimConfig): (out / 'sim_config.yaml').write_text(sim_config.dump_yaml()) ui_update_freq = sim_config.system_configs[0].scheduler.ui_update_freq - gen = multi_engine.run_simulation(timestep_start, timestep_end, time_delta) + gen = multi_engine.run_simulation() for tick_datas in gen: sys_power = 0 diff --git a/raps/ui.py b/raps/ui.py index b9a4694..6330bc9 100644 --- a/raps/ui.py +++ b/raps/ui.py @@ -556,7 +556,7 @@ class LayoutManager: data.system_util, uncertainties=uncertainties, ) - def run(self, timestep_start, timestep_end, time_delta): + def run(self): """ Runs the UI, blocking until the simulation is complete """ if not self.debug and not self.noui: context = Live(self.layout, auto_refresh=True, refresh_per_second=3) @@ -565,12 +565,11 @@ class LayoutManager: try: with context: # last_i = 0 - for i, data in enumerate(self.engine.run_simulation(timestep_start, - timestep_end, - time_delta, - autoshutdown=True)): + for i, data in enumerate(self.engine.run_simulation(autoshutdown=True)): if data and (not self.debug and not self.noui): - self.update_full_layout(data, time_delta, timestep_start=timestep_start) + self.update_full_layout(data, + self.engine.time_delta, + timestep_start=self.engine.timestep_start) # self.update_progress_bar(i-last_i) # last_i=i if not self.debug and not self.noui: diff --git a/tests/util.py b/tests/util.py index b9e8005..59e3179 100644 --- a/tests/util.py +++ b/tests/util.py @@ -61,9 +61,7 @@ def run_engine(sim_config, include_ticks=False) -> tuple[Engine, dict[str, Any]] sim_config = SingleSimConfig.model_validate(sim_config) engine, workload_data, time_delta = Engine.from_sim_config(sim_config) - timestep_start = workload_data.telemetry_start - timestep_end = workload_data.telemetry_end - gen = engine.run_simulation(timestep_start, timestep_end, time_delta) + gen = engine.run_simulation() stats = { "tick_count": 0, @@ -92,7 +90,7 @@ def run_multi_part_engine(sim_config, include_ticks=False) -> tuple[MultiPartEng sim_config = MultiPartSimConfig.model_validate(sim_config) multi_engine, workload_results, timestep_start, timestep_end, time_delta = \ MultiPartEngine.from_sim_config(sim_config) - gen = multi_engine.run_simulation(timestep_start, timestep_end, time_delta) + gen = multi_engine.run_simulation() stats = { "tick_count": 0, -- GitLab From 7227bd26ba4c67b3c04f678c3ed17e7c1ad9fbaa Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Mon, 15 Sep 2025 13:37:08 -0400 Subject: [PATCH 4/6] Simplify from_sim_config return --- raps/engine.py | 13 ++++++++++++- raps/envs/raps_env.py | 4 ++-- raps/multi_part_engine.py | 25 +++++++++---------------- raps/run_sim.py | 15 +++++++-------- 4 files changed, 30 insertions(+), 27 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index 7a12040..a67fc55 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -16,6 +16,7 @@ from raps.policy import PolicyType from raps.utils import ( summarize_ranges, get_current_utilization, + WorkloadData, ) from raps.resmgr import ResourceManager from raps.schedulers import load_scheduler @@ -275,6 +276,9 @@ class Engine: jobs = wd.jobs if len(jobs) == 0: print(f"Warning no jobs found for {partition or 'system'}") + if partition and len(sim_config.system_configs) > 1: + for job in jobs: # TODO: I don't think Job.partition is actually used anywhere? + job.partition = partition if sim_config.start: start = sim_config.start @@ -347,7 +351,14 @@ class Engine: system_config=system_config, ) - return engine, wd, time_delta + return engine + + def get_workload_data(self) -> WorkloadData: + return WorkloadData( + jobs=self.jobs[:], + telemetry_start=self.timestep_start, telemetry_end=self.timestep_end, + start_date=self.start, + ) def add_running_jobs_to_queue(self, jobs_to_submit: List): """ diff --git a/raps/envs/raps_env.py b/raps/envs/raps_env.py index aa2e945..a5b6146 100644 --- a/raps/envs/raps_env.py +++ b/raps/envs/raps_env.py @@ -59,9 +59,9 @@ class RAPSEnv(gym.Env): self.action_space = spaces.Discrete(max_jobs) def _create_engine(self): - engine, workload_data, time_delta = Engine.from_sim_config(self.sim_config) + engine = Engine.from_sim_config(self.sim_config) engine.scheduler.env = self - self.jobs = workload_data.jobs + self.jobs = engine.jobs self.generator = engine.run_simulation() return engine diff --git a/raps/multi_part_engine.py b/raps/multi_part_engine.py index 15521c8..a5d62bf 100644 --- a/raps/multi_part_engine.py +++ b/raps/multi_part_engine.py @@ -1,13 +1,16 @@ from collections.abc import Iterable from raps.engine import Engine, TickData from raps.sim_config import MultiPartSimConfig -from raps.utils import WorkloadData class MultiPartEngine: def __init__(self, *, engines: dict[str, Engine]): self.partition_names = sorted(engines.keys()) self.engines = engines + first_engine = list(engines.values())[0] + self.start = first_engine.start + self.timestep_start = first_engine.timestep_start + self.timestep_end = first_engine.timestep_end @staticmethod def from_sim_config(sim_config: MultiPartSimConfig): @@ -17,29 +20,19 @@ class MultiPartEngine: if len(root_systems) > 1: raise ValueError("Replay for multi-system runs is not supported") - workloads_by_partition: dict[str, WorkloadData] = {} engines: dict[str, Engine] = {} - time_delta = 0 for partition in sim_config.system_configs: - name = partition.system_name - engine, workload_data, time_delta = Engine.from_sim_config( - sim_config, partition=name, - ) - for job in workload_data.jobs: - job.partition = name - workloads_by_partition[name] = workload_data - engines[name] = engine - timestep_start = min(w.telemetry_start for w in workloads_by_partition.values()) - timestep_end = min(w.telemetry_end for w in workloads_by_partition.values()) - - total_initial_jobs = sum(len(j.jobs) for j in workloads_by_partition.values()) + engine = Engine.from_sim_config(sim_config, partition=partition.system_name) + engines[partition.system_name] = engine + + total_initial_jobs = sum(len(e.jobs) for e in engines.values()) for engine in engines.values(): engine.total_initial_jobs = total_initial_jobs multi_engine = MultiPartEngine(engines=engines) - return multi_engine, workloads_by_partition, timestep_start, timestep_end, time_delta + return multi_engine def run_simulation(self) -> Iterable[dict[str, TickData | None]]: generators = [] diff --git a/raps/run_sim.py b/raps/run_sim.py index 0d95ca0..7a09f60 100644 --- a/raps/run_sim.py +++ b/raps/run_sim.py @@ -49,27 +49,27 @@ def run_sim(sim_config: SingleSimConfig): print("Use run-parts to run multi-partition simulations") sys.exit(1) - engine, workload_data, time_delta = Engine.from_sim_config(sim_config) + engine = Engine.from_sim_config(sim_config) out = sim_config.get_output() if out: out.mkdir(parents=True) engine.telemetry.save_snapshot( dest=str(out / 'snapshot.npz'), - result=workload_data, + result=engine.get_workload_data(), args=sim_config, ) (out / 'sim_config.yaml').write_text(sim_config.dump_yaml()) - jobs = workload_data.jobs - timestep_start, timestep_end = workload_data.telemetry_start, workload_data.telemetry_end + jobs = engine.jobs + timestep_start, timestep_end = engine.timestep_start, engine.timestep_end total_timesteps = timestep_end - timestep_start downscale = sim_config.downscale downscale_str = ""if downscale == 1 else f"/{downscale}" print(f"Simulating {len(jobs)} jobs for {total_timesteps}{downscale_str}" f" seconds from {timestep_start} to {timestep_end}.") - print(f"Simulation time delta: {time_delta}{downscale_str} s," + print(f"Simulation time delta: {engine.time_delta}{downscale_str} s," f"Telemetry trace quanta: {jobs[0].trace_quanta}{downscale_str} s.") layout_manager = LayoutManager( sim_config.layout, engine=engine, @@ -220,8 +220,7 @@ def run_parts_sim(sim_config: MultiPartSimConfig): UserWarning ) - multi_engine, workload_results, timestep_start, timestep_end, time_delta = \ - MultiPartEngine.from_sim_config(sim_config) + multi_engine = MultiPartEngine.from_sim_config(sim_config) out = sim_config.get_output() if out: @@ -229,7 +228,7 @@ def run_parts_sim(sim_config: MultiPartSimConfig): for part, engine in multi_engine.engines.items(): engine.telemetry.save_snapshot( dest=str(out / part.split('/')[-1]), - result=workload_results[part], + result=engine.get_workload_data(), args=sim_config, ) (out / 'sim_config.yaml').write_text(sim_config.dump_yaml()) -- GitLab From 11fb9fe87003b2719fdd825279c37e6aa2a4ff96 Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Mon, 15 Sep 2025 14:05:05 -0400 Subject: [PATCH 5/6] Combine from_sim_config and init --- raps/engine.py | 206 ++++++++++++++++---------------------- raps/envs/raps_env.py | 2 +- raps/multi_part_engine.py | 22 ++-- raps/run_sim.py | 4 +- tests/util.py | 5 +- 5 files changed, 99 insertions(+), 140 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index a67fc55..4c125c7 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -127,105 +127,7 @@ def keyboard_listener(state): class Engine: """Job scheduling simulation engine.""" - def __init__(self, *, - start: datetime, - timestep_start: int, - timestep_end: int, - time_delta: int, - power_manager: PowerManager, - flops_manager: FLOPSManager, - telemetry: Telemetry, - cooling_model: ThermoFluidsModel | None = None, - jobs: list[Job], - total_initial_jobs=0, - # Workload class to generate from for continuous generation - continuous_workload: Workload | None = None, - accounts=None, - sim_config: SimConfig, - system_config: SystemConfig, - ): - self.start = start - self.timestep_start = timestep_start - self.timestep_end = timestep_end - self.time_delta = time_delta - self.sim_config = sim_config - self.system_config = system_config - self.config = system_config.get_legacy() - self.down_nodes = summarize_ranges(self.config['DOWN_NODES']) - self.resource_manager = ResourceManager( - total_nodes=self.config['TOTAL_NODES'], - down_nodes=self.config['DOWN_NODES'], - config=self.config - ) - # Initialize running and queue, etc. - self.running = [] - self.queue = [] - self.accounts = accounts - self.telemetry = telemetry - self.job_history_dict = [] - self.jobs_completed = 0 - self.jobs_killed = 0 - self.jobs = jobs - self.total_initial_jobs = total_initial_jobs - self.current_timestep = 0 - self.cooling_model = cooling_model - self.sys_power = 0 - self.power_manager = power_manager - self.flops_manager = flops_manager - self.debug = sim_config.debug - self.continuous_workload = continuous_workload - self.replay = sim_config.replay - self.downscale = sim_config.downscale # Factor to downscale the 1s timesteps (power of 10) - self.simulate_network = sim_config.simulate_network - self.sys_util_history = [] - self.scheduler_queue_history = [] - self.scheduler_running_history = [] - self.avg_net_tx = [] - self.avg_net_rx = [] - self.net_util_history = [] - self.avg_slowdown_history = [] - self.max_slowdown_history = [] - self.node_occupancy_history = [] - self.downtime = Downtime(first_downtime=sim_config.downtime_first_int, - downtime_interval=sim_config.downtime_interval_int, - downtime_length=sim_config.downtime_length_int, - debug=sim_config.debug, - ) - - # Set scheduler type - either based on config or command-line args - defaults to 'default' - if self.config['multitenant']: - scheduler_type = 'multitenant' - else: - scheduler_type = sim_config.scheduler - - policy_type = sim_config.policy - backfill_type = sim_config.backfill - - self.scheduler = load_scheduler(scheduler_type)( - config=self.config, - policy=policy_type, - bfpolicy=backfill_type, - resource_manager=self.resource_manager, - jobs=jobs - ) - if sim_config.live: - assert self.scheduler.policy != PolicyType.REPLAY, \ - "Cannot replay from a live system. Choose a scheduling policy!" - print(f"Using scheduler: {str(self.scheduler.__class__).split('.')[2]}" - f", with policy {self.scheduler.policy} " - f"and backfill {self.scheduler.bfpolicy}") - - if self.simulate_network: - available_nodes = self.resource_manager.available_nodes - self.network_model = NetworkModel( - available_nodes=available_nodes, - config=self.config, - ) - else: - self.network_model = None - - @staticmethod - def from_sim_config(sim_config: SimConfig, partition: str | None = None): + def __init__(self, sim_config: SimConfig, partition: str | None = None): if partition: system_config = sim_config.get_system_config_by_name(partition) elif len(sim_config.system_configs) > 1: @@ -247,12 +149,12 @@ class Engine: np.random.seed(sim_config.seed + 1) if sim_config.live and not sim_config.replay: - td = Telemetry(**sim_config_dict) - wd = td.load_from_live_system() + telemetry = Telemetry(**sim_config_dict) + wd = telemetry.load_from_live_system() elif sim_config.replay: # TODO: this will have issues if running separate systems or custom systems partition_short = partition.split("/")[-1] if partition else None - td = Telemetry( + telemetry = Telemetry( **sim_config_dict, partition=partition, ) @@ -267,17 +169,17 @@ class Engine: else: replay_files = sim_config.replay - wd = td.load_from_files(replay_files) + wd = telemetry.load_from_files(replay_files) else: # Synthetic jobs wl = Workload(sim_config_args, system_config_dict) wd = wl.generate_jobs() - td = Telemetry(**sim_config_dict) + telemetry = Telemetry(**sim_config_dict) jobs = wd.jobs if len(jobs) == 0: print(f"Warning no jobs found for {partition or 'system'}") if partition and len(sim_config.system_configs) > 1: - for job in jobs: # TODO: I don't think Job.partition is actually used anywhere? + for job in jobs: job.partition = partition if sim_config.start: @@ -335,23 +237,87 @@ class Engine: else: accounts = job_accounts - engine = Engine( - start=start, - timestep_start=wd.telemetry_start, - timestep_end=wd.telemetry_end, - time_delta=time_delta, - power_manager=power_manager, - flops_manager=flops_manager, - cooling_model=cooling_model, - continuous_workload=continuous_workload, - jobs=jobs, - accounts=accounts, - telemetry=td, - sim_config=sim_config, - system_config=system_config, + self.sim_config = sim_config + self.system_config = system_config + self.config = system_config.get_legacy() + + self.start = start + self.timestep_start = wd.telemetry_start + self.timestep_end = wd.telemetry_end + self.time_delta = time_delta + + self.down_nodes = summarize_ranges(self.config['DOWN_NODES']) + self.resource_manager = ResourceManager( + total_nodes=self.config['TOTAL_NODES'], + down_nodes=self.config['DOWN_NODES'], + config=self.config ) + # Initialize running and queue, etc. + self.running = [] + self.queue = [] + self.accounts = accounts + self.telemetry = telemetry + self.job_history_dict = [] + self.jobs_completed = 0 + self.jobs_killed = 0 + self.jobs = jobs + self.total_initial_jobs = len(jobs) + self.current_timestep = 0 + self.cooling_model = cooling_model + self.sys_power = 0 + self.power_manager = power_manager + self.flops_manager = flops_manager + self.debug = sim_config.debug + self.continuous_workload = continuous_workload + self.replay = sim_config.replay + self.downscale = sim_config.downscale # Factor to downscale the 1s timesteps (power of 10) + self.simulate_network = sim_config.simulate_network + self.sys_util_history = [] + self.scheduler_queue_history = [] + self.scheduler_running_history = [] + self.avg_net_tx = [] + self.avg_net_rx = [] + self.net_util_history = [] + self.avg_slowdown_history = [] + self.max_slowdown_history = [] + self.node_occupancy_history = [] + self.downtime = Downtime(first_downtime=sim_config.downtime_first_int, + downtime_interval=sim_config.downtime_interval_int, + downtime_length=sim_config.downtime_length_int, + debug=sim_config.debug, + ) + + # Set scheduler type - either based on config or command-line args - defaults to 'default' + if self.config['multitenant']: + scheduler_type = 'multitenant' + else: + scheduler_type = sim_config.scheduler + + policy_type = sim_config.policy + backfill_type = sim_config.backfill + + self.scheduler = load_scheduler(scheduler_type)( + config=self.config, + policy=policy_type, + bfpolicy=backfill_type, + resource_manager=self.resource_manager, + jobs=jobs + ) + if sim_config.live: + assert self.scheduler.policy != PolicyType.REPLAY, \ + "Cannot replay from a live system. Choose a scheduling policy!" + print(f"Using scheduler: {str(self.scheduler.__class__).split('.')[2]}" + f", with policy {self.scheduler.policy} " + f"and backfill {self.scheduler.bfpolicy}") - return engine + if self.simulate_network: + available_nodes = self.resource_manager.available_nodes + self.network_model = NetworkModel( + available_nodes=available_nodes, + config=self.config, + ) + else: + self.network_model = None def get_workload_data(self) -> WorkloadData: return WorkloadData( diff --git a/raps/envs/raps_env.py b/raps/envs/raps_env.py index a5b6146..e4ca8eb 100644 --- a/raps/envs/raps_env.py +++ b/raps/envs/raps_env.py @@ -59,7 +59,7 @@ class RAPSEnv(gym.Env): self.action_space = spaces.Discrete(max_jobs) def _create_engine(self): - engine = Engine.from_sim_config(self.sim_config) + engine = Engine(self.sim_config) engine.scheduler.env = self self.jobs = engine.jobs self.generator = engine.run_simulation() diff --git a/raps/multi_part_engine.py b/raps/multi_part_engine.py index a5d62bf..6332aa5 100644 --- a/raps/multi_part_engine.py +++ b/raps/multi_part_engine.py @@ -4,16 +4,7 @@ from raps.sim_config import MultiPartSimConfig class MultiPartEngine: - def __init__(self, *, engines: dict[str, Engine]): - self.partition_names = sorted(engines.keys()) - self.engines = engines - first_engine = list(engines.values())[0] - self.start = first_engine.start - self.timestep_start = first_engine.timestep_start - self.timestep_end = first_engine.timestep_end - - @staticmethod - def from_sim_config(sim_config: MultiPartSimConfig): + def __init__(self, sim_config: MultiPartSimConfig): if sim_config.replay: root_systems = set(s.system_name.split("/")[0] for s in sim_config.system_configs) # TODO should consider how to pass separate replay values for separate systems @@ -23,16 +14,19 @@ class MultiPartEngine: engines: dict[str, Engine] = {} for partition in sim_config.system_configs: - engine = Engine.from_sim_config(sim_config, partition=partition.system_name) + engine = Engine(sim_config, partition=partition.system_name) engines[partition.system_name] = engine total_initial_jobs = sum(len(e.jobs) for e in engines.values()) for engine in engines.values(): engine.total_initial_jobs = total_initial_jobs - multi_engine = MultiPartEngine(engines=engines) - - return multi_engine + self.partition_names = sorted(engines.keys()) + self.engines = engines + first_engine = list(engines.values())[0] + self.start = first_engine.start + self.timestep_start = first_engine.timestep_start + self.timestep_end = first_engine.timestep_end def run_simulation(self) -> Iterable[dict[str, TickData | None]]: generators = [] diff --git a/raps/run_sim.py b/raps/run_sim.py index 7a09f60..51bf6f5 100644 --- a/raps/run_sim.py +++ b/raps/run_sim.py @@ -49,7 +49,7 @@ def run_sim(sim_config: SingleSimConfig): print("Use run-parts to run multi-partition simulations") sys.exit(1) - engine = Engine.from_sim_config(sim_config) + engine = Engine(sim_config) out = sim_config.get_output() if out: @@ -220,7 +220,7 @@ def run_parts_sim(sim_config: MultiPartSimConfig): UserWarning ) - multi_engine = MultiPartEngine.from_sim_config(sim_config) + multi_engine = MultiPartEngine(sim_config) out = sim_config.get_output() if out: diff --git a/tests/util.py b/tests/util.py index 59e3179..4bbf8f8 100644 --- a/tests/util.py +++ b/tests/util.py @@ -60,7 +60,7 @@ def run_engine(sim_config, include_ticks=False) -> tuple[Engine, dict[str, Any]] print(f"Command to reproduce run:\n {_get_cmd(sim_config, "run")}") sim_config = SingleSimConfig.model_validate(sim_config) - engine, workload_data, time_delta = Engine.from_sim_config(sim_config) + engine = Engine(sim_config) gen = engine.run_simulation() stats = { @@ -88,8 +88,7 @@ def run_multi_part_engine(sim_config, include_ticks=False) -> tuple[MultiPartEng print(f"Command to reproduce run:\n {_get_cmd(sim_config, "run-parts")}") sim_config = MultiPartSimConfig.model_validate(sim_config) - multi_engine, workload_results, timestep_start, timestep_end, time_delta = \ - MultiPartEngine.from_sim_config(sim_config) + multi_engine = MultiPartEngine(sim_config) gen = multi_engine.run_simulation() stats = { -- GitLab From 3896f23bfab3c6bcdca3a25f3163a91b29c520df Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Mon, 15 Sep 2025 15:33:26 -0400 Subject: [PATCH 6/6] Formatting --- raps/engine.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index 4c125c7..ec02adb 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -10,7 +10,6 @@ import os import select import time import random -from datetime import datetime from raps.job import Job, JobState from raps.policy import PolicyType from raps.utils import ( @@ -41,7 +40,6 @@ from raps.account import Accounts from raps.downtime import Downtime from raps.weather import Weather from raps.sim_config import SimConfig -from raps.system_config import SystemConfig from bisect import bisect_right -- GitLab