diff --git a/raps/engine.py b/raps/engine.py index ec02adbf2ff6f7089efc8f3546d109c86283d0ea..67cd999f2b436ee9f3d74ccc1a47f54899436771 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -43,6 +43,22 @@ from raps.sim_config import SimConfig from bisect import bisect_right +@dataclasses.dataclass +class TickReturn: + """ Represents the state output from the simulation each tick """ + power_df: Optional[pd.DataFrame] + p_flops: Optional[float] + g_flops_w: Optional[float] + system_util: float + fmu_inputs: Optional[dict] + fmu_outputs: Optional[dict] + avg_net_tx: Optional[float] + avg_net_rx: Optional[float] + avg_net_util: Optional[float] + slowdown_per_job: float + node_occupancy: dict[int, int] + + @dataclasses.dataclass class TickData: """ Represents the state output from the simulation each tick """ @@ -55,16 +71,16 @@ class TickData: power_df: Optional[pd.DataFrame] p_flops: Optional[float] g_flops_w: Optional[float] - system_util: float + system_util: Optional[float] fmu_inputs: Optional[dict] fmu_outputs: Optional[dict] num_active_nodes: int num_free_nodes: int - avg_net_tx: float - avg_net_rx: float - avg_net_util: float - slowdown_per_job: float - node_occupancy: dict[int, int] + avg_net_tx: Optional[float] + avg_net_rx: Optional[float] + avg_net_util: Optional[float] + slowdown_per_job: Optional[float] + node_occupancy: Optional[dict[int, int]] time_delta: int @@ -130,8 +146,8 @@ class Engine: system_config = sim_config.get_system_config_by_name(partition) elif len(sim_config.system_configs) > 1: raise ValueError( - "Engine can only run single-partition simulations. Use MultiPartEngine for " + - "multi-partition simulations, or pass partition to select the partition to run." + "Engine can only run single-partition simulations. Use MultiPartEngine for " + + "multi-partition simulations, or pass partition to select the partition to run." 
) else: system_config = sim_config.system_configs[0] @@ -185,8 +201,8 @@ class Engine: diff = start - wd.start_date if diff.total_seconds() < 0: raise Exception( - f"{start.isoformat()} is before data range in workload. " + - f"Workload data begins at {wd.start_date.isoformat()}" + f"{start.isoformat()} is before data range in workload. " + + f"Workload data begins at {wd.start_date.isoformat()}" ) wd.telemetry_start += int(diff.total_seconds()) wd.start_date = start @@ -366,16 +382,21 @@ class Engine: return False def prepare_timestep(self, *, replay: bool = True, jobs): + # 0 track need to reschedule # 1 identify completed jobs # 2 Check continuous job generation # 3 Simulate node failure # Defunct feature! # 4 Simulate downtime # 5 Update active and free nodes + need_reschedule = False + # 1 Identify Completed Jobs completed_jobs = [job for job in self.running if job.end_time is not None and job.end_time <= self.current_timestep] + need_reschedule = need_reschedule or (completed_jobs != []) + # Update Completed Jobs, their account and Free resources. 
for job in completed_jobs: self.power_manager.set_idle(job.scheduled_nodes) @@ -393,6 +414,8 @@ class Engine: killed_jobs = [job for job in self.running if job.end_time is not None and job.start_time + job.time_limit <= self.current_timestep] + need_reschedule = need_reschedule or (killed_jobs != []) + for job in killed_jobs: self.power_manager.set_idle(job.scheduled_nodes) job.current_state = JobState.TIMEOUT @@ -419,9 +442,12 @@ class Engine: else: newly_downed_nodes = [] - need_reschedule = False + need_reschedule = need_reschedule or (newly_downed_nodes != []) + # 4 Simulate downtime - need_reschedule = self.downtime.check_and_trigger(timestep=self.current_timestep, engine=self) + downtime = self.downtime.check_and_trigger(timestep=self.current_timestep, engine=self) + + need_reschedule = need_reschedule or downtime # 5 Update active/free nodes based on core/GPU utilization if self.config['multitenant']: @@ -447,10 +473,12 @@ class Engine: self.num_active_nodes = self.config['TOTAL_NODES'] \ - len(self.resource_manager.available_nodes) \ - len(self.resource_manager.down_nodes) + if self.down_nodes != self.resource_manager.down_nodes: + need_reschedule = True self.down_nodes = self.resource_manager.down_nodes # TODO This should only be managed in the resource manager! - return completed_jobs, killed_jobs, newly_downed_nodes, need_reschedule + return completed_jobs, killed_jobs, need_reschedule def complete_timestep(self, *, actively_considered_jobs: List, @@ -522,8 +550,8 @@ class Engine: if job.current_state != JobState.RUNNING: raise ValueError( - f"Job {job.id} is in running list, " + - f"but state is not RUNNING: job.state == {job.current_state}" + f"Job {job.id} is in running list, " + + f"but state is not RUNNING: job.state == {job.current_state}" ) else: # if job.state == JobState.RUNNING: # Error checks @@ -531,13 +559,13 @@ class Engine: raise Exception(f"Job exceded time limit! 
" f"{job.running_time} > {job.time_limit}" f"\n{job}" - f"\nCurrent timestep:{self.current_timestep-self.timestep_start} (rel)" + f"\nCurrent timestep:{self.current_timestep - self.timestep_start} (rel)" ) if replay and job.running_time > job.expected_run_time: raise Exception(f"Job should have ended in replay! " f" {job.running_time} > {job.expected_run_time}" f"\n{job}" - f"\nCurrent timestep:{self.current_timestep-self.timestep_start} (rel)" + f"\nCurrent timestep:{self.current_timestep - self.timestep_start} (rel)" ) # Aggregate scheduled nodes @@ -627,7 +655,7 @@ class Engine: net_rx_list=net_rx_list, slowdown_factors=slowdown_factors ) - slowdown_per_job = sum(slowdown_factors)/len(slowdown_factors) if len(slowdown_factors) != 0 else 0 + slowdown_per_job = sum(slowdown_factors) / len(slowdown_factors) if len(slowdown_factors) != 0 else 0 self.record_network_stats(avg_tx=avg_tx, avg_rx=avg_rx, avg_net=avg_net) @@ -646,29 +674,19 @@ class Engine: self.node_occupancy_history.append(node_occupancy) - tick_data = TickData( - current_timestep=self.current_timestep, - completed=None, - killed=None, - running=self.running, - queue=self.queue, - down_nodes=self.down_nodes, + return TickReturn( power_df=power_df, p_flops=pflops, g_flops_w=gflops_per_watt, system_util=system_util, fmu_inputs=cooling_inputs, fmu_outputs=cooling_outputs, - num_active_nodes=self.num_active_nodes, - num_free_nodes=self.num_free_nodes, avg_net_tx=avg_tx, avg_net_rx=avg_rx, avg_net_util=avg_net, slowdown_per_job=slowdown_per_job, node_occupancy=node_occupancy, - time_delta=time_delta ) - return tick_data def prepare_system_state(self, *, all_jobs: List, timestep_start, timestep_end): # Set engine timesteps @@ -755,18 +773,14 @@ class Engine: cursor = r # 1. Prepare Timestep: - completed_jobs, killed_jobs, newly_downed_nodes, need_reschedule = \ - self.prepare_timestep(jobs=jobs) + completed_jobs, killed_jobs, need_reschedule = self.prepare_timestep(jobs=jobs) # 2. 
Identify eligible jobs and add them to the queue. has_new_additions = self.add_eligible_jobs_to_queue(jobs) + need_reschedule = need_reschedule or has_new_additions # 3. Schedule jobs that are now in the queue. - if completed_jobs != [] \ - or killed_jobs != [] \ - or newly_downed_nodes != [] \ - or has_new_additions \ - or need_reschedule: + if need_reschedule: self.scheduler.schedule(self.queue, self.running, self.current_timestep, accounts=self.accounts, @@ -776,16 +790,34 @@ class Engine: print(".", end="", flush=True) # 4. Run tick only at specified time_delta - if 0 == (self.current_timestep % current_time_delta) \ - and ((current_time_delta == 1 - and self.current_timestep % self.config['POWER_UPDATE_FREQ'] == 0) - or (current_time_delta != 1 or self.downscale != 1) - ): - tick_data = self.tick(time_delta=current_time_delta, replay=replay) - tick_data.completed = completed_jobs - tick_data.killed = completed_jobs + if 0 == (self.current_timestep % current_time_delta): + tick_return = self.tick(time_delta=current_time_delta, replay=replay) else: - tick_data = None + pass  # NOTE(review): re-yields the previous tick_return on tick-skipped steps; if the first iteration is not tick-aligned, tick_return is unbound and the yield below raises NameError — initialise tick_return before the loop or guard the yield + + # Yield TickData here! + yield TickData( + current_timestep=self.current_timestep, + completed=completed_jobs, + killed=killed_jobs, + running=self.running, + queue=self.queue, + down_nodes=self.down_nodes, + power_df=tick_return.power_df, + p_flops=tick_return.p_flops, + g_flops_w=tick_return.g_flops_w, + system_util=tick_return.system_util, + fmu_inputs=tick_return.fmu_inputs, + fmu_outputs=tick_return.fmu_outputs, + num_active_nodes=self.num_active_nodes, + num_free_nodes=self.num_free_nodes, + avg_net_rx=tick_return.avg_net_rx, + avg_net_tx=tick_return.avg_net_tx, + avg_net_util=tick_return.avg_net_util, + slowdown_per_job=tick_return.slowdown_per_job, + node_occupancy=tick_return.node_occupancy, + time_delta=self.time_delta  # NOTE(review): old code passed current_time_delta here; confirm self.time_delta is actually defined on Engine + ) # 5. 
Complete the timestep simulation_done = self.complete_timestep(actively_considered_jobs=jobs, @@ -795,7 +827,6 @@ class Engine: cursor=cursor) if simulation_done: break - yield tick_data def get_job_history_dict(self): return self.job_history_dict