From 3efae58edd4fe531a56305c82560e393ac6cc0d9 Mon Sep 17 00:00:00 2001
From: Matthias Maiterth
Date: Tue, 23 Sep 2025 12:40:59 -0400
Subject: [PATCH 1/2] Update TickData and condition handling in the simulation loop and tick.

- Added TickReturn as a dataclass to collect the return values from tick().
  It is used to populate the TickData that run_simulation yields.
- Updated the tracking of whether a reschedule is needed.
- Updated the run_simulation loop so that each function returns the values
  needed to build the TickData, which is then constructed in one place. The
  state is therefore more transparent, and the yielded values are guaranteed
  to be the latest. Step 5, "complete the timestep", is last, after the yield
  (as it is equivalent to the update step of a for loop (i=0;i

 1:
             raise ValueError(
-                "Engine can only run single-partition simulations. Use MultiPartEngine for " +
-                "multi-partition simulations, or pass partition to select the partition to run."
+                "Engine can only run single-partition simulations. Use MultiPartEngine for "
+                + "multi-partition simulations, or pass partition to select the partition to run."
             )
         else:
             system_config = sim_config.system_configs[0]
@@ -185,8 +201,8 @@ class Engine:
         diff = start - wd.start_date
         if diff.total_seconds() < 0:
             raise Exception(
-                f"{start.isoformat()} is before data range in workload. " +
-                f"Workload data begins at {wd.start_date.isoformat()}"
+                f"{start.isoformat()} is before data range in workload. "
+                + f"Workload data begins at {wd.start_date.isoformat()}"
             )
         wd.telemetry_start += int(diff.total_seconds())
         wd.start_date = start
@@ -366,16 +382,21 @@ class Engine:
         return False

     def prepare_timestep(self, *, replay: bool = True, jobs):
+        # 0 track need to reschedule
         # 1 identify completed jobs
         # 2 Check continuous job generation
         # 3 Simulate node failure  # Defunct feature!
         # 4 Simulate downtime
         # 5 Update active and free nodes

+        need_reschedule = False
+
         # 1 Identify Completed Jobs
         completed_jobs = [job for job in self.running if job.end_time is not None and job.end_time <= self.current_timestep]

+        need_reschedule = need_reschedule or (completed_jobs != [])
+
         # Update Completed Jobs, their account and and Free resources.
         for job in completed_jobs:
             self.power_manager.set_idle(job.scheduled_nodes)
@@ -393,6 +414,8 @@ class Engine:

         killed_jobs = [job for job in self.running if job.end_time is not None and job.start_time + job.time_limit <= self.current_timestep]

+        need_reschedule = need_reschedule or (killed_jobs != [])
+
         for job in killed_jobs:
             self.power_manager.set_idle(job.scheduled_nodes)
             job.current_state = JobState.TIMEOUT
@@ -419,9 +442,12 @@ class Engine:
         else:
             newly_downed_nodes = []

-        need_reschedule = False
+        need_reschedule = need_reschedule or (newly_downed_nodes != [])
+
         # 4 Simulate downtime
-        need_reschedule = self.downtime.check_and_trigger(timestep=self.current_timestep, engine=self)
+        downtime = self.downtime.check_and_trigger(timestep=self.current_timestep, engine=self)
+
+        need_reschedule = need_reschedule or downtime

         # 5 Update active/free nodes based on core/GPU utilization
         if self.config['multitenant']:
@@ -447,10 +473,12 @@ class Engine:
         self.num_active_nodes = self.config['TOTAL_NODES'] \
             - len(self.resource_manager.available_nodes) \
             - len(self.resource_manager.down_nodes)
+        if self.down_nodes != self.resource_manager.down_nodes:
+            need_reschedule = need_reschedule or True
         self.down_nodes = self.resource_manager.down_nodes  # TODO This should only be managed in the resource manager!
-        return completed_jobs, killed_jobs, newly_downed_nodes, need_reschedule
+        return completed_jobs, killed_jobs, need_reschedule

     def complete_timestep(self, *,
                           actively_considered_jobs: List,
@@ -522,8 +550,8 @@ class Engine:
             if job.current_state != JobState.RUNNING:
                 raise ValueError(
-                    f"Job {job.id} is in running list, " +
-                    f"but state is not RUNNING: job.state == {job.current_state}"
+                    f"Job {job.id} is in running list, "
+                    + f"but state is not RUNNING: job.state == {job.current_state}"
                 )
             else:  # if job.state == JobState.RUNNING:
                 # Error checks
@@ -531,13 +559,13 @@ class Engine:
                     raise Exception(f"Job exceded time limit! "
                                     f"{job.running_time} > {job.time_limit}"
                                     f"\n{job}"
-                                    f"\nCurrent timestep:{self.current_timestep-self.timestep_start} (rel)"
+                                    f"\nCurrent timestep:{self.current_timestep - self.timestep_start} (rel)"
                                     )
                 if replay and job.running_time > job.expected_run_time:
                     raise Exception(f"Job should have ended in replay! "
                                     f" {job.running_time} > {job.expected_run_time}"
                                     f"\n{job}"
-                                    f"\nCurrent timestep:{self.current_timestep-self.timestep_start} (rel)"
+                                    f"\nCurrent timestep:{self.current_timestep - self.timestep_start} (rel)"
                                     )

             # Aggregate scheduled nodes
@@ -627,7 +655,7 @@ class Engine:
             net_rx_list=net_rx_list,
             slowdown_factors=slowdown_factors
         )
-        slowdown_per_job = sum(slowdown_factors)/len(slowdown_factors) if len(slowdown_factors) != 0 else 0
+        slowdown_per_job = sum(slowdown_factors) / len(slowdown_factors) if len(slowdown_factors) != 0 else 0

         self.record_network_stats(avg_tx=avg_tx, avg_rx=avg_rx, avg_net=avg_net)

@@ -646,29 +674,19 @@ class Engine:
         self.node_occupancy_history.append(node_occupancy)

-        tick_data = TickData(
-            current_timestep=self.current_timestep,
-            completed=None,
-            killed=None,
-            running=self.running,
-            queue=self.queue,
-            down_nodes=self.down_nodes,
+        return TickReturn(
             power_df=power_df,
             p_flops=pflops,
             g_flops_w=gflops_per_watt,
             system_util=system_util,
             fmu_inputs=cooling_inputs,
             fmu_outputs=cooling_outputs,
-            num_active_nodes=self.num_active_nodes,
-            num_free_nodes=self.num_free_nodes,
             avg_net_tx=avg_tx,
             avg_net_rx=avg_rx,
             avg_net_util=avg_net,
             slowdown_per_job=slowdown_per_job,
             node_occupancy=node_occupancy,
-            time_delta=time_delta
         )
-        return tick_data

     def prepare_system_state(self, *, all_jobs: List, timestep_start, timestep_end):
         # Set engine timesteps
@@ -755,18 +773,14 @@ class Engine:
             cursor = r
             # 1. Prepare Timestep:
-            completed_jobs, killed_jobs, newly_downed_nodes, need_reschedule = \
-                self.prepare_timestep(jobs=jobs)
+            completed_jobs, killed_jobs, need_reschedule = self.prepare_timestep(jobs=jobs)

             # 2. Identify eligible jobs and add them to the queue.
             has_new_additions = self.add_eligible_jobs_to_queue(jobs)
+            need_reschedule = need_reschedule or has_new_additions

             # 3. Schedule jobs that are now in the queue.
-            if completed_jobs != [] \
-                    or killed_jobs != [] \
-                    or newly_downed_nodes != [] \
-                    or has_new_additions \
-                    or need_reschedule:
+            if need_reschedule:
                 self.scheduler.schedule(self.queue,
                                         self.running,
                                         self.current_timestep,
                                         accounts=self.accounts,
@@ -776,16 +790,34 @@ class Engine:
                 print(".", end="", flush=True)

             # 4. Run tick only at specified time_delta
-            if 0 == (self.current_timestep % current_time_delta) \
-                    and ((current_time_delta == 1
-                          and self.current_timestep % self.config['POWER_UPDATE_FREQ'] == 0)
-                         or (current_time_delta != 1 or self.downscale != 1)
-                         ):
-                tick_data = self.tick(time_delta=current_time_delta, replay=replay)
-                tick_data.completed = completed_jobs
-                tick_data.killed = completed_jobs
+            if 0 == (self.current_timestep % current_time_delta):
+                tick_return = self.tick(time_delta=current_time_delta, replay=replay)
             else:
-                tick_data = None
+                pass
+
+            # Yield TickData here!
+            yield TickData(
+                current_timestep=self.current_timestep,
+                completed=completed_jobs,
+                killed=killed_jobs,
+                running=self.running,
+                queue=self.queue,
+                down_nodes=self.down_nodes,
+                power_df=tick_return.power_df,
+                p_flops=tick_return.p_flops,
+                g_flops_w=tick_return.g_flops_w,
+                system_util=tick_return.system_util,
+                fmu_inputs=tick_return.fmu_inputs,
+                fmu_outputs=tick_return.fmu_outputs,
+                num_active_nodes=self.num_active_nodes,
+                num_free_nodes=self.num_free_nodes,
+                avg_net_rx=tick_return.avg_net_rx,
+                avg_net_tx=tick_return.avg_net_tx,
+                avg_net_util=tick_return.avg_net_util,
+                slowdown_per_job=tick_return.slowdown_per_job,
+                node_occupancy=tick_return.node_occupancy,
+                time_delta=self.time_delta
+            )

             # 5. Complete the timestep
             simulation_done = self.complete_timestep(actively_considered_jobs=jobs,
                                                      cursor=cursor)
             if simulation_done:
                 break
-            yield tick_data

     def get_job_history_dict(self):
         return self.job_history_dict
--
GitLab

From fa9f93d663803e3a90e097177277d2cf48c208b0 Mon Sep 17 00:00:00 2001
From: Matthias Maiterth
Date: Tue, 23 Sep 2025 16:43:19 -0400
Subject: [PATCH 2/2] Fix the optionality of the dataclass field types.

---
 raps/engine.py | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/raps/engine.py b/raps/engine.py
index fc33e66..67cd999 100644
--- a/raps/engine.py
+++ b/raps/engine.py
@@ -52,9 +52,9 @@ class TickReturn:
     system_util: float
     fmu_inputs: Optional[dict]
     fmu_outputs: Optional[dict]
-    avg_net_tx: float
-    avg_net_rx: float
-    avg_net_util: float
+    avg_net_tx: Optional[float]
+    avg_net_rx: Optional[float]
+    avg_net_util: Optional[float]
     slowdown_per_job: float
     node_occupancy: dict[int, int]

@@ -71,16 +71,16 @@ class TickData:
     power_df: Optional[pd.DataFrame]
     p_flops: Optional[float]
     g_flops_w: Optional[float]
-    system_util: float
+    system_util: Optional[float]
    fmu_inputs: Optional[dict]
     fmu_outputs: Optional[dict]
     num_active_nodes: int
     num_free_nodes: int
-    avg_net_tx: float
-    avg_net_rx: float
-    avg_net_util: float
-    slowdown_per_job: float
-    node_occupancy: dict[int, int]
+    avg_net_tx: Optional[float]
+    avg_net_rx: Optional[float]
+    avg_net_util: Optional[float]
+    slowdown_per_job: Optional[float]
+    node_occupancy: Optional[dict[int, int]]
     time_delta: int
--
GitLab
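
The flow that PATCH 1/2 introduces can be read independently of the full engine: tick() returns a
small TickReturn, run_simulation() merges it with the loop-level bookkeeping (completed and killed
jobs, node counts) into the TickData it yields, and step 5, completing the timestep, runs only after
the yield. The sketch below is not part of the patches; it is a minimal, self-contained illustration
of that pattern. The class fields are cut down to a few representative ones, ToyEngine and its
placeholder values are invented for the example, and tick_return is initialized before the loop so
the sketch never reads an unset name (the patch instead reuses the value from the previous tick).
The Optional field types correspond to what PATCH 2/2 adjusts.

from dataclasses import dataclass
from typing import Iterator, List, Optional


@dataclass
class TickReturn:
    # Values computed by tick(); they stay None for timesteps without a tick.
    power_df: Optional[object] = None      # stand-in for the pandas DataFrame
    system_util: Optional[float] = None
    slowdown_per_job: Optional[float] = None


@dataclass
class TickData:
    # Per-timestep snapshot yielded to the caller of run_simulation().
    current_timestep: int
    completed: List[str]
    killed: List[str]
    power_df: Optional[object]
    system_util: Optional[float]
    slowdown_per_job: Optional[float]


class ToyEngine:
    """Invented stand-in for Engine; only the loop structure is of interest."""

    def __init__(self, timesteps: int, tick_every: int) -> None:
        self.timesteps = timesteps
        self.tick_every = tick_every
        self.current_timestep = 0

    def prepare_timestep(self):
        # The real method finds completed/killed jobs and frees their resources;
        # this toy has no job model, so both lists stay empty.
        completed: List[str] = []
        killed: List[str] = []
        need_reschedule = bool(completed) or bool(killed)
        return completed, killed, need_reschedule

    def tick(self) -> TickReturn:
        # Placeholder numbers; the real tick() computes power, FLOPS, cooling, ...
        return TickReturn(power_df=None, system_util=0.5, slowdown_per_job=0.0)

    def run_simulation(self) -> Iterator[TickData]:
        tick_return = TickReturn()  # all None until the first tick runs
        while self.current_timestep < self.timesteps:
            # 1.-3. prepare the timestep and reschedule if anything changed
            completed, killed, need_reschedule = self.prepare_timestep()
            if need_reschedule:
                pass  # scheduler.schedule(...) would be called here
            # 4. run tick only at the configured interval
            if self.current_timestep % self.tick_every == 0:
                tick_return = self.tick()
            # assemble and yield the TickData with the freshest values
            yield TickData(
                current_timestep=self.current_timestep,
                completed=completed,
                killed=killed,
                power_df=tick_return.power_df,
                system_util=tick_return.system_util,
                slowdown_per_job=tick_return.slowdown_per_job,
            )
            # 5. complete the timestep last, like the update step of a for loop
            self.current_timestep += 1


if __name__ == "__main__":
    for td in ToyEngine(timesteps=4, tick_every=2).run_simulation():
        print(td.current_timestep, td.system_util, td.slowdown_per_job)

Run as a script, this prints one line per timestep, which is roughly how a caller of
run_simulation consumes the yielded TickData.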