From 3e8fbb29335802a03881d2bb1a34dbf03748a6cc Mon Sep 17 00:00:00 2001 From: Matthias Maiterth Date: Thu, 20 Feb 2025 18:53:12 -0500 Subject: [PATCH 1/4] This change allows reproducing a logic error that needs to be fixed! The previous change added padding to the job telemetry if the job started before the simulation. Original Code: cpu_trace = np.concatenate((np.array([] * missing_steps),cpu_trace)) This resulted in GW Power as the values should be between 0 and NUM_PROC. This was fixed by setting the values to 0: cpu_trace = np.concatenate((np.array([0] * missing_steps),cpu_trace)) However: The initial code showed that the error is not only in the number produced, but in the offset chosen within the cpu_trace (and gpu_trace). The newly committed code adds cpu_trace = np.concatenate((np.array([np.NaN] * missing_steps),cpu_trace)) Which triggers this error. The np.NaN should be in the code and the logic error needs to be fixed! --- raps/dataloaders/frontier.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py index e4158c0..6e59686 100644 --- a/raps/dataloaders/frontier.py +++ b/raps/dataloaders/frontier.py @@ -231,13 +231,13 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar if wall_time > trace_time: missing_steps = int(wall_time - trace_time) if start_time < 0: - cpu_trace = np.concatenate((np.array([0] * missing_steps),cpu_trace)) - gpu_trace = np.concatenate((np.array([0] * missing_steps),gpu_trace)) + cpu_trace = np.concatenate((np.array([np.NaN] * missing_steps),cpu_trace)) + gpu_trace = np.concatenate((np.array([np.NaN] * missing_steps),gpu_trace)) print(f"Job: {job_id} prepended {missing_steps} Values with idle power!") print(f"{start_time} - {end_time}") elif end_time > telemetry_end: - cpu_trace = np.concatenate((cpu_trace,np.array([0] * missing_steps))) - gpu_trace = np.concatenate((gpu_trace,np.array([0] * missing_steps))) + cpu_trace = 
np.concatenate((cpu_trace,np.array([np.NaN] * missing_steps))) + gpu_trace = np.concatenate((gpu_trace,np.array([np.NaN] * missing_steps))) print(f"Job: {job_id} appended {missing_steps} Values with idle power!") print(f"{start_time} - {end_time}") else: -- GitLab From 8def503eccd880570626981ed05587a843235b0b Mon Sep 17 00:00:00 2001 From: Matthias Maiterth Date: Tue, 25 Feb 2025 14:04:46 -0500 Subject: [PATCH 2/4] Initial fix to fast-forward missing timestep calculation and queue cleanup in prepare timestep --- main.py | 13 ++++++++----- raps/dataloaders/frontier.py | 4 ++-- raps/engine.py | 21 ++++++++++++++------- 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/main.py b/main.py index c34b78c..b2ca73a 100644 --- a/main.py +++ b/main.py @@ -71,11 +71,13 @@ sc = Engine( ) layout_manager = LayoutManager(args.layout, engine=sc, debug=args.debug, **config) -if args.replay: +timestep_start = 0 +if args.fastforward: + args.fastforward = convert_to_seconds(args.fastforward) + timestep_start = args.fastforward + - if args.fastforward: - args.fastforward = convert_to_seconds(args.fastforward) - timestep_start = args.fastforward +if args.replay: td = Telemetry(**args_dict) @@ -106,8 +108,9 @@ if args.replay: else: # custom data loader print(*args.replay) - jobs, timestep_start, timestep_end = td.load_data(args.replay) + jobs, timestep_start_from_data, timestep_end = td.load_data(args.replay) td.save_snapshot(jobs, filename=DIR_NAME) + timestep_start += timestep_start_from_data # + timestep_start_from_data # Set number of timesteps based on the last job running which we assume # is the maximum value of submit_time + wall_time of all the jobs diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py index 6e59686..123707f 100644 --- a/raps/dataloaders/frontier.py +++ b/raps/dataloaders/frontier.py @@ -191,7 +191,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar cpu_power_array = cpu_power.values cpu_min_power 
= nodes_required * config['POWER_CPU_IDLE'] * config['CPUS_PER_NODE'] cpu_max_power = nodes_required * config['POWER_CPU_MAX'] * config['CPUS_PER_NODE'] - cpu_util = power_to_utilization(cpu_power_array, cpu_min_power, cpu_max_power) + cpu_util = power_to_utilization(cpu_power_array, cpu_min_power, cpu_max_power) # Will be negative! as cpu_power_array[i] can be smaller than cpu_min_power cpu_trace = cpu_util * config['CPUS_PER_NODE'] gpu_power = jobprofile_df[jobprofile_df['allocation_id'] \ @@ -229,7 +229,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar trace_time = gpu_trace.size * config['TRACE_QUANTA'] # seconds if wall_time > trace_time: - missing_steps = int(wall_time - trace_time) + missing_steps = int((wall_time - trace_time) // config['TRACE_QUANTA']) if start_time < 0: cpu_trace = np.concatenate((np.array([np.NaN] * missing_steps),cpu_trace)) gpu_trace = np.concatenate((np.array([np.NaN] * missing_steps),gpu_trace)) diff --git a/raps/engine.py b/raps/engine.py index f056022..f8c3917 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -148,8 +148,13 @@ class Engine: cpu_utils = [] gpu_utils = [] net_utils = [] + if self.debug: + print(f"Current Time: {self.current_time}") + for job in self.running: + if self.debug: + print(f"JobID: {job.id}") if job.state == JobState.RUNNING: job.running_time = self.current_time - job.start_time if job.running_time > job.trace_time: @@ -252,7 +257,7 @@ class Engine: self.current_time += 1 return tick_data - def prepare_system_state(self, all_jobs:List, timestep_start): + def prepare_system_state(self, all_jobs:List, timestep_start, replay:bool): # Modifies Jobs object self.current_time = timestep_start @@ -263,24 +268,26 @@ class Engine: self.add_running_jobs_to_queue(all_jobs) # Now process job queue one by one (needed to get the start_time right!) - for job in self.queue: + for job in self.queue[:]: # operate over a slice copy to be able to remove jobs from queue if placed. 
self.scheduler.schedule([job], self.running, job.start_time, sorted=True) - if len(self.queue) != len(self.running): + self.queue.remove(job) + if replay and len(self.queue) != 0: raise ValueError(f"Something went wrong! Not all jobs could be placed!\nPotential confligt in queue:\n{self.queue}") - self.queue = [] # Empty queue needed as addition one by one does not empty the queue! + def run_simulation(self, jobs, timestep_start, timestep_end, autoshutdown=False): """Generator that yields after each simulation tick.""" self.timesteps = timestep_end - timestep_start # Where is this used? - # Place jobs that are currently running, onto the system. - self.prepare_system_state(jobs, timestep_start) - if self.scheduler.policy == PolicyType.REPLAY: replay = True else: replay = False + # Place jobs that are currently running, onto the system. + self.prepare_system_state(jobs, timestep_start, replay) + + for timestep in range(timestep_start,timestep_end): completed_jobs, newly_downed_nodes = self.prepare_timestep(replay) -- GitLab From 3e6a3d9b1026708e7648c93a5ad6da284466e62c Mon Sep 17 00:00:00 2001 From: Matthias Maiterth Date: Tue, 25 Feb 2025 17:24:51 -0500 Subject: [PATCH 3/4] Rewrite of trace_times using trace_start and trace_end time Commit Before removing comments with offset calculation --- raps/dataloaders/frontier.py | 34 ++++++++++++++----------- raps/engine.py | 48 ++++++++++++++++++++++++++++++------ raps/job.py | 10 ++++++-- raps/workload.py | 23 +++++++++++------ 4 files changed, 84 insertions(+), 31 deletions(-) diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py index 123707f..5ac742e 100644 --- a/raps/dataloaders/frontier.py +++ b/raps/dataloaders/frontier.py @@ -114,6 +114,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar - end_time - wall_time (end_time-start_time, actual runtime in seconds) - trace_time (lenght of each trace in seconds) + - trace_start_time (time offset in seconds after which 
the trace starts) + - trace_end_time (time offset in seconds after which the trace ends) has to be set for use within the simulation The returned values are these three: @@ -222,28 +224,33 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar diff = end_time_timestamp - telemetry_start_timestamp end_time = diff.total_seconds() - wall_time = end_time - start_time if np.isnan(wall_time): wall_time = 0 trace_time = gpu_trace.size * config['TRACE_QUANTA'] # seconds + trace_start_time = 0 + trace_end_time = trace_time if wall_time > trace_time: - missing_steps = int((wall_time - trace_time) // config['TRACE_QUANTA']) + missing_trace_time = wall_time - trace_time if start_time < 0: - cpu_trace = np.concatenate((np.array([np.NaN] * missing_steps),cpu_trace)) - gpu_trace = np.concatenate((np.array([np.NaN] * missing_steps),gpu_trace)) - print(f"Job: {job_id} prepended {missing_steps} Values with idle power!") - print(f"{start_time} - {end_time}") + #cpu_trace = np.concatenate((np.array([np.NaN] * missing_steps),cpu_trace)) + #gpu_trace = np.concatenate((np.array([np.NaN] * missing_steps),gpu_trace)) + #print(f"Job: {job_id} prepended {missing_steps} Values with idle power!") + #print(f"{start_time} - {end_time}") + trace_start_time = missing_trace_time + trace_end_time = wall_time elif end_time > telemetry_end: - cpu_trace = np.concatenate((cpu_trace,np.array([np.NaN] * missing_steps))) - gpu_trace = np.concatenate((gpu_trace,np.array([np.NaN] * missing_steps))) - print(f"Job: {job_id} appended {missing_steps} Values with idle power!") - print(f"{start_time} - {end_time}") + #cpu_trace = np.concatenate((cpu_trace,np.array([np.NaN] * missing_steps))) + #gpu_trace = np.concatenate((gpu_trace,np.array([np.NaN] * missing_steps))) + #print(f"Job: {job_id} appended {missing_steps} Values with idle power!") + #print(f"{start_time} - {end_time}") + trace_start_time = 0 + trace_end_time = trace_time else: print(f"Job: {job_id} {start_time} - 
{end_time}!") raise ValueError("Missing values not at start nor end.") - trace_time = wall_time # Pretending to have a full trace, This may not be needed! + #trace_time = gpu_trace.size * config["TRACE_QUANTA"] # Update trace_time to padded trace xnames = jobs_df.loc[jidx, 'xnames'] # Don't replay any job with an empty set of xnames @@ -277,15 +284,14 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar print("Job starts after last recorded telemetry entry:",job_id, "start:", start_time,"end:",end_time, " Telemetry: ", len(gpu_trace), "entries.") continue # SKIP! - - if gpu_trace.size > 0 and (jid == job_id or jid == '*'): # and time_submit >= 0: job_info = job_dict(nodes_required, name, account, cpu_trace, gpu_trace, [], [], end_state, scheduled_nodes, job_id, priority, # partition missing submit_time=submit_time, time_limit=time_limit, start_time=start_time, end_time=end_time, - wall_time=wall_time, trace_time=trace_time) + wall_time=wall_time, trace_time=trace_time, + trace_start_time=trace_start_time, trace_end_time=trace_end_time) jobs.append(job_info) return jobs, telemetry_start, telemetry_end diff --git a/raps/engine.py b/raps/engine.py index f8c3917..34ea533 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -157,15 +157,47 @@ class Engine: print(f"JobID: {job.id}") if job.state == JobState.RUNNING: job.running_time = self.current_time - job.start_time - if job.running_time > job.trace_time: - raise ValueError(f"Trace Ended before job ended!\n\ - {job.running_time} > {job.trace_time}\n\ - {len(job.cpu_trace)} vs. {self.running_time // self.config['TRACE_QUANTA']}\ + + if job.running_time > job.wall_time: + raise Exception(f"Job should have ended already!\n\ + {job.running_time} > {job.wall_time}\n\ + {len(job.cpu_trace)} vs. 
{job.running_time // self.config['TRACE_QUANTA']}\ ") - time_quanta_index = int(job.running_time // self.config['TRACE_QUANTA']) - cpu_util = get_utilization(job.cpu_trace, time_quanta_index) - gpu_util = get_utilization(job.gpu_trace, time_quanta_index) - net_util = 0 + # Next: compute the time_quanta_index: + # If the running time is past the last time step in the trace, + # use the last value in the trace. This can happen if the end + # time valid timesteps is e.g. 17%15, the last trace value is + # 15%15 and the next possible trace value 30%15 but was not + # recorded because the job ended before. Instead of using a + # additional padding or raisinig an error, the last valid value + # is used. + #if int(job.trace_time // self.config['TRACE_QUANTA']) \ + # <= int(job.running_time // self.config['TRACE_QUANTA']): + # # Make sure only the last interval is missing if any: + # if job.running_time - job.trace_time < self.config['TRACE_QUANTA']: + # time_quanta_index = len(job.cpu_trace) - 1 + # else: + # raise Exception(f"Job is not padded correctly!\n\ + # {job.running_time} > {job.trace_time}\n\ + # {len(job.cpu_trace)} vs. {job.running_time // self.config['TRACE_QUANTA']}\ + # ") + #else: + # time_quanta_index = int(job.running_time // self.config['TRACE_QUANTA']) + + if job.running_time < job.trace_start_time or job.running_time > job.trace_end_time: + cpu_util = 0 # get_utilization(job.cpu_trace, time_quanta_index) + gpu_util = 0 # get_utilization(job.gpu_trace, time_quanta_index) + net_util = 0 + if self.debug: + print("No Values in trace, using IDLE.") + if self.scheduler.policy == PolicyType.REPLAY: + print(f"{job.running_time} < {job.trace_start_time} or {job.running_time} > {job.trace_end_time}") + raise Exception("Replay is using IDLE values! 
Something is wrong!") + else: + time_quanta_index = int((job.running_time - job.trace_start_time) // self.config['TRACE_QUANTA']) + cpu_util = get_utilization(job.cpu_trace, time_quanta_index) + gpu_util = get_utilization(job.gpu_trace, time_quanta_index) + net_util = 0 if len(job.ntx_trace) and len(job.nrx_trace): net_tx = get_utilization(job.ntx_trace, time_quanta_index) diff --git a/raps/job.py b/raps/job.py index 1492da2..c992598 100644 --- a/raps/job.py +++ b/raps/job.py @@ -4,7 +4,7 @@ def job_dict(nodes_required, name, account, \ cpu_trace, gpu_trace, ntx_trace, nrx_trace, \ end_state, scheduled_nodes, job_id, priority=0, partition=0, submit_time=0, time_limit=0, start_time=0, end_time=0, - wall_time=0, trace_time=0): + wall_time=0, trace_time=0, trace_start_time=0,trace_end_time=0): """ Return job info dictionary """ return { 'nodes_required': nodes_required, @@ -25,7 +25,9 @@ def job_dict(nodes_required, name, account, \ 'start_time': start_time, 'end_time': end_time, 'wall_time': wall_time, - 'trace_time': trace_time + 'trace_time': trace_time, + 'trace_start_time': trace_start_time, + 'trace_end_time': trace_end_time } @@ -65,6 +67,8 @@ class Job: self.end_time = None # Actual end time when executing or from telemetry self.wall_time = None # end_time - start_time self.trace_time = None # Time period for which traces are available + self.trace_start_time = None # Time period for which traces are available + self.trace_end_time = None # Time period for which traces are available self.running_time = 0 # Current running time updated when simulating # If a job dict was given, override the values from the job_dict: @@ -87,6 +91,8 @@ class Job: f"start_time={self.start_time}, end_time={self.end_time}, " f"wall_time={self.wall_time}, " f"trace_time={self.trace_time}, " + f"trace_start_time={self.trace_start_time}, " + f"trace_end_time={self.trace_end_time}, " f"running_time={self.running_time}, state={self._state}, " f"scheduled_nodes={self.scheduled_nodes}, 
power={self.power}, " f"power_history={self.power_history})") diff --git a/raps/workload.py b/raps/workload.py index 6b20495..93b091b 100644 --- a/raps/workload.py +++ b/raps/workload.py @@ -85,7 +85,8 @@ class Workload: jobs.append(job_dict(nodes_required, name, account, cpu_trace, gpu_trace, net_tx, net_rx, \ end_state, None, job_index, priority, partition, - time_to_next_job, time_limit, time_to_next_job, time_to_next_job + wall_time, wall_time, wall_time)) + time_to_next_job, time_limit, time_to_next_job, time_to_next_job + wall_time, wall_time, + wall_time, 0, wall_time)) return jobs @@ -128,7 +129,9 @@ class Workload: 0, # Start time / or None len(gpu_trace) * config['TRACE_QUANTA'], # End time / or None len(gpu_trace) * config['TRACE_QUANTA'], # Wall time - len(gpu_trace) * config['TRACE_QUANTA'] # Trace time + len(gpu_trace) * config['TRACE_QUANTA'], # Trace time + 0, # Trace start time + len(gpu_trace) * config['TRACE_QUANTA'] # Trace end time ) print(job_info) jobs.append(job_info) # Add job to the list @@ -170,7 +173,9 @@ class Workload: 0, # Start time / or None len(gpu_trace) * config['TRACE_QUANTA'], # End time / or None len(gpu_trace) * config['TRACE_QUANTA'], # Wall time - len(gpu_trace) * config['TRACE_QUANTA'] # Trace time + len(gpu_trace) * config['TRACE_QUANTA'], # Trace time + 0, # Trace start time + len(gpu_trace) * config['TRACE_QUANTA'] # Trace end time ) jobs.append(job_info) # Add job to the list @@ -199,7 +204,8 @@ class Workload: f"Max Test {partition}", account, cpu_trace, gpu_trace, net_tx, net_rx, 'COMPLETED', None, None, 100, partition, 0, len(gpu_trace) * config['TRACE_QUANTA'] + 1, - 0, 10800, len(gpu_trace) * config['TRACE_QUANTA'], len(gpu_trace) * config['TRACE_QUANTA'] + 0, 10800, len(gpu_trace) * config['TRACE_QUANTA'], + len(gpu_trace) * config['TRACE_QUANTA'], 0, len(gpu_trace) * config['TRACE_QUANTA'] ) jobs.append(job_info) @@ -211,7 +217,8 @@ class Workload: f"OpenMxP {partition}", account, cpu_trace, gpu_trace, net_tx, 
net_rx, 'COMPLETED', None, None, 100, partition, 0, len(gpu_trace) * config['TRACE_QUANTA'] + 1, - 10800, 14200, len(gpu_trace) * config['TRACE_QUANTA'], len(gpu_trace) * config['TRACE_QUANTA'] + 10800, 14200, len(gpu_trace) * config['TRACE_QUANTA'], + len(gpu_trace) * config['TRACE_QUANTA'], 0, len(gpu_trace) * config['TRACE_QUANTA'] ) jobs.append(job_info) @@ -223,7 +230,8 @@ class Workload: f"HPL {partition}", account, cpu_trace, gpu_trace, net_tx, net_rx, 'COMPLETED', None, None, 100, partition, 0, len(gpu_trace) * config['TRACE_QUANTA'] + 1, - 14200, 17800, len(gpu_trace) * config['TRACE_QUANTA'], len(gpu_trace) * config['TRACE_QUANTA'] + 14200, 17800, len(gpu_trace) * config['TRACE_QUANTA'], + len(gpu_trace) * config['TRACE_QUANTA'], 0, len(gpu_trace) * config['TRACE_QUANTA'] ) jobs.append(job_info) @@ -235,7 +243,8 @@ class Workload: f"Idle Test {partition}", account, cpu_trace, gpu_trace, net_tx, net_rx, 'COMPLETED', None, None, 100, partition, 0, len(gpu_trace) * config['TRACE_QUANTA'] + 1, - 17800, 21400, len(gpu_trace) * config['TRACE_QUANTA'], len(gpu_trace) * config['TRACE_QUANTA'] + 17800, 21400, len(gpu_trace) * config['TRACE_QUANTA'], + len(gpu_trace) * config['TRACE_QUANTA'], 0, len(gpu_trace) * config['TRACE_QUANTA'] ) jobs.append(job_info) -- GitLab From e7a6c96a19a1e0d7bece25ca70e616ccc3960c78 Mon Sep 17 00:00:00 2001 From: Matthias Maiterth Date: Tue, 25 Feb 2025 18:09:10 -0500 Subject: [PATCH 4/4] Rewrote engine, jobs and frontier dataloader to use trace_start_time and trace_end_time. This replaces the nan/0 padded version of the trace files. For replay, errors are raised if the simulation tries to access values outside of the recorded data. For re-scheduled jobs idle values are used if the re-scheduled part of the job has no associated telemetry. 
--- raps/dataloaders/frontier.py | 16 +++++--------- raps/engine.py | 42 +++++++++++------------------------- raps/job.py | 4 ++-- 3 files changed, 20 insertions(+), 42 deletions(-) diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py index 5ac742e..510fe4c 100644 --- a/raps/dataloaders/frontier.py +++ b/raps/dataloaders/frontier.py @@ -100,7 +100,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar This means that the time between first_start_timestamp and telemetry_start has no associated values in the traces! - The missing values after simulation_end can be ignored, as the simulatuion will have stoped before. + The missing values after simulation_end can be ignored, as the simulatuion + will have stoped before. However, the times before telemetry_start have to be padded to generate correct offsets within their data! @@ -118,6 +119,9 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar - trace_end_time (time offset in seconds after which the trace ends) has to be set for use within the simulation + The values trace_start_time are similar to the telemetry_start and + telemetry_stop but job specific. + The returned values are these three: - The list of parsed jobs. 
(as a job_dict) - telemetry_start: int (in seconds) @@ -159,7 +163,6 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar diff = first_start_timestamp - telemetry_start_timestamp first_start = int(diff.total_seconds()) # negative seconds or 0 - num_jobs = len(jobs_df) if debug: print("num_jobs:", num_jobs) @@ -234,23 +237,14 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar if wall_time > trace_time: missing_trace_time = wall_time - trace_time if start_time < 0: - #cpu_trace = np.concatenate((np.array([np.NaN] * missing_steps),cpu_trace)) - #gpu_trace = np.concatenate((np.array([np.NaN] * missing_steps),gpu_trace)) - #print(f"Job: {job_id} prepended {missing_steps} Values with idle power!") - #print(f"{start_time} - {end_time}") trace_start_time = missing_trace_time trace_end_time = wall_time elif end_time > telemetry_end: - #cpu_trace = np.concatenate((cpu_trace,np.array([np.NaN] * missing_steps))) - #gpu_trace = np.concatenate((gpu_trace,np.array([np.NaN] * missing_steps))) - #print(f"Job: {job_id} appended {missing_steps} Values with idle power!") - #print(f"{start_time} - {end_time}") trace_start_time = 0 trace_end_time = trace_time else: print(f"Job: {job_id} {start_time} - {end_time}!") raise ValueError("Missing values not at start nor end.") - #trace_time = gpu_trace.size * config["TRACE_QUANTA"] # Update trace_time to padded trace xnames = jobs_df.loc[jidx, 'xnames'] # Don't replay any job with an empty set of xnames diff --git a/raps/engine.py b/raps/engine.py index 34ea533..63beea6 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -163,30 +163,9 @@ class Engine: {job.running_time} > {job.wall_time}\n\ {len(job.cpu_trace)} vs. {job.running_time // self.config['TRACE_QUANTA']}\ ") - # Next: compute the time_quanta_index: - # If the running time is past the last time step in the trace, - # use the last value in the trace. This can happen if the end - # time valid timesteps is e.g. 
17%15, the last trace value is - # 15%15 and the next possible trace value 30%15 but was not - # recorded because the job ended before. Instead of using a - # additional padding or raisinig an error, the last valid value - # is used. - #if int(job.trace_time // self.config['TRACE_QUANTA']) \ - # <= int(job.running_time // self.config['TRACE_QUANTA']): - # # Make sure only the last interval is missing if any: - # if job.running_time - job.trace_time < self.config['TRACE_QUANTA']: - # time_quanta_index = len(job.cpu_trace) - 1 - # else: - # raise Exception(f"Job is not padded correctly!\n\ - # {job.running_time} > {job.trace_time}\n\ - # {len(job.cpu_trace)} vs. {job.running_time // self.config['TRACE_QUANTA']}\ - # ") - #else: - # time_quanta_index = int(job.running_time // self.config['TRACE_QUANTA']) - if job.running_time < job.trace_start_time or job.running_time > job.trace_end_time: - cpu_util = 0 # get_utilization(job.cpu_trace, time_quanta_index) - gpu_util = 0 # get_utilization(job.gpu_trace, time_quanta_index) + cpu_util = 0 # No values available therefore we assume IDLE == 0 + gpu_util = 0 net_util = 0 if self.debug: print("No Values in trace, using IDLE.") @@ -195,6 +174,16 @@ class Engine: raise Exception("Replay is using IDLE values! Something is wrong!") else: time_quanta_index = int((job.running_time - job.trace_start_time) // self.config['TRACE_QUANTA']) + if time_quanta_index == len(job.cpu_trace): + # If the running time is past the last time step in the + # trace, use the last value in the trace. This can + # happen if the last valid timesteps is e.g. 17%15, + # the last trace value is 15%15 and the next possible + # trace value 30%15 but was not recorded because the + # job ended before. + # For every other error condition trace_start_ and + # _end_time are used! 
+ time_quanta_index -= 1 cpu_util = get_utilization(job.cpu_trace, time_quanta_index) gpu_util = get_utilization(job.gpu_trace, time_quanta_index) net_util = 0 @@ -225,7 +214,6 @@ class Engine: job.power_history.append(jobs_power[i] * len(job.scheduled_nodes)) del _running_jobs - # Update the power array UI component rack_power, rect_losses = self.power_manager.compute_rack_power() sivoc_losses = self.power_manager.compute_sivoc_losses() @@ -293,7 +281,7 @@ class Engine: # Modifies Jobs object self.current_time = timestep_start - #keep only jobs that have not yet ended + # Keep only jobs that have not yet ended all_jobs[:] = [job for job in all_jobs if job['end_time'] >= timestep_start] all_jobs.sort(key=lambda j: j['submit_time']) @@ -306,7 +294,6 @@ class Engine: if replay and len(self.queue) != 0: raise ValueError(f"Something went wrong! Not all jobs could be placed!\nPotential confligt in queue:\n{self.queue}") - def run_simulation(self, jobs, timestep_start, timestep_end, autoshutdown=False): """Generator that yields after each simulation tick.""" self.timesteps = timestep_end - timestep_start # Where is this used? @@ -319,13 +306,10 @@ class Engine: # Place jobs that are currently running, onto the system. self.prepare_system_state(jobs, timestep_start, replay) - for timestep in range(timestep_start,timestep_end): completed_jobs, newly_downed_nodes = self.prepare_timestep(replay) # Identify eligible jobs and add them to the queue. - #self.queue += self.eligible_jobs(jobs, self.current_time) - #jobs = self.add_eligible_jobs_to_queue(jobs) self.add_eligible_jobs_to_queue(jobs) # Schedule jobs that are now in the queue. 
self.scheduler.schedule(self.queue, self.running, self.current_time, sorted=False) diff --git a/raps/job.py b/raps/job.py index c992598..7fa287e 100644 --- a/raps/job.py +++ b/raps/job.py @@ -67,8 +67,8 @@ class Job: self.end_time = None # Actual end time when executing or from telemetry self.wall_time = None # end_time - start_time self.trace_time = None # Time period for which traces are available - self.trace_start_time = None # Time period for which traces are available - self.trace_end_time = None # Time period for which traces are available + self.trace_start_time = None # Relative start time of the trace (to running time) + self.trace_end_time = None # Relative end time of the trace self.running_time = 0 # Current running time updated when simulating # If a job dict was given, override the values from the job_dict: -- GitLab