diff --git a/raps/cooling.py b/raps/cooling.py index 066c12c3318a847f19a98f3a0031decc10be90b1..68a8a30a86a72a30c639661c16620f2c20fa32b1 100644 --- a/raps/cooling.py +++ b/raps/cooling.py @@ -155,8 +155,8 @@ class ThermoFluidsModel: # If replay mode is on and weather data is available if self.weather and self.weather.has_coords: # Convert total seconds to timedelta object - delta = timedelta(seconds=engine.current_timestep) - target_datetime = self.weather.start + delta + delta = timedelta(seconds=engine.current_timestep - engine.timestep_start) + target_datetime = engine.start + delta # Get temperature from weather data temperature = self.weather.get_temperature(target_datetime) or self.config['WET_BULB_TEMP'] diff --git a/raps/engine.py b/raps/engine.py index 8d7e2310b4708daf464ae3250824bb4ca48fcefb..ec02adbf2ff6f7089efc8f3546d109c86283d0ea 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -15,6 +15,7 @@ from raps.policy import PolicyType from raps.utils import ( summarize_ranges, get_current_utilization, + WorkloadData, ) from raps.resmgr import ResourceManager from raps.schedulers import load_scheduler @@ -39,7 +40,6 @@ from raps.account import Accounts from raps.downtime import Downtime from raps.weather import Weather from raps.sim_config import SimConfig -from raps.system_config import SystemConfig from bisect import bisect_right @@ -125,94 +125,7 @@ def keyboard_listener(state): class Engine: """Job scheduling simulation engine.""" - def __init__(self, *, - power_manager: PowerManager, - flops_manager: FLOPSManager, - telemetry: Telemetry, - cooling_model: ThermoFluidsModel | None = None, - jobs=None, - total_initial_jobs=0, - # Workload class to generate from for continuous generation - continuous_workload: Workload | None = None, - accounts=None, - sim_config: SimConfig, - system_config: SystemConfig, - ): - self.config = system_config.get_legacy() - self.down_nodes = summarize_ranges(self.config['DOWN_NODES']) - self.resource_manager = ResourceManager( 
- total_nodes=self.config['TOTAL_NODES'], - down_nodes=self.config['DOWN_NODES'], - config=self.config - ) - # Initialize running and queue, etc. - self.running = [] - self.queue = [] - self.accounts = accounts - self.telemetry = telemetry - self.job_history_dict = [] - self.jobs_completed = 0 - self.jobs_killed = 0 - self.total_initial_jobs = total_initial_jobs - self.current_timestep = 0 - self.cooling_model = cooling_model - self.sys_power = 0 - self.power_manager = power_manager - self.flops_manager = flops_manager - self.debug = sim_config.debug - self.continuous_workload = continuous_workload - self.replay = sim_config.replay - self.downscale = sim_config.downscale # Factor to downscale the 1s timesteps (power of 10) - self.simulate_network = sim_config.simulate_network - self.sys_util_history = [] - self.scheduler_queue_history = [] - self.scheduler_running_history = [] - self.avg_net_tx = [] - self.avg_net_rx = [] - self.net_util_history = [] - self.avg_slowdown_history = [] - self.max_slowdown_history = [] - self.node_occupancy_history = [] - self.downtime = Downtime(first_downtime=sim_config.downtime_first_int, - downtime_interval=sim_config.downtime_interval_int, - downtime_length=sim_config.downtime_length_int, - debug=sim_config.debug, - ) - - # Set scheduler type - either based on config or command-line args - defaults to 'default' - if self.config['multitenant']: - scheduler_type = 'multitenant' - else: - scheduler_type = sim_config.scheduler - - policy_type = sim_config.policy - backfill_type = sim_config.backfill - - self.scheduler = load_scheduler(scheduler_type)( - config=self.config, - policy=policy_type, - bfpolicy=backfill_type, - resource_manager=self.resource_manager, - jobs=jobs - ) - if sim_config.live: - assert self.scheduler.policy != PolicyType.REPLAY, \ - "Cannot replay from a live system. Choose a scheduling policy!" 
- print(f"Using scheduler: {str(self.scheduler.__class__).split('.')[2]}" - f", with policy {self.scheduler.policy} " - f"and backfill {self.scheduler.bfpolicy}") - - if self.simulate_network: - available_nodes = self.resource_manager.available_nodes - self.network_model = NetworkModel( - available_nodes=available_nodes, - config=self.config, - ) - else: - self.network_model = None - - @staticmethod - def from_sim_config(sim_config: SimConfig, partition: str | None = None): + def __init__(self, sim_config: SimConfig, partition: str | None = None): if partition: system_config = sim_config.get_system_config_by_name(partition) elif len(sim_config.system_configs) > 1: @@ -234,12 +147,12 @@ class Engine: np.random.seed(sim_config.seed + 1) if sim_config.live and not sim_config.replay: - td = Telemetry(**sim_config_dict) - wd = td.load_from_live_system() + telemetry = Telemetry(**sim_config_dict) + wd = telemetry.load_from_live_system() elif sim_config.replay: # TODO: this will have issues if running separate systems or custom systems partition_short = partition.split("/")[-1] if partition else None - td = Telemetry( + telemetry = Telemetry( **sim_config_dict, partition=partition, ) @@ -254,15 +167,18 @@ class Engine: else: replay_files = sim_config.replay - wd = td.load_from_files(replay_files) + wd = telemetry.load_from_files(replay_files) else: # Synthetic jobs wl = Workload(sim_config_args, system_config_dict) wd = wl.generate_jobs() - td = Telemetry(**sim_config_dict) + telemetry = Telemetry(**sim_config_dict) jobs = wd.jobs if len(jobs) == 0: print(f"Warning no jobs found for {partition or 'system'}") + if partition and len(sim_config.system_configs) > 1: + for job in jobs: + job.partition = partition if sim_config.start: start = sim_config.start @@ -319,19 +235,94 @@ class Engine: else: accounts = job_accounts - engine = Engine( - power_manager=power_manager, - flops_manager=flops_manager, - cooling_model=cooling_model, - continuous_workload=continuous_workload, - 
jobs=jobs, - accounts=accounts, - telemetry=td, - sim_config=sim_config, - system_config=system_config, + self.sim_config = sim_config + self.system_config = system_config + self.config = system_config.get_legacy() + + self.start = start + self.timestep_start = wd.telemetry_start + self.timestep_end = wd.telemetry_end + self.time_delta = time_delta + + self.down_nodes = summarize_ranges(self.config['DOWN_NODES']) + self.resource_manager = ResourceManager( + total_nodes=self.config['TOTAL_NODES'], + down_nodes=self.config['DOWN_NODES'], + config=self.config ) + # Initialize running and queue, etc. + self.running = [] + self.queue = [] + self.accounts = accounts + self.telemetry = telemetry + self.job_history_dict = [] + self.jobs_completed = 0 + self.jobs_killed = 0 + self.jobs = jobs + self.total_initial_jobs = len(jobs) + self.current_timestep = 0 + self.cooling_model = cooling_model + self.sys_power = 0 + self.power_manager = power_manager + self.flops_manager = flops_manager + self.debug = sim_config.debug + self.continuous_workload = continuous_workload + self.replay = sim_config.replay + self.downscale = sim_config.downscale # Factor to downscale the 1s timesteps (power of 10) + self.simulate_network = sim_config.simulate_network + self.sys_util_history = [] + self.scheduler_queue_history = [] + self.scheduler_running_history = [] + self.avg_net_tx = [] + self.avg_net_rx = [] + self.net_util_history = [] + self.avg_slowdown_history = [] + self.max_slowdown_history = [] + self.node_occupancy_history = [] + self.downtime = Downtime(first_downtime=sim_config.downtime_first_int, + downtime_interval=sim_config.downtime_interval_int, + downtime_length=sim_config.downtime_length_int, + debug=sim_config.debug, + ) + + # Set scheduler type - either based on config or command-line args - defaults to 'default' + if self.config['multitenant']: + scheduler_type = 'multitenant' + else: + scheduler_type = sim_config.scheduler + + policy_type = sim_config.policy + 
backfill_type = sim_config.backfill + + self.scheduler = load_scheduler(scheduler_type)( + config=self.config, + policy=policy_type, + bfpolicy=backfill_type, + resource_manager=self.resource_manager, + jobs=jobs + ) + if sim_config.live: + assert self.scheduler.policy != PolicyType.REPLAY, \ + "Cannot replay from a live system. Choose a scheduling policy!" + print(f"Using scheduler: {str(self.scheduler.__class__).split('.')[2]}" + f", with policy {self.scheduler.policy} " + f"and backfill {self.scheduler.bfpolicy}") + + if self.simulate_network: + available_nodes = self.resource_manager.available_nodes + self.network_model = NetworkModel( + available_nodes=available_nodes, + config=self.config, + ) + else: + self.network_model = None - return engine, wd, time_delta + def get_workload_data(self) -> WorkloadData: + return WorkloadData( + jobs=self.jobs[:], + telemetry_start=self.timestep_start, telemetry_end=self.timestep_end, + start_date=self.start, + ) def add_running_jobs_to_queue(self, jobs_to_submit: List): """ @@ -713,7 +704,7 @@ class Engine: self.scheduler.policy = target_policy self.scheduler.bfpolicy = target_bfpolicy - def run_simulation(self, jobs, timestep_start, timestep_end, time_delta=1, autoshutdown=False): + def run_simulation(self, autoshutdown=False): """Generator that yields after each simulation tick.""" if self.scheduler.policy == PolicyType.REPLAY: @@ -722,24 +713,26 @@ class Engine: replay = False if self.debug: - print(f"[DEBUG] run_simulation: Initial jobs count: {len(jobs)}") - if jobs: + print(f"[DEBUG] run_simulation: Initial jobs count: {len(self.jobs)}") + if self.jobs: print("[DEBUG] run_simulation: First job submit_time: " - f"{jobs[0].submit_time}, start_time: {jobs[0].start_time}") + f"{self.jobs[0].submit_time}, start_time: {self.jobs[0].start_time}") # Set times and place jobs that are currently running, onto the system. 
- self.prepare_system_state(all_jobs=jobs, timestep_start=timestep_start, timestep_end=timestep_end) + self.prepare_system_state(all_jobs=self.jobs, + timestep_start=self.timestep_start, timestep_end=self.timestep_end, + ) # Process jobs in batches for better performance of timestep loop - all_jobs = jobs.copy() + all_jobs = self.jobs.copy() submit_times = [j.submit_time for j in all_jobs] cursor = 0 jobs = [] # Batch Jobs into 6h windows based on submit_time or twice the time_delta if larger - batch_window = max(60 * 60 * 6, 2 * time_delta) # at least 6h + batch_window = max(60 * 60 * 6, 2 * self.time_delta) # at least 6h - sim_state = SimulationState(time_delta) + sim_state = SimulationState(self.time_delta) # listener_thread = threading.Thread(target=keyboard_listener, args=(sim_state,), daemon=True) # listener_thread.start() @@ -751,7 +744,7 @@ class Engine: current_time_delta = sim_state.get_time_delta() - if (self.current_timestep % batch_window == 0) or (self.current_timestep == timestep_start): + if (self.current_timestep % batch_window == 0) or (self.current_timestep == self.timestep_start): # Add jobs that are within the batching window and remove them from all jobs # jobs += [job for job in all_jobs if job.submit_time <= self.current_timestep + batch_window] # all_jobs[:] = [job for job in all_jobs if job.submit_time > self.current_timestep + batch_window] diff --git a/raps/envs/raps_env.py b/raps/envs/raps_env.py index ec9fcad464972f3d56e6d3c3bc80c5367e41c2b4..e4ca8eb9a6e6178865ba79cfed6285af45c22f84 100644 --- a/raps/envs/raps_env.py +++ b/raps/envs/raps_env.py @@ -59,12 +59,10 @@ class RAPSEnv(gym.Env): self.action_space = spaces.Discrete(max_jobs) def _create_engine(self): - engine, workload_data, time_delta = Engine.from_sim_config(self.sim_config) + engine = Engine(self.sim_config) engine.scheduler.env = self - self.jobs = workload_data.jobs - timestep_start = workload_data.telemetry_start - timestep_end = workload_data.telemetry_end - 
self.generator = engine.run_simulation(self.jobs, timestep_start, timestep_end, time_delta) + self.jobs = engine.jobs + self.generator = engine.run_simulation() return engine def reset(self, **kwargs): diff --git a/raps/multi_part_engine.py b/raps/multi_part_engine.py index 57e3e27d28e44b9eb72dec37d327360cea275a85..6332aa5ac17afd5bed918d99ddc6a67734e8f6f9 100644 --- a/raps/multi_part_engine.py +++ b/raps/multi_part_engine.py @@ -1,57 +1,46 @@ from collections.abc import Iterable from raps.engine import Engine, TickData from raps.sim_config import MultiPartSimConfig -from raps.utils import WorkloadData class MultiPartEngine: - def __init__(self, engines: dict[str, Engine], jobs: dict[str, list]): - self.partition_names = sorted(engines.keys()) - self.engines = engines - self.jobs = jobs - - @staticmethod - def from_sim_config(sim_config: MultiPartSimConfig): + """Run one Engine per partition and advance them in lockstep.""" + + def __init__(self, sim_config: MultiPartSimConfig): if sim_config.replay: root_systems = set(s.system_name.split("/")[0] for s in sim_config.system_configs) # TODO should consider how to pass separate replay values for separate systems if len(root_systems) > 1: raise ValueError("Replay for multi-system runs is not supported") - workloads_by_partition: dict[str, WorkloadData] = {} engines: dict[str, Engine] = {} - time_delta = 0 for partition in sim_config.system_configs: - name = partition.system_name - engine, workload_data, time_delta = Engine.from_sim_config( - sim_config, partition=name, - ) - for job in workload_data.jobs: - job.partition = name - workloads_by_partition[name] = workload_data - engines[name] = engine - timestep_start = min(w.telemetry_start for w in workloads_by_partition.values()) - timestep_end = min(w.telemetry_end for w in workloads_by_partition.values()) - - total_initial_jobs = sum(len(j.jobs) for j in workloads_by_partition.values()) + engine = Engine(sim_config, partition=partition.system_name) + engines[partition.system_name] = engine + + total_initial_jobs = sum(len(e.jobs) for e 
in engines.values()) for engine in engines.values(): engine.total_initial_jobs = total_initial_jobs - multi_engine = MultiPartEngine( - engines=engines, - jobs={p: w.jobs for p, w in workloads_by_partition.items()}, - ) - - return multi_engine, workloads_by_partition, timestep_start, timestep_end, time_delta + self.partition_names = sorted(engines.keys()) + self.engines = engines + # run_simulation() zips the per-partition generators with strict=True, which raises + # if they differ in length, so give every engine one shared window (earliest start, + # earliest end). NOTE: this overwrites each engine's own telemetry window. + timestep_start = min(e.timestep_start for e in engines.values()) + timestep_end = min(e.timestep_end for e in engines.values()) + for engine in engines.values(): + engine.timestep_start = timestep_start + engine.timestep_end = timestep_end + self.start = next(iter(engines.values())).start + self.timestep_start = timestep_start + self.timestep_end = timestep_end - def run_simulation(self, jobs: dict, timestep_start, timestep_end, time_delta=1 - ) -> Iterable[dict[str, TickData | None]]: + def run_simulation(self) -> Iterable[dict[str, TickData | None]]: generators = [] for part in self.partition_names: - generators.append(self.engines[part].run_simulation( - jobs[part], timestep_start, timestep_end, time_delta, - )) + generators.append(self.engines[part].run_simulation()) for tick_datas in zip(*generators, strict=True): yield dict(zip(self.partition_names, tick_datas)) diff --git a/raps/run_sim.py b/raps/run_sim.py index 74ea87a0f05c24bf1d6cbea71a9fee1e506e0cc7..51bf6f5e6ce01b0e41744cae2f224f7ba5799d0a 100644 --- a/raps/run_sim.py +++ b/raps/run_sim.py @@ -49,37 +49,34 @@ def run_sim(sim_config: SingleSimConfig): print("Use run-parts to run multi-partition simulations") sys.exit(1) - engine, workload_data, time_delta = Engine.from_sim_config(sim_config) + engine = Engine(sim_config) out = sim_config.get_output() if out: out.mkdir(parents=True) engine.telemetry.save_snapshot( dest=str(out / 'snapshot.npz'), - result=workload_data, + result=engine.get_workload_data(), args=sim_config, ) (out / 'sim_config.yaml').write_text(sim_config.dump_yaml()) - jobs = workload_data.jobs - timestep_start, timestep_end = workload_data.telemetry_start, workload_data.telemetry_end + jobs = engine.jobs + timestep_start, timestep_end = engine.timestep_start, engine.timestep_end total_timesteps 
= timestep_end - timestep_start downscale = sim_config.downscale downscale_str = ""if downscale == 1 else f"/{downscale}" print(f"Simulating {len(jobs)} jobs for {total_timesteps}{downscale_str}" f" seconds from {timestep_start} to {timestep_end}.") - print(f"Simulation time delta: {time_delta}{downscale_str} s," + print(f"Simulation time delta: {engine.time_delta}{downscale_str} s," f"Telemetry trace quanta: {jobs[0].trace_quanta}{downscale_str} s.") layout_manager = LayoutManager( sim_config.layout, engine=engine, debug=sim_config.debug, total_timesteps=total_timesteps, args_dict=sim_config.get_legacy_args_dict(), **sim_config.system_configs[0].get_legacy(), ) - layout_manager.run( - jobs, - timestep_start=timestep_start, timestep_end=timestep_end, time_delta=time_delta, - ) + layout_manager.run() engine_stats = get_engine_stats(engine) job_stats = get_job_stats(engine) @@ -223,8 +220,7 @@ def run_parts_sim(sim_config: MultiPartSimConfig): UserWarning ) - multi_engine, workload_results, timestep_start, timestep_end, time_delta = \ - MultiPartEngine.from_sim_config(sim_config) + multi_engine = MultiPartEngine(sim_config) out = sim_config.get_output() if out: @@ -232,15 +228,13 @@ def run_parts_sim(sim_config: MultiPartSimConfig): for part, engine in multi_engine.engines.items(): engine.telemetry.save_snapshot( dest=str(out / part.split('/')[-1]), - result=workload_results[part], + result=engine.get_workload_data(), args=sim_config, ) (out / 'sim_config.yaml').write_text(sim_config.dump_yaml()) - jobs = {p: w.jobs for p, w in workload_results.items()} - ui_update_freq = sim_config.system_configs[0].scheduler.ui_update_freq - gen = multi_engine.run_simulation(jobs, timestep_start, timestep_end, time_delta) + gen = multi_engine.run_simulation() for tick_datas in gen: sys_power = 0 diff --git a/raps/sim_config.py b/raps/sim_config.py index 867dac0bd4429352df830de38ba609c5d472c412..c27a2aba21acf6ad592613254663a37e5c44b7f9 100644 --- a/raps/sim_config.py +++ 
b/raps/sim_config.py @@ -4,6 +4,7 @@ from pathlib import Path from functools import cached_property from datetime import timedelta from typing import Literal, Annotated as A +from annotated_types import Len import importlib from raps.schedulers.default import PolicyType, BackfillType from raps.utils import ( @@ -435,7 +436,7 @@ class SingleSimConfig(SimConfig, abc.ABC): class MultiPartSimConfig(SimConfig): - partitions: list[SystemConfig | str] + partitions: A[list[SystemConfig | str], Len(min_length=1)] """ List of multiple systems/partitions to run. Can be names of preconfigured systems, or paths to custom SystemConfig yaml files. diff --git a/raps/ui.py b/raps/ui.py index 3965935bf8672e6472bf965b5b13e4ae24a79967..6330bc984e6c261286a77bb62abace73117b6508 100644 --- a/raps/ui.py +++ b/raps/ui.py @@ -556,7 +556,7 @@ class LayoutManager: data.system_util, uncertainties=uncertainties, ) - def run(self, jobs, timestep_start, timestep_end, time_delta): + def run(self): """ Runs the UI, blocking until the simulation is complete """ if not self.debug and not self.noui: context = Live(self.layout, auto_refresh=True, refresh_per_second=3) @@ -565,13 +565,11 @@ class LayoutManager: try: with context: # last_i = 0 - for i, data in enumerate(self.engine.run_simulation(jobs, - timestep_start, - timestep_end, - time_delta, - autoshutdown=True)): + for i, data in enumerate(self.engine.run_simulation(autoshutdown=True)): if data and (not self.debug and not self.noui): - self.update_full_layout(data, time_delta, timestep_start=timestep_start) + self.update_full_layout(data, + self.engine.time_delta, + timestep_start=self.engine.timestep_start) # self.update_progress_bar(i-last_i) # last_i=i if not self.debug and not self.noui: diff --git a/tests/util.py b/tests/util.py index b5ba49578377d978eff05b019abf0c0f9e75f895..4bbf8f8c18ff074d373dc31d727e7ae7b8ff7c19 100644 --- a/tests/util.py +++ b/tests/util.py @@ -60,11 +60,8 @@ def run_engine(sim_config, include_ticks=False) -> 
tuple[Engine, dict[str, Any]] print(f"Command to reproduce run:\n {_get_cmd(sim_config, "run")}") sim_config = SingleSimConfig.model_validate(sim_config) - engine, workload_data, time_delta = Engine.from_sim_config(sim_config) - jobs = workload_data.jobs - timestep_start = workload_data.telemetry_start - timestep_end = workload_data.telemetry_end - gen = engine.run_simulation(jobs, timestep_start, timestep_end, time_delta) + engine = Engine(sim_config) + gen = engine.run_simulation() stats = { "tick_count": 0, @@ -91,10 +88,8 @@ def run_multi_part_engine(sim_config, include_ticks=False) -> tuple[MultiPartEng print(f"Command to reproduce run:\n {_get_cmd(sim_config, "run-parts")}") sim_config = MultiPartSimConfig.model_validate(sim_config) - multi_engine, workload_results, timestep_start, timestep_end, time_delta = \ - MultiPartEngine.from_sim_config(sim_config) - jobs = {p: w.jobs for p, w in workload_results.items()} - gen = multi_engine.run_simulation(jobs, timestep_start, timestep_end, time_delta) + multi_engine = MultiPartEngine(sim_config) + gen = multi_engine.run_simulation() stats = { "tick_count": 0,