Loading raps/engine.py +77 −46 Original line number Diff line number Diff line Loading @@ -43,6 +43,22 @@ from raps.sim_config import SimConfig from bisect import bisect_right @dataclasses.dataclass class TickReturn: """ Represents the state output from the simulation each tick """ power_df: Optional[pd.DataFrame] p_flops: Optional[float] g_flops_w: Optional[float] system_util: float fmu_inputs: Optional[dict] fmu_outputs: Optional[dict] avg_net_tx: Optional[float] avg_net_rx: Optional[float] avg_net_util: Optional[float] slowdown_per_job: float node_occupancy: dict[int, int] @dataclasses.dataclass class TickData: """ Represents the state output from the simulation each tick """ Loading @@ -55,16 +71,16 @@ class TickData: power_df: Optional[pd.DataFrame] p_flops: Optional[float] g_flops_w: Optional[float] system_util: float system_util: Optional[float] fmu_inputs: Optional[dict] fmu_outputs: Optional[dict] num_active_nodes: int num_free_nodes: int avg_net_tx: float avg_net_rx: float avg_net_util: float slowdown_per_job: float node_occupancy: dict[int, int] avg_net_tx: Optional[float] avg_net_rx: Optional[float] avg_net_util: Optional[float] slowdown_per_job: Optional[float] node_occupancy: Optional[dict[int, int]] time_delta: int Loading Loading @@ -130,8 +146,8 @@ class Engine: system_config = sim_config.get_system_config_by_name(partition) elif len(sim_config.system_configs) > 1: raise ValueError( "Engine can only run single-partition simulations. Use MultiPartEngine for " + "multi-partition simulations, or pass partition to select the partition to run." "Engine can only run single-partition simulations. Use MultiPartEngine for " + "multi-partition simulations, or pass partition to select the partition to run." ) else: system_config = sim_config.system_configs[0] Loading Loading @@ -185,8 +201,8 @@ class Engine: diff = start - wd.start_date if diff.total_seconds() < 0: raise Exception( f"{start.isoformat()} is before data range in workload. " + f"Workload data begins at {wd.start_date.isoformat()}" f"{start.isoformat()} is before data range in workload. " + f"Workload data begins at {wd.start_date.isoformat()}" ) wd.telemetry_start += int(diff.total_seconds()) wd.start_date = start Loading Loading @@ -366,16 +382,21 @@ class Engine: return False def prepare_timestep(self, *, replay: bool = True, jobs): # 0 track need to reschedule # 1 identify completed jobs # 2 Check continuous job generation # 3 Simulate node failure # Defunct feature! # 4 Simulate downtime # 5 Update active and free nodes need_reschedule = False # 1 Identify Completed Jobs completed_jobs = [job for job in self.running if job.end_time is not None and job.end_time <= self.current_timestep] need_reschedule = need_reschedule or (completed_jobs != []) # Update Completed Jobs, their account and and Free resources. for job in completed_jobs: self.power_manager.set_idle(job.scheduled_nodes) Loading @@ -393,6 +414,8 @@ class Engine: killed_jobs = [job for job in self.running if job.end_time is not None and job.start_time + job.time_limit <= self.current_timestep] need_reschedule = need_reschedule or (killed_jobs != []) for job in killed_jobs: self.power_manager.set_idle(job.scheduled_nodes) job.current_state = JobState.TIMEOUT Loading @@ -419,9 +442,12 @@ class Engine: else: newly_downed_nodes = [] need_reschedule = False need_reschedule = need_reschedule or (newly_downed_nodes != []) # 4 Simulate downtime need_reschedule = self.downtime.check_and_trigger(timestep=self.current_timestep, engine=self) downtime = self.downtime.check_and_trigger(timestep=self.current_timestep, engine=self) need_reschedule = need_reschedule or downtime # 5 Update active/free nodes based on core/GPU utilization if self.config['multitenant']: Loading @@ -447,10 +473,12 @@ class Engine: self.num_active_nodes = self.config['TOTAL_NODES'] \ - len(self.resource_manager.available_nodes) \ - len(self.resource_manager.down_nodes) if self.down_nodes != self.resource_manager.down_nodes: need_reschedule = need_reschedule or True self.down_nodes = self.resource_manager.down_nodes # TODO This should only be managed in the resource manager! return completed_jobs, killed_jobs, newly_downed_nodes, need_reschedule return completed_jobs, killed_jobs, need_reschedule def complete_timestep(self, *, actively_considered_jobs: List, Loading Loading @@ -522,8 +550,8 @@ class Engine: if job.current_state != JobState.RUNNING: raise ValueError( f"Job {job.id} is in running list, " + f"but state is not RUNNING: job.state == {job.current_state}" f"Job {job.id} is in running list, " + f"but state is not RUNNING: job.state == {job.current_state}" ) else: # if job.state == JobState.RUNNING: # Error checks Loading Loading @@ -646,29 +674,19 @@ class Engine: self.node_occupancy_history.append(node_occupancy) tick_data = TickData( current_timestep=self.current_timestep, completed=None, killed=None, running=self.running, queue=self.queue, down_nodes=self.down_nodes, return TickReturn( power_df=power_df, p_flops=pflops, g_flops_w=gflops_per_watt, system_util=system_util, fmu_inputs=cooling_inputs, fmu_outputs=cooling_outputs, num_active_nodes=self.num_active_nodes, num_free_nodes=self.num_free_nodes, avg_net_tx=avg_tx, avg_net_rx=avg_rx, avg_net_util=avg_net, slowdown_per_job=slowdown_per_job, node_occupancy=node_occupancy, time_delta=time_delta ) return tick_data def prepare_system_state(self, *, all_jobs: List, timestep_start, timestep_end): # Set engine timesteps Loading Loading @@ -755,18 +773,14 @@ class Engine: cursor = r # 1. Prepare Timestep: completed_jobs, killed_jobs, newly_downed_nodes, need_reschedule = \ self.prepare_timestep(jobs=jobs) completed_jobs, killed_jobs, need_reschedule = self.prepare_timestep(jobs=jobs) # 2. Identify eligible jobs and add them to the queue. has_new_additions = self.add_eligible_jobs_to_queue(jobs) need_reschedule = need_reschedule or has_new_additions # 3. Schedule jobs that are now in the queue. if completed_jobs != [] \ or killed_jobs != [] \ or newly_downed_nodes != [] \ or has_new_additions \ or need_reschedule: if need_reschedule: self.scheduler.schedule(self.queue, self.running, self.current_timestep, accounts=self.accounts, Loading @@ -776,16 +790,34 @@ class Engine: print(".", end="", flush=True) # 4. Run tick only at specified time_delta if 0 == (self.current_timestep % current_time_delta) \ and ((current_time_delta == 1 and self.current_timestep % self.config['POWER_UPDATE_FREQ'] == 0) or (current_time_delta != 1 or self.downscale != 1) ): tick_data = self.tick(time_delta=current_time_delta, replay=replay) tick_data.completed = completed_jobs tick_data.killed = completed_jobs if 0 == (self.current_timestep % current_time_delta): tick_return = self.tick(time_delta=current_time_delta, replay=replay) else: tick_data = None pass # Yield TickData here! yield TickData( current_timestep=self.current_timestep, completed=completed_jobs, killed=killed_jobs, running=self.running, queue=self.queue, down_nodes=self.down_nodes, power_df=tick_return.power_df, p_flops=tick_return.p_flops, g_flops_w=tick_return.g_flops_w, system_util=tick_return.system_util, fmu_inputs=tick_return.fmu_inputs, fmu_outputs=tick_return.fmu_outputs, num_active_nodes=self.num_active_nodes, num_free_nodes=self.num_free_nodes, avg_net_rx=tick_return.avg_net_rx, avg_net_tx=tick_return.avg_net_tx, avg_net_util=tick_return.avg_net_util, slowdown_per_job=tick_return.slowdown_per_job, node_occupancy=tick_return.node_occupancy, time_delta=self.time_delta ) # 5. Complete the timestep simulation_done = self.complete_timestep(actively_considered_jobs=jobs, Loading @@ -795,7 +827,6 @@ class Engine: cursor=cursor) if simulation_done: break yield tick_data def get_job_history_dict(self): return self.job_history_dict Loading Loading
raps/engine.py +77 −46 Original line number Diff line number Diff line Loading @@ -43,6 +43,22 @@ from raps.sim_config import SimConfig from bisect import bisect_right @dataclasses.dataclass class TickReturn: """ Represents the state output from the simulation each tick """ power_df: Optional[pd.DataFrame] p_flops: Optional[float] g_flops_w: Optional[float] system_util: float fmu_inputs: Optional[dict] fmu_outputs: Optional[dict] avg_net_tx: Optional[float] avg_net_rx: Optional[float] avg_net_util: Optional[float] slowdown_per_job: float node_occupancy: dict[int, int] @dataclasses.dataclass class TickData: """ Represents the state output from the simulation each tick """ Loading @@ -55,16 +71,16 @@ class TickData: power_df: Optional[pd.DataFrame] p_flops: Optional[float] g_flops_w: Optional[float] system_util: float system_util: Optional[float] fmu_inputs: Optional[dict] fmu_outputs: Optional[dict] num_active_nodes: int num_free_nodes: int avg_net_tx: float avg_net_rx: float avg_net_util: float slowdown_per_job: float node_occupancy: dict[int, int] avg_net_tx: Optional[float] avg_net_rx: Optional[float] avg_net_util: Optional[float] slowdown_per_job: Optional[float] node_occupancy: Optional[dict[int, int]] time_delta: int Loading Loading @@ -130,8 +146,8 @@ class Engine: system_config = sim_config.get_system_config_by_name(partition) elif len(sim_config.system_configs) > 1: raise ValueError( "Engine can only run single-partition simulations. Use MultiPartEngine for " + "multi-partition simulations, or pass partition to select the partition to run." "Engine can only run single-partition simulations. Use MultiPartEngine for " + "multi-partition simulations, or pass partition to select the partition to run." ) else: system_config = sim_config.system_configs[0] Loading Loading @@ -185,8 +201,8 @@ class Engine: diff = start - wd.start_date if diff.total_seconds() < 0: raise Exception( f"{start.isoformat()} is before data range in workload. " + f"Workload data begins at {wd.start_date.isoformat()}" f"{start.isoformat()} is before data range in workload. " + f"Workload data begins at {wd.start_date.isoformat()}" ) wd.telemetry_start += int(diff.total_seconds()) wd.start_date = start Loading Loading @@ -366,16 +382,21 @@ class Engine: return False def prepare_timestep(self, *, replay: bool = True, jobs): # 0 track need to reschedule # 1 identify completed jobs # 2 Check continuous job generation # 3 Simulate node failure # Defunct feature! # 4 Simulate downtime # 5 Update active and free nodes need_reschedule = False # 1 Identify Completed Jobs completed_jobs = [job for job in self.running if job.end_time is not None and job.end_time <= self.current_timestep] need_reschedule = need_reschedule or (completed_jobs != []) # Update Completed Jobs, their account and and Free resources. for job in completed_jobs: self.power_manager.set_idle(job.scheduled_nodes) Loading @@ -393,6 +414,8 @@ class Engine: killed_jobs = [job for job in self.running if job.end_time is not None and job.start_time + job.time_limit <= self.current_timestep] need_reschedule = need_reschedule or (killed_jobs != []) for job in killed_jobs: self.power_manager.set_idle(job.scheduled_nodes) job.current_state = JobState.TIMEOUT Loading @@ -419,9 +442,12 @@ class Engine: else: newly_downed_nodes = [] need_reschedule = False need_reschedule = need_reschedule or (newly_downed_nodes != []) # 4 Simulate downtime need_reschedule = self.downtime.check_and_trigger(timestep=self.current_timestep, engine=self) downtime = self.downtime.check_and_trigger(timestep=self.current_timestep, engine=self) need_reschedule = need_reschedule or downtime # 5 Update active/free nodes based on core/GPU utilization if self.config['multitenant']: Loading @@ -447,10 +473,12 @@ class Engine: self.num_active_nodes = self.config['TOTAL_NODES'] \ - len(self.resource_manager.available_nodes) \ - len(self.resource_manager.down_nodes) if self.down_nodes != self.resource_manager.down_nodes: need_reschedule = need_reschedule or True self.down_nodes = self.resource_manager.down_nodes # TODO This should only be managed in the resource manager! return completed_jobs, killed_jobs, newly_downed_nodes, need_reschedule return completed_jobs, killed_jobs, need_reschedule def complete_timestep(self, *, actively_considered_jobs: List, Loading Loading @@ -522,8 +550,8 @@ class Engine: if job.current_state != JobState.RUNNING: raise ValueError( f"Job {job.id} is in running list, " + f"but state is not RUNNING: job.state == {job.current_state}" f"Job {job.id} is in running list, " + f"but state is not RUNNING: job.state == {job.current_state}" ) else: # if job.state == JobState.RUNNING: # Error checks Loading Loading @@ -646,29 +674,19 @@ class Engine: self.node_occupancy_history.append(node_occupancy) tick_data = TickData( current_timestep=self.current_timestep, completed=None, killed=None, running=self.running, queue=self.queue, down_nodes=self.down_nodes, return TickReturn( power_df=power_df, p_flops=pflops, g_flops_w=gflops_per_watt, system_util=system_util, fmu_inputs=cooling_inputs, fmu_outputs=cooling_outputs, num_active_nodes=self.num_active_nodes, num_free_nodes=self.num_free_nodes, avg_net_tx=avg_tx, avg_net_rx=avg_rx, avg_net_util=avg_net, slowdown_per_job=slowdown_per_job, node_occupancy=node_occupancy, time_delta=time_delta ) return tick_data def prepare_system_state(self, *, all_jobs: List, timestep_start, timestep_end): # Set engine timesteps Loading Loading @@ -755,18 +773,14 @@ class Engine: cursor = r # 1. Prepare Timestep: completed_jobs, killed_jobs, newly_downed_nodes, need_reschedule = \ self.prepare_timestep(jobs=jobs) completed_jobs, killed_jobs, need_reschedule = self.prepare_timestep(jobs=jobs) # 2. Identify eligible jobs and add them to the queue. has_new_additions = self.add_eligible_jobs_to_queue(jobs) need_reschedule = need_reschedule or has_new_additions # 3. Schedule jobs that are now in the queue. if completed_jobs != [] \ or killed_jobs != [] \ or newly_downed_nodes != [] \ or has_new_additions \ or need_reschedule: if need_reschedule: self.scheduler.schedule(self.queue, self.running, self.current_timestep, accounts=self.accounts, Loading @@ -776,16 +790,34 @@ class Engine: print(".", end="", flush=True) # 4. Run tick only at specified time_delta if 0 == (self.current_timestep % current_time_delta) \ and ((current_time_delta == 1 and self.current_timestep % self.config['POWER_UPDATE_FREQ'] == 0) or (current_time_delta != 1 or self.downscale != 1) ): tick_data = self.tick(time_delta=current_time_delta, replay=replay) tick_data.completed = completed_jobs tick_data.killed = completed_jobs if 0 == (self.current_timestep % current_time_delta): tick_return = self.tick(time_delta=current_time_delta, replay=replay) else: tick_data = None pass # Yield TickData here! yield TickData( current_timestep=self.current_timestep, completed=completed_jobs, killed=killed_jobs, running=self.running, queue=self.queue, down_nodes=self.down_nodes, power_df=tick_return.power_df, p_flops=tick_return.p_flops, g_flops_w=tick_return.g_flops_w, system_util=tick_return.system_util, fmu_inputs=tick_return.fmu_inputs, fmu_outputs=tick_return.fmu_outputs, num_active_nodes=self.num_active_nodes, num_free_nodes=self.num_free_nodes, avg_net_rx=tick_return.avg_net_rx, avg_net_tx=tick_return.avg_net_tx, avg_net_util=tick_return.avg_net_util, slowdown_per_job=tick_return.slowdown_per_job, node_occupancy=tick_return.node_occupancy, time_delta=self.time_delta ) # 5. Complete the timestep simulation_done = self.complete_timestep(actively_considered_jobs=jobs, Loading @@ -795,7 +827,6 @@ class Engine: cursor=cursor) if simulation_done: break yield tick_data def get_job_history_dict(self): return self.job_history_dict Loading