Loading raps/cooling.py +2 −2 Original line number Diff line number Diff line Loading @@ -155,8 +155,8 @@ class ThermoFluidsModel: # If replay mode is on and weather data is available if self.weather and self.weather.has_coords: # Convert total seconds to timedelta object delta = timedelta(seconds=engine.current_timestep) target_datetime = self.weather.start + delta delta = timedelta(seconds=engine.current_timestep - engine.timestep_start) target_datetime = engine.start + delta # Get temperature from weather data temperature = self.weather.get_temperature(target_datetime) or self.config['WET_BULB_TEMP'] Loading raps/engine.py +107 −114 Original line number Diff line number Diff line Loading @@ -15,6 +15,7 @@ from raps.policy import PolicyType from raps.utils import ( summarize_ranges, get_current_utilization, WorkloadData, ) from raps.resmgr import ResourceManager from raps.schedulers import load_scheduler Loading @@ -39,7 +40,6 @@ from raps.account import Accounts from raps.downtime import Downtime from raps.weather import Weather from raps.sim_config import SimConfig from raps.system_config import SystemConfig from bisect import bisect_right Loading Loading @@ -125,94 +125,7 @@ def keyboard_listener(state): class Engine: """Job scheduling simulation engine.""" def __init__(self, *, power_manager: PowerManager, flops_manager: FLOPSManager, telemetry: Telemetry, cooling_model: ThermoFluidsModel | None = None, jobs=None, total_initial_jobs=0, # Workload class to generate from for continuous generation continuous_workload: Workload | None = None, accounts=None, sim_config: SimConfig, system_config: SystemConfig, ): self.config = system_config.get_legacy() self.down_nodes = summarize_ranges(self.config['DOWN_NODES']) self.resource_manager = ResourceManager( total_nodes=self.config['TOTAL_NODES'], down_nodes=self.config['DOWN_NODES'], config=self.config ) # Initialize running and queue, etc. self.running = [] self.queue = [] self.accounts = accounts self.telemetry = telemetry self.job_history_dict = [] self.jobs_completed = 0 self.jobs_killed = 0 self.total_initial_jobs = total_initial_jobs self.current_timestep = 0 self.cooling_model = cooling_model self.sys_power = 0 self.power_manager = power_manager self.flops_manager = flops_manager self.debug = sim_config.debug self.continuous_workload = continuous_workload self.replay = sim_config.replay self.downscale = sim_config.downscale # Factor to downscale the 1s timesteps (power of 10) self.simulate_network = sim_config.simulate_network self.sys_util_history = [] self.scheduler_queue_history = [] self.scheduler_running_history = [] self.avg_net_tx = [] self.avg_net_rx = [] self.net_util_history = [] self.avg_slowdown_history = [] self.max_slowdown_history = [] self.node_occupancy_history = [] self.downtime = Downtime(first_downtime=sim_config.downtime_first_int, downtime_interval=sim_config.downtime_interval_int, downtime_length=sim_config.downtime_length_int, debug=sim_config.debug, ) # Set scheduler type - either based on config or command-line args - defaults to 'default' if self.config['multitenant']: scheduler_type = 'multitenant' else: scheduler_type = sim_config.scheduler policy_type = sim_config.policy backfill_type = sim_config.backfill self.scheduler = load_scheduler(scheduler_type)( config=self.config, policy=policy_type, bfpolicy=backfill_type, resource_manager=self.resource_manager, jobs=jobs ) if sim_config.live: assert self.scheduler.policy != PolicyType.REPLAY, \ "Cannot replay from a live system. Choose a scheduling policy!" print(f"Using scheduler: {str(self.scheduler.__class__).split('.')[2]}" f", with policy {self.scheduler.policy} " f"and backfill {self.scheduler.bfpolicy}") if self.simulate_network: available_nodes = self.resource_manager.available_nodes self.network_model = NetworkModel( available_nodes=available_nodes, config=self.config, ) else: self.network_model = None @staticmethod def from_sim_config(sim_config: SimConfig, partition: str | None = None): def __init__(self, sim_config: SimConfig, partition: str | None = None): if partition: system_config = sim_config.get_system_config_by_name(partition) elif len(sim_config.system_configs) > 1: Loading @@ -234,12 +147,12 @@ class Engine: np.random.seed(sim_config.seed + 1) if sim_config.live and not sim_config.replay: td = Telemetry(**sim_config_dict) wd = td.load_from_live_system() telemetry = Telemetry(**sim_config_dict) wd = telemetry.load_from_live_system() elif sim_config.replay: # TODO: this will have issues if running separate systems or custom systems partition_short = partition.split("/")[-1] if partition else None td = Telemetry( telemetry = Telemetry( **sim_config_dict, partition=partition, ) Loading @@ -254,15 +167,18 @@ class Engine: else: replay_files = sim_config.replay wd = td.load_from_files(replay_files) wd = telemetry.load_from_files(replay_files) else: # Synthetic jobs wl = Workload(sim_config_args, system_config_dict) wd = wl.generate_jobs() td = Telemetry(**sim_config_dict) telemetry = Telemetry(**sim_config_dict) jobs = wd.jobs if len(jobs) == 0: print(f"Warning no jobs found for {partition or 'system'}") if partition and len(sim_config.system_configs) > 1: for job in jobs: job.partition = partition if sim_config.start: start = sim_config.start Loading Loading @@ -319,19 +235,94 @@ class Engine: else: accounts = job_accounts engine = Engine( power_manager=power_manager, flops_manager=flops_manager, cooling_model=cooling_model, continuous_workload=continuous_workload, jobs=jobs, accounts=accounts, telemetry=td, sim_config=sim_config, system_config=system_config, self.sim_config = sim_config self.system_config = system_config self.config = system_config.get_legacy() self.start = start self.timestep_start = wd.telemetry_start self.timestep_end = wd.telemetry_end self.time_delta = time_delta self.down_nodes = summarize_ranges(self.config['DOWN_NODES']) self.resource_manager = ResourceManager( total_nodes=self.config['TOTAL_NODES'], down_nodes=self.config['DOWN_NODES'], config=self.config ) # Initialize running and queue, etc. self.running = [] self.queue = [] self.accounts = accounts self.telemetry = telemetry self.job_history_dict = [] self.jobs_completed = 0 self.jobs_killed = 0 self.jobs = jobs self.total_initial_jobs = len(jobs) self.current_timestep = 0 self.cooling_model = cooling_model self.sys_power = 0 self.power_manager = power_manager self.flops_manager = flops_manager self.debug = sim_config.debug self.continuous_workload = continuous_workload self.replay = sim_config.replay self.downscale = sim_config.downscale # Factor to downscale the 1s timesteps (power of 10) self.simulate_network = sim_config.simulate_network self.sys_util_history = [] self.scheduler_queue_history = [] self.scheduler_running_history = [] self.avg_net_tx = [] self.avg_net_rx = [] self.net_util_history = [] self.avg_slowdown_history = [] self.max_slowdown_history = [] self.node_occupancy_history = [] self.downtime = Downtime(first_downtime=sim_config.downtime_first_int, downtime_interval=sim_config.downtime_interval_int, downtime_length=sim_config.downtime_length_int, debug=sim_config.debug, ) # Set scheduler type - either based on config or command-line args - defaults to 'default' if self.config['multitenant']: scheduler_type = 'multitenant' else: scheduler_type = sim_config.scheduler policy_type = sim_config.policy backfill_type = sim_config.backfill self.scheduler = load_scheduler(scheduler_type)( config=self.config, policy=policy_type, bfpolicy=backfill_type, resource_manager=self.resource_manager, jobs=jobs ) if sim_config.live: assert self.scheduler.policy != PolicyType.REPLAY, \ "Cannot replay from a live system. Choose a scheduling policy!" print(f"Using scheduler: {str(self.scheduler.__class__).split('.')[2]}" f", with policy {self.scheduler.policy} " f"and backfill {self.scheduler.bfpolicy}") if self.simulate_network: available_nodes = self.resource_manager.available_nodes self.network_model = NetworkModel( available_nodes=available_nodes, config=self.config, ) else: self.network_model = None return engine, wd, time_delta def get_workload_data(self) -> WorkloadData: return WorkloadData( jobs=self.jobs[:], telemetry_start=self.timestep_start, telemetry_end=self.timestep_end, start_date=self.start, ) def add_running_jobs_to_queue(self, jobs_to_submit: List): """ Loading Loading @@ -713,7 +704,7 @@ class Engine: self.scheduler.policy = target_policy self.scheduler.bfpolicy = target_bfpolicy def run_simulation(self, jobs, timestep_start, timestep_end, time_delta=1, autoshutdown=False): def run_simulation(self, autoshutdown=False): """Generator that yields after each simulation tick.""" if self.scheduler.policy == PolicyType.REPLAY: Loading @@ -722,24 +713,26 @@ class Engine: replay = False if self.debug: print(f"[DEBUG] run_simulation: Initial jobs count: {len(jobs)}") if jobs: print(f"[DEBUG] run_simulation: Initial jobs count: {len(self.jobs)}") if self.jobs: print("[DEBUG] run_simulation: First job submit_time: " f"{jobs[0].submit_time}, start_time: {jobs[0].start_time}") f"{self.jobs[0].submit_time}, start_time: {self.jobs[0].start_time}") # Set times and place jobs that are currently running, onto the system. self.prepare_system_state(all_jobs=jobs, timestep_start=timestep_start, timestep_end=timestep_end) self.prepare_system_state(all_jobs=self.jobs, timestep_start=self.timestep_start, timestep_end=self.timestep_end, ) # Process jobs in batches for better performance of timestep loop all_jobs = jobs.copy() all_jobs = self.jobs.copy() submit_times = [j.submit_time for j in all_jobs] cursor = 0 jobs = [] # Batch Jobs into 6h windows based on submit_time or twice the time_delta if larger batch_window = max(60 * 60 * 6, 2 * time_delta) # at least 6h batch_window = max(60 * 60 * 6, 2 * self.time_delta) # at least 6h sim_state = SimulationState(time_delta) sim_state = SimulationState(self.time_delta) # listener_thread = threading.Thread(target=keyboard_listener, args=(sim_state,), daemon=True) # listener_thread.start() Loading @@ -751,7 +744,7 @@ class Engine: current_time_delta = sim_state.get_time_delta() if (self.current_timestep % batch_window == 0) or (self.current_timestep == timestep_start): if (self.current_timestep % batch_window == 0) or (self.current_timestep == self.timestep_start): # Add jobs that are within the batching window and remove them from all jobs # jobs += [job for job in all_jobs if job.submit_time <= self.current_timestep + batch_window] # all_jobs[:] = [job for job in all_jobs if job.submit_time > self.current_timestep + batch_window] Loading raps/envs/raps_env.py +3 −5 Original line number Diff line number Diff line Loading @@ -59,12 +59,10 @@ class RAPSEnv(gym.Env): self.action_space = spaces.Discrete(max_jobs) def _create_engine(self): engine, workload_data, time_delta = Engine.from_sim_config(self.sim_config) engine = Engine(self.sim_config) engine.scheduler.env = self self.jobs = workload_data.jobs timestep_start = workload_data.telemetry_start timestep_end = workload_data.telemetry_end self.generator = engine.run_simulation(self.jobs, timestep_start, timestep_end, time_delta) self.jobs = engine.jobs self.generator = engine.run_simulation() return engine def reset(self, **kwargs): Loading raps/multi_part_engine.py +13 −33 Original line number Diff line number Diff line from collections.abc import Iterable from raps.engine import Engine, TickData from raps.sim_config import MultiPartSimConfig from raps.utils import WorkloadData class MultiPartEngine: def __init__(self, engines: dict[str, Engine], jobs: dict[str, list]): self.partition_names = sorted(engines.keys()) self.engines = engines self.jobs = jobs @staticmethod def from_sim_config(sim_config: MultiPartSimConfig): def __init__(self, sim_config: MultiPartSimConfig): if sim_config.replay: root_systems = set(s.system_name.split("/")[0] for s in sim_config.system_configs) # TODO should consider how to pass separate replay values for separate systems if len(root_systems) > 1: raise ValueError("Replay for multi-system runs is not supported") workloads_by_partition: dict[str, WorkloadData] = {} engines: dict[str, Engine] = {} time_delta = 0 for partition in sim_config.system_configs: name = partition.system_name engine, workload_data, time_delta = Engine.from_sim_config( sim_config, partition=name, ) for job in workload_data.jobs: job.partition = name workloads_by_partition[name] = workload_data engines[name] = engine timestep_start = min(w.telemetry_start for w in workloads_by_partition.values()) timestep_end = min(w.telemetry_end for w in workloads_by_partition.values()) total_initial_jobs = sum(len(j.jobs) for j in workloads_by_partition.values()) engine = Engine(sim_config, partition=partition.system_name) engines[partition.system_name] = engine total_initial_jobs = sum(len(e.jobs) for e in engines.values()) for engine in engines.values(): engine.total_initial_jobs = total_initial_jobs multi_engine = MultiPartEngine( engines=engines, jobs={p: w.jobs for p, w in workloads_by_partition.items()}, ) return multi_engine, workloads_by_partition, timestep_start, timestep_end, time_delta self.partition_names = sorted(engines.keys()) self.engines = engines first_engine = list(engines.values())[0] self.start = first_engine.start self.timestep_start = first_engine.timestep_start self.timestep_end = first_engine.timestep_end def run_simulation(self, jobs: dict, timestep_start, timestep_end, time_delta=1 ) -> Iterable[dict[str, TickData | None]]: def run_simulation(self) -> Iterable[dict[str, TickData | None]]: generators = [] for part in self.partition_names: generators.append(self.engines[part].run_simulation( jobs[part], timestep_start, timestep_end, time_delta, )) generators.append(self.engines[part].run_simulation()) for tick_datas in zip(*generators, strict=True): yield dict(zip(self.partition_names, tick_datas)) Loading raps/run_sim.py +9 −15 Original line number Diff line number Diff line Loading @@ -49,37 +49,34 @@ def run_sim(sim_config: SingleSimConfig): print("Use run-parts to run multi-partition simulations") sys.exit(1) engine, workload_data, time_delta = Engine.from_sim_config(sim_config) engine = Engine(sim_config) out = sim_config.get_output() if out: out.mkdir(parents=True) engine.telemetry.save_snapshot( dest=str(out / 'snapshot.npz'), result=workload_data, result=engine.get_workload_data(), args=sim_config, ) (out / 'sim_config.yaml').write_text(sim_config.dump_yaml()) jobs = workload_data.jobs timestep_start, timestep_end = workload_data.telemetry_start, workload_data.telemetry_end jobs = engine.jobs timestep_start, timestep_end = engine.timestep_start, engine.timestep_end total_timesteps = timestep_end - timestep_start downscale = sim_config.downscale downscale_str = ""if downscale == 1 else f"/{downscale}" print(f"Simulating {len(jobs)} jobs for {total_timesteps}{downscale_str}" f" seconds from {timestep_start} to {timestep_end}.") print(f"Simulation time delta: {time_delta}{downscale_str} s," print(f"Simulation time delta: {engine.time_delta}{downscale_str} s," f"Telemetry trace quanta: {jobs[0].trace_quanta}{downscale_str} s.") layout_manager = LayoutManager( sim_config.layout, engine=engine, debug=sim_config.debug, total_timesteps=total_timesteps, args_dict=sim_config.get_legacy_args_dict(), **sim_config.system_configs[0].get_legacy(), ) layout_manager.run( jobs, timestep_start=timestep_start, timestep_end=timestep_end, time_delta=time_delta, ) layout_manager.run() engine_stats = get_engine_stats(engine) job_stats = get_job_stats(engine) Loading Loading @@ -223,8 +220,7 @@ def run_parts_sim(sim_config: MultiPartSimConfig): UserWarning ) multi_engine, workload_results, timestep_start, timestep_end, time_delta = \ MultiPartEngine.from_sim_config(sim_config) multi_engine = MultiPartEngine(sim_config) out = sim_config.get_output() if out: Loading @@ -232,15 +228,13 @@ def run_parts_sim(sim_config: MultiPartSimConfig): for part, engine in multi_engine.engines.items(): engine.telemetry.save_snapshot( dest=str(out / part.split('/')[-1]), result=workload_results[part], result=engine.get_workload_data(), args=sim_config, ) (out / 'sim_config.yaml').write_text(sim_config.dump_yaml()) jobs = {p: w.jobs for p, w in workload_results.items()} ui_update_freq = sim_config.system_configs[0].scheduler.ui_update_freq gen = multi_engine.run_simulation(jobs, timestep_start, timestep_end, time_delta) gen = multi_engine.run_simulation() for tick_datas in gen: sys_power = 0 Loading Loading
raps/cooling.py +2 −2 Original line number Diff line number Diff line Loading @@ -155,8 +155,8 @@ class ThermoFluidsModel: # If replay mode is on and weather data is available if self.weather and self.weather.has_coords: # Convert total seconds to timedelta object delta = timedelta(seconds=engine.current_timestep) target_datetime = self.weather.start + delta delta = timedelta(seconds=engine.current_timestep - engine.timestep_start) target_datetime = engine.start + delta # Get temperature from weather data temperature = self.weather.get_temperature(target_datetime) or self.config['WET_BULB_TEMP'] Loading
raps/engine.py +107 −114 Original line number Diff line number Diff line Loading @@ -15,6 +15,7 @@ from raps.policy import PolicyType from raps.utils import ( summarize_ranges, get_current_utilization, WorkloadData, ) from raps.resmgr import ResourceManager from raps.schedulers import load_scheduler Loading @@ -39,7 +40,6 @@ from raps.account import Accounts from raps.downtime import Downtime from raps.weather import Weather from raps.sim_config import SimConfig from raps.system_config import SystemConfig from bisect import bisect_right Loading Loading @@ -125,94 +125,7 @@ def keyboard_listener(state): class Engine: """Job scheduling simulation engine.""" def __init__(self, *, power_manager: PowerManager, flops_manager: FLOPSManager, telemetry: Telemetry, cooling_model: ThermoFluidsModel | None = None, jobs=None, total_initial_jobs=0, # Workload class to generate from for continuous generation continuous_workload: Workload | None = None, accounts=None, sim_config: SimConfig, system_config: SystemConfig, ): self.config = system_config.get_legacy() self.down_nodes = summarize_ranges(self.config['DOWN_NODES']) self.resource_manager = ResourceManager( total_nodes=self.config['TOTAL_NODES'], down_nodes=self.config['DOWN_NODES'], config=self.config ) # Initialize running and queue, etc. self.running = [] self.queue = [] self.accounts = accounts self.telemetry = telemetry self.job_history_dict = [] self.jobs_completed = 0 self.jobs_killed = 0 self.total_initial_jobs = total_initial_jobs self.current_timestep = 0 self.cooling_model = cooling_model self.sys_power = 0 self.power_manager = power_manager self.flops_manager = flops_manager self.debug = sim_config.debug self.continuous_workload = continuous_workload self.replay = sim_config.replay self.downscale = sim_config.downscale # Factor to downscale the 1s timesteps (power of 10) self.simulate_network = sim_config.simulate_network self.sys_util_history = [] self.scheduler_queue_history = [] self.scheduler_running_history = [] self.avg_net_tx = [] self.avg_net_rx = [] self.net_util_history = [] self.avg_slowdown_history = [] self.max_slowdown_history = [] self.node_occupancy_history = [] self.downtime = Downtime(first_downtime=sim_config.downtime_first_int, downtime_interval=sim_config.downtime_interval_int, downtime_length=sim_config.downtime_length_int, debug=sim_config.debug, ) # Set scheduler type - either based on config or command-line args - defaults to 'default' if self.config['multitenant']: scheduler_type = 'multitenant' else: scheduler_type = sim_config.scheduler policy_type = sim_config.policy backfill_type = sim_config.backfill self.scheduler = load_scheduler(scheduler_type)( config=self.config, policy=policy_type, bfpolicy=backfill_type, resource_manager=self.resource_manager, jobs=jobs ) if sim_config.live: assert self.scheduler.policy != PolicyType.REPLAY, \ "Cannot replay from a live system. Choose a scheduling policy!" print(f"Using scheduler: {str(self.scheduler.__class__).split('.')[2]}" f", with policy {self.scheduler.policy} " f"and backfill {self.scheduler.bfpolicy}") if self.simulate_network: available_nodes = self.resource_manager.available_nodes self.network_model = NetworkModel( available_nodes=available_nodes, config=self.config, ) else: self.network_model = None @staticmethod def from_sim_config(sim_config: SimConfig, partition: str | None = None): def __init__(self, sim_config: SimConfig, partition: str | None = None): if partition: system_config = sim_config.get_system_config_by_name(partition) elif len(sim_config.system_configs) > 1: Loading @@ -234,12 +147,12 @@ class Engine: np.random.seed(sim_config.seed + 1) if sim_config.live and not sim_config.replay: td = Telemetry(**sim_config_dict) wd = td.load_from_live_system() telemetry = Telemetry(**sim_config_dict) wd = telemetry.load_from_live_system() elif sim_config.replay: # TODO: this will have issues if running separate systems or custom systems partition_short = partition.split("/")[-1] if partition else None td = Telemetry( telemetry = Telemetry( **sim_config_dict, partition=partition, ) Loading @@ -254,15 +167,18 @@ class Engine: else: replay_files = sim_config.replay wd = td.load_from_files(replay_files) wd = telemetry.load_from_files(replay_files) else: # Synthetic jobs wl = Workload(sim_config_args, system_config_dict) wd = wl.generate_jobs() td = Telemetry(**sim_config_dict) telemetry = Telemetry(**sim_config_dict) jobs = wd.jobs if len(jobs) == 0: print(f"Warning no jobs found for {partition or 'system'}") if partition and len(sim_config.system_configs) > 1: for job in jobs: job.partition = partition if sim_config.start: start = sim_config.start Loading Loading @@ -319,19 +235,94 @@ class Engine: else: accounts = job_accounts engine = Engine( power_manager=power_manager, flops_manager=flops_manager, cooling_model=cooling_model, continuous_workload=continuous_workload, jobs=jobs, accounts=accounts, telemetry=td, sim_config=sim_config, system_config=system_config, self.sim_config = sim_config self.system_config = system_config self.config = system_config.get_legacy() self.start = start self.timestep_start = wd.telemetry_start self.timestep_end = wd.telemetry_end self.time_delta = time_delta self.down_nodes = summarize_ranges(self.config['DOWN_NODES']) self.resource_manager = ResourceManager( total_nodes=self.config['TOTAL_NODES'], down_nodes=self.config['DOWN_NODES'], config=self.config ) # Initialize running and queue, etc. self.running = [] self.queue = [] self.accounts = accounts self.telemetry = telemetry self.job_history_dict = [] self.jobs_completed = 0 self.jobs_killed = 0 self.jobs = jobs self.total_initial_jobs = len(jobs) self.current_timestep = 0 self.cooling_model = cooling_model self.sys_power = 0 self.power_manager = power_manager self.flops_manager = flops_manager self.debug = sim_config.debug self.continuous_workload = continuous_workload self.replay = sim_config.replay self.downscale = sim_config.downscale # Factor to downscale the 1s timesteps (power of 10) self.simulate_network = sim_config.simulate_network self.sys_util_history = [] self.scheduler_queue_history = [] self.scheduler_running_history = [] self.avg_net_tx = [] self.avg_net_rx = [] self.net_util_history = [] self.avg_slowdown_history = [] self.max_slowdown_history = [] self.node_occupancy_history = [] self.downtime = Downtime(first_downtime=sim_config.downtime_first_int, downtime_interval=sim_config.downtime_interval_int, downtime_length=sim_config.downtime_length_int, debug=sim_config.debug, ) # Set scheduler type - either based on config or command-line args - defaults to 'default' if self.config['multitenant']: scheduler_type = 'multitenant' else: scheduler_type = sim_config.scheduler policy_type = sim_config.policy backfill_type = sim_config.backfill self.scheduler = load_scheduler(scheduler_type)( config=self.config, policy=policy_type, bfpolicy=backfill_type, resource_manager=self.resource_manager, jobs=jobs ) if sim_config.live: assert self.scheduler.policy != PolicyType.REPLAY, \ "Cannot replay from a live system. Choose a scheduling policy!" print(f"Using scheduler: {str(self.scheduler.__class__).split('.')[2]}" f", with policy {self.scheduler.policy} " f"and backfill {self.scheduler.bfpolicy}") if self.simulate_network: available_nodes = self.resource_manager.available_nodes self.network_model = NetworkModel( available_nodes=available_nodes, config=self.config, ) else: self.network_model = None return engine, wd, time_delta def get_workload_data(self) -> WorkloadData: return WorkloadData( jobs=self.jobs[:], telemetry_start=self.timestep_start, telemetry_end=self.timestep_end, start_date=self.start, ) def add_running_jobs_to_queue(self, jobs_to_submit: List): """ Loading Loading @@ -713,7 +704,7 @@ class Engine: self.scheduler.policy = target_policy self.scheduler.bfpolicy = target_bfpolicy def run_simulation(self, jobs, timestep_start, timestep_end, time_delta=1, autoshutdown=False): def run_simulation(self, autoshutdown=False): """Generator that yields after each simulation tick.""" if self.scheduler.policy == PolicyType.REPLAY: Loading @@ -722,24 +713,26 @@ class Engine: replay = False if self.debug: print(f"[DEBUG] run_simulation: Initial jobs count: {len(jobs)}") if jobs: print(f"[DEBUG] run_simulation: Initial jobs count: {len(self.jobs)}") if self.jobs: print("[DEBUG] run_simulation: First job submit_time: " f"{jobs[0].submit_time}, start_time: {jobs[0].start_time}") f"{self.jobs[0].submit_time}, start_time: {self.jobs[0].start_time}") # Set times and place jobs that are currently running, onto the system. self.prepare_system_state(all_jobs=jobs, timestep_start=timestep_start, timestep_end=timestep_end) self.prepare_system_state(all_jobs=self.jobs, timestep_start=self.timestep_start, timestep_end=self.timestep_end, ) # Process jobs in batches for better performance of timestep loop all_jobs = jobs.copy() all_jobs = self.jobs.copy() submit_times = [j.submit_time for j in all_jobs] cursor = 0 jobs = [] # Batch Jobs into 6h windows based on submit_time or twice the time_delta if larger batch_window = max(60 * 60 * 6, 2 * time_delta) # at least 6h batch_window = max(60 * 60 * 6, 2 * self.time_delta) # at least 6h sim_state = SimulationState(time_delta) sim_state = SimulationState(self.time_delta) # listener_thread = threading.Thread(target=keyboard_listener, args=(sim_state,), daemon=True) # listener_thread.start() Loading @@ -751,7 +744,7 @@ class Engine: current_time_delta = sim_state.get_time_delta() if (self.current_timestep % batch_window == 0) or (self.current_timestep == timestep_start): if (self.current_timestep % batch_window == 0) or (self.current_timestep == self.timestep_start): # Add jobs that are within the batching window and remove them from all jobs # jobs += [job for job in all_jobs if job.submit_time <= self.current_timestep + batch_window] # all_jobs[:] = [job for job in all_jobs if job.submit_time > self.current_timestep + batch_window] Loading
raps/envs/raps_env.py +3 −5 Original line number Diff line number Diff line Loading @@ -59,12 +59,10 @@ class RAPSEnv(gym.Env): self.action_space = spaces.Discrete(max_jobs) def _create_engine(self): engine, workload_data, time_delta = Engine.from_sim_config(self.sim_config) engine = Engine(self.sim_config) engine.scheduler.env = self self.jobs = workload_data.jobs timestep_start = workload_data.telemetry_start timestep_end = workload_data.telemetry_end self.generator = engine.run_simulation(self.jobs, timestep_start, timestep_end, time_delta) self.jobs = engine.jobs self.generator = engine.run_simulation() return engine def reset(self, **kwargs): Loading
raps/multi_part_engine.py +13 −33 Original line number Diff line number Diff line from collections.abc import Iterable from raps.engine import Engine, TickData from raps.sim_config import MultiPartSimConfig from raps.utils import WorkloadData class MultiPartEngine: def __init__(self, engines: dict[str, Engine], jobs: dict[str, list]): self.partition_names = sorted(engines.keys()) self.engines = engines self.jobs = jobs @staticmethod def from_sim_config(sim_config: MultiPartSimConfig): def __init__(self, sim_config: MultiPartSimConfig): if sim_config.replay: root_systems = set(s.system_name.split("/")[0] for s in sim_config.system_configs) # TODO should consider how to pass separate replay values for separate systems if len(root_systems) > 1: raise ValueError("Replay for multi-system runs is not supported") workloads_by_partition: dict[str, WorkloadData] = {} engines: dict[str, Engine] = {} time_delta = 0 for partition in sim_config.system_configs: name = partition.system_name engine, workload_data, time_delta = Engine.from_sim_config( sim_config, partition=name, ) for job in workload_data.jobs: job.partition = name workloads_by_partition[name] = workload_data engines[name] = engine timestep_start = min(w.telemetry_start for w in workloads_by_partition.values()) timestep_end = min(w.telemetry_end for w in workloads_by_partition.values()) total_initial_jobs = sum(len(j.jobs) for j in workloads_by_partition.values()) engine = Engine(sim_config, partition=partition.system_name) engines[partition.system_name] = engine total_initial_jobs = sum(len(e.jobs) for e in engines.values()) for engine in engines.values(): engine.total_initial_jobs = total_initial_jobs multi_engine = MultiPartEngine( engines=engines, jobs={p: w.jobs for p, w in workloads_by_partition.items()}, ) return multi_engine, workloads_by_partition, timestep_start, timestep_end, time_delta self.partition_names = sorted(engines.keys()) self.engines = engines first_engine = list(engines.values())[0] self.start = first_engine.start self.timestep_start = first_engine.timestep_start self.timestep_end = first_engine.timestep_end def run_simulation(self, jobs: dict, timestep_start, timestep_end, time_delta=1 ) -> Iterable[dict[str, TickData | None]]: def run_simulation(self) -> Iterable[dict[str, TickData | None]]: generators = [] for part in self.partition_names: generators.append(self.engines[part].run_simulation( jobs[part], timestep_start, timestep_end, time_delta, )) generators.append(self.engines[part].run_simulation()) for tick_datas in zip(*generators, strict=True): yield dict(zip(self.partition_names, tick_datas)) Loading
raps/run_sim.py +9 −15 Original line number Diff line number Diff line Loading @@ -49,37 +49,34 @@ def run_sim(sim_config: SingleSimConfig): print("Use run-parts to run multi-partition simulations") sys.exit(1) engine, workload_data, time_delta = Engine.from_sim_config(sim_config) engine = Engine(sim_config) out = sim_config.get_output() if out: out.mkdir(parents=True) engine.telemetry.save_snapshot( dest=str(out / 'snapshot.npz'), result=workload_data, result=engine.get_workload_data(), args=sim_config, ) (out / 'sim_config.yaml').write_text(sim_config.dump_yaml()) jobs = workload_data.jobs timestep_start, timestep_end = workload_data.telemetry_start, workload_data.telemetry_end jobs = engine.jobs timestep_start, timestep_end = engine.timestep_start, engine.timestep_end total_timesteps = timestep_end - timestep_start downscale = sim_config.downscale downscale_str = ""if downscale == 1 else f"/{downscale}" print(f"Simulating {len(jobs)} jobs for {total_timesteps}{downscale_str}" f" seconds from {timestep_start} to {timestep_end}.") print(f"Simulation time delta: {time_delta}{downscale_str} s," print(f"Simulation time delta: {engine.time_delta}{downscale_str} s," f"Telemetry trace quanta: {jobs[0].trace_quanta}{downscale_str} s.") layout_manager = LayoutManager( sim_config.layout, engine=engine, debug=sim_config.debug, total_timesteps=total_timesteps, args_dict=sim_config.get_legacy_args_dict(), **sim_config.system_configs[0].get_legacy(), ) layout_manager.run( jobs, timestep_start=timestep_start, timestep_end=timestep_end, time_delta=time_delta, ) layout_manager.run() engine_stats = get_engine_stats(engine) job_stats = get_job_stats(engine) Loading Loading @@ -223,8 +220,7 @@ def run_parts_sim(sim_config: MultiPartSimConfig): UserWarning ) multi_engine, workload_results, timestep_start, timestep_end, time_delta = \ MultiPartEngine.from_sim_config(sim_config) multi_engine = MultiPartEngine(sim_config) out = sim_config.get_output() if out: Loading @@ -232,15 +228,13 @@ def run_parts_sim(sim_config: MultiPartSimConfig): for part, engine in multi_engine.engines.items(): engine.telemetry.save_snapshot( dest=str(out / part.split('/')[-1]), result=workload_results[part], result=engine.get_workload_data(), args=sim_config, ) (out / 'sim_config.yaml').write_text(sim_config.dump_yaml()) jobs = {p: w.jobs for p, w in workload_results.items()} ui_update_freq = sim_config.system_configs[0].scheduler.ui_update_freq gen = multi_engine.run_simulation(jobs, timestep_start, timestep_end, time_delta) gen = multi_engine.run_simulation() for tick_datas in gen: sys_power = 0 Loading