diff --git a/main.py b/main.py
index c34b78ca7ee9cb59509ccbf0e3b5a4c791d03fda..b2ca73a3011fcc958d6a7a7629dd1185d163d756 100644
--- a/main.py
+++ b/main.py
@@ -71,11 +71,13 @@ sc = Engine(
 )
 layout_manager = LayoutManager(args.layout, engine=sc, debug=args.debug, **config)

-if args.replay:
+timestep_start = 0
+if args.fastforward:
+    args.fastforward = convert_to_seconds(args.fastforward)
+    timestep_start = args.fastforward
+
-    if args.fastforward:
-        args.fastforward = convert_to_seconds(args.fastforward)
-        timestep_start = args.fastforward
+if args.replay:

     td = Telemetry(**args_dict)

@@ -106,8 +108,9 @@ if args.replay:
     else:  # custom data loader
         print(*args.replay)
-        jobs, timestep_start, timestep_end = td.load_data(args.replay)
+        jobs, timestep_start_from_data, timestep_end = td.load_data(args.replay)
         td.save_snapshot(jobs, filename=DIR_NAME)
+        timestep_start += timestep_start_from_data

     # Set number of timesteps based on the last job running which we assume
     # is the maximum value of submit_time + wall_time of all the jobs
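For clarity, the net effect of the main.py change in isolation, as a minimal self-contained sketch with invented values; convert_to_seconds is assumed to parse a duration into seconds, as its usage above suggests:

# Sketch of the new fast-forward arithmetic (invented values, stub parser).
def convert_to_seconds(duration: str) -> int:
    # Assumed behavior; the real helper may also accept forms like "1h30m".
    return int(duration)

timestep_start = 0
fastforward = "5400"                                  # e.g. --fastforward 5400
timestep_start = convert_to_seconds(fastforward)      # 5400

timestep_start_from_data = 300                        # returned by td.load_data(...)
timestep_start += timestep_start_from_data            # simulation begins at 5700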
diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py
index e4158c0251160d439a95c82e9a3861350d94bb6c..510fe4c672a8c6eb5b0dc0662b45820afa33d0f4 100644
--- a/raps/dataloaders/frontier.py
+++ b/raps/dataloaders/frontier.py
@@ -100,7 +100,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
     This means that the time between first_start_timestamp and telemetry_start
     has no associated values in the traces!

-    The missing values after simulation_end can be ignored, as the simulatuion will have stoped before.
+    The missing values after simulation_end can be ignored, as the simulation
+    will have stopped before.
     However, the times before telemetry_start have to be padded to generate
     correct offsets within their data!

@@ -114,8 +115,13 @@
     - end_time
     - wall_time (end_time-start_time, actual runtime in seconds)
     - trace_time (length of each trace in seconds)
+    - trace_start_time (time offset in seconds after which the trace starts)
+    - trace_end_time (time offset in seconds after which the trace ends)
     has to be set for use within the simulation

+    The values trace_start_time and trace_end_time are analogous to
+    telemetry_start and telemetry_stop, but job-specific.
+
     The returned values are these three:
     - The list of parsed jobs. (as a job_dict)
     - telemetry_start: int (in seconds)
@@ -157,7 +163,6 @@
     diff = first_start_timestamp - telemetry_start_timestamp
     first_start = int(diff.total_seconds())  # negative seconds or 0
-
     num_jobs = len(jobs_df)
     if debug:
         print("num_jobs:", num_jobs)

@@ -191,7 +196,7 @@
         cpu_power_array = cpu_power.values
         cpu_min_power = nodes_required * config['POWER_CPU_IDLE'] * config['CPUS_PER_NODE']
         cpu_max_power = nodes_required * config['POWER_CPU_MAX'] * config['CPUS_PER_NODE']
-        cpu_util = power_to_utilization(cpu_power_array, cpu_min_power, cpu_max_power)
+        cpu_util = power_to_utilization(cpu_power_array, cpu_min_power, cpu_max_power)  # Can be negative, as cpu_power_array[i] may be smaller than cpu_min_power!
         cpu_trace = cpu_util * config['CPUS_PER_NODE']

         gpu_power = jobprofile_df[jobprofile_df['allocation_id'] \
@@ -222,28 +227,24 @@
         diff = end_time_timestamp - telemetry_start_timestamp
         end_time = diff.total_seconds()
-
         wall_time = end_time - start_time
         if np.isnan(wall_time):
             wall_time = 0

         trace_time = gpu_trace.size * config['TRACE_QUANTA']  # seconds
+        trace_start_time = 0
+        trace_end_time = trace_time
         if wall_time > trace_time:
-            missing_steps = int(wall_time - trace_time)
+            missing_trace_time = wall_time - trace_time
             if start_time < 0:
-                cpu_trace = np.concatenate((np.array([0] * missing_steps),cpu_trace))
-                gpu_trace = np.concatenate((np.array([0] * missing_steps),gpu_trace))
-                print(f"Job: {job_id} prepended {missing_steps} Values with idle power!")
-                print(f"{start_time} - {end_time}")
+                trace_start_time = missing_trace_time
+                trace_end_time = wall_time
             elif end_time > telemetry_end:
-                cpu_trace = np.concatenate((cpu_trace,np.array([0] * missing_steps)))
-                gpu_trace = np.concatenate((gpu_trace,np.array([0] * missing_steps)))
-                print(f"Job: {job_id} appended {missing_steps} Values with idle power!")
-                print(f"{start_time} - {end_time}")
+                trace_start_time = 0
+                trace_end_time = trace_time
             else:
                 print(f"Job: {job_id} {start_time} - {end_time}!")
                 raise ValueError("Missing values not at start nor end.")
-            trace_time = wall_time  # Pretending to have a full trace, This may not be needed!

         xnames = jobs_df.loc[jidx, 'xnames']
         # Don't replay any job with an empty set of xnames
@@ -277,15 +278,14 @@
             print("Job starts after last recorded telemetry entry:",job_id, "start:", start_time,"end:",end_time,
                   " Telemetry: ", len(gpu_trace), "entries.")
             continue  # SKIP!
-
-
         if gpu_trace.size > 0 and (jid == job_id or jid == '*'):  # and time_submit >= 0:
             job_info = job_dict(nodes_required, name, account, cpu_trace, gpu_trace,
                                 [], [],
                                 end_state, scheduled_nodes, job_id, priority,  # partition missing
                                 submit_time=submit_time, time_limit=time_limit,
                                 start_time=start_time, end_time=end_time,
-                                wall_time=wall_time, trace_time=trace_time)
+                                wall_time=wall_time, trace_time=trace_time,
+                                trace_start_time=trace_start_time, trace_end_time=trace_end_time)
             jobs.append(job_info)

     return jobs, telemetry_start, telemetry_end
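A worked example of the new per-job offsets (invented numbers; TRACE_QUANTA assumed to be 15 s): a job that started 30 s before telemetry began now keeps its short trace untouched and records the gap in trace_start_time, instead of being padded with idle samples:

# Worked example (invented values) of the trace_start_time/trace_end_time logic.
TRACE_QUANTA = 15                            # assumed seconds per sample
gpu_trace_size = 4                           # four recorded samples
start_time, end_time = -30, 60               # job started 30 s before telemetry_start
wall_time = end_time - start_time            # 90 s of runtime

trace_time = gpu_trace_size * TRACE_QUANTA   # 60 s of recorded trace
trace_start_time, trace_end_time = 0, trace_time
if wall_time > trace_time:                   # 30 s of the run are uncovered
    missing_trace_time = wall_time - trace_time
    if start_time < 0:                       # the gap is at the start
        trace_start_time = missing_trace_time
        trace_end_time = wall_time

assert (trace_start_time, trace_end_time) == (30, 90)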
diff --git a/raps/engine.py b/raps/engine.py
index f0560220046c53dbd39bef0d629156325a3c3853..63beea6344244ea8ab5401e1adb7bf9c20c6ce22 100644
--- a/raps/engine.py
+++ b/raps/engine.py
@@ -148,19 +148,45 @@ class Engine:
         cpu_utils = []
         gpu_utils = []
         net_utils = []
+        if self.debug:
+            print(f"Current Time: {self.current_time}")
+
         for job in self.running:
+            if self.debug:
+                print(f"JobID: {job.id}")
             if job.state == JobState.RUNNING:
                 job.running_time = self.current_time - job.start_time
-                if job.running_time > job.trace_time:
-                    raise ValueError(f"Trace Ended before job ended!\n\
-                                     {job.running_time} > {job.trace_time}\n\
-                                     {len(job.cpu_trace)} vs. {self.running_time // self.config['TRACE_QUANTA']}\
+
+                if job.running_time > job.wall_time:
+                    raise Exception(f"Job should have ended already!\n\
+                                    {job.running_time} > {job.wall_time}\n\
+                                    {len(job.cpu_trace)} vs. {job.running_time // self.config['TRACE_QUANTA']}\
                                     ")
-                time_quanta_index = int(job.running_time // self.config['TRACE_QUANTA'])
-                cpu_util = get_utilization(job.cpu_trace, time_quanta_index)
-                gpu_util = get_utilization(job.gpu_trace, time_quanta_index)
-                net_util = 0
+                if job.running_time < job.trace_start_time or job.running_time > job.trace_end_time:
+                    cpu_util = 0  # No values available, therefore we assume idle == 0
+                    gpu_util = 0
+                    net_util = 0
+                    if self.debug:
+                        print("No values in trace, using IDLE.")
+                    if self.scheduler.policy == PolicyType.REPLAY:
+                        print(f"{job.running_time} < {job.trace_start_time} or {job.running_time} > {job.trace_end_time}")
+                        raise Exception("Replay is using IDLE values! Something is wrong!")
+                else:
+                    time_quanta_index = int((job.running_time - job.trace_start_time) // self.config['TRACE_QUANTA'])
+                    if time_quanta_index == len(job.cpu_trace):
+                        # If the running time is past the last time step in
+                        # the trace, reuse the last trace value. This can
+                        # happen when, e.g., TRACE_QUANTA is 15 and a job ran
+                        # for 17 s: its last recorded sample is at 15 s, and
+                        # the next sample (at 30 s) was never recorded
+                        # because the job ended first.
+                        # Every other out-of-range condition is handled via
+                        # trace_start_time and trace_end_time above!
+                        time_quanta_index -= 1
+                    cpu_util = get_utilization(job.cpu_trace, time_quanta_index)
+                    gpu_util = get_utilization(job.gpu_trace, time_quanta_index)
+                    net_util = 0

                 if len(job.ntx_trace) and len(job.nrx_trace):
                     net_tx = get_utilization(job.ntx_trace, time_quanta_index)
@@ -188,7 +214,6 @@ class Engine:
                     job.power_history.append(jobs_power[i] * len(job.scheduled_nodes))

         del _running_jobs
-
         # Update the power array UI component
         rack_power, rect_losses = self.power_manager.compute_rack_power()
         sivoc_losses = self.power_manager.compute_sivoc_losses()
@@ -252,41 +277,39 @@
             self.current_time += 1
             return tick_data

-    def prepare_system_state(self, all_jobs:List, timestep_start):
+    def prepare_system_state(self, all_jobs:List, timestep_start, replay:bool):
         # Modifies Jobs object
         self.current_time = timestep_start
-        #keep only jobs that have not yet ended
+        # Keep only jobs that have not yet ended
         all_jobs[:] = [job for job in all_jobs if job['end_time'] >= timestep_start]
         all_jobs.sort(key=lambda j: j['submit_time'])
         self.add_running_jobs_to_queue(all_jobs)
         # Now process job queue one by one (needed to get the start_time right!)
-        for job in self.queue:
+        for job in self.queue[:]:  # Iterate over a copy so jobs can be removed from the queue once placed.
             self.scheduler.schedule([job], self.running, job.start_time, sorted=True)
-        if len(self.queue) != len(self.running):
+            self.queue.remove(job)
+        if replay and len(self.queue) != 0:
             raise ValueError(f"Something went wrong! Not all jobs could be placed!\nPotential conflict in queue:\n{self.queue}")
-        self.queue = []  # Empty queue needed as addition one by one does not empty the queue!

     def run_simulation(self, jobs, timestep_start, timestep_end, autoshutdown=False):
         """Generator that yields after each simulation tick."""
         self.timesteps = timestep_end - timestep_start  # Where is this used?

-        # Place jobs that are currently running, onto the system.
-        self.prepare_system_state(jobs, timestep_start)
-
         if self.scheduler.policy == PolicyType.REPLAY:
             replay = True
         else:
             replay = False

+        # Place jobs that are currently running onto the system.
+        self.prepare_system_state(jobs, timestep_start, replay)
+
         for timestep in range(timestep_start,timestep_end):
             completed_jobs, newly_downed_nodes = self.prepare_timestep(replay)

             # Identify eligible jobs and add them to the queue.
-            #self.queue += self.eligible_jobs(jobs, self.current_time)
-            #jobs = self.add_eligible_jobs_to_queue(jobs)
             self.add_eligible_jobs_to_queue(jobs)

             # Schedule jobs that are now in the queue.
             self.scheduler.schedule(self.queue, self.running, self.current_time, sorted=False)
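The index clamp above in isolation, as a minimal sketch with invented values: running_time == trace_end_time is still in range, but the integer division lands one past the final sample:

# Minimal sketch (invented values) of the trace-index clamp.
TRACE_QUANTA = 15
cpu_trace = [0.4, 0.9]                    # samples covering [0, 15) and [15, 30)
trace_start_time = 0

running_time = 30                         # exactly trace_end_time, still valid
index = int((running_time - trace_start_time) // TRACE_QUANTA)  # 30 // 15 == 2
if index == len(cpu_trace):               # one past the last sample ...
    index -= 1                            # ... so reuse the final value
assert cpu_trace[index] == 0.9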
diff --git a/raps/job.py b/raps/job.py
index 1492da24b58314891e6a2258c4d3092d000b7c70..7fa287eaf9dd98e478441b6bf438938c4ceb737f 100644
--- a/raps/job.py
+++ b/raps/job.py
@@ -4,7 +4,7 @@ def job_dict(nodes_required, name, account, \
              cpu_trace, gpu_trace, ntx_trace, nrx_trace, \
              end_state, scheduled_nodes, job_id, priority=0, partition=0,
              submit_time=0, time_limit=0, start_time=0, end_time=0,
-             wall_time=0, trace_time=0):
+             wall_time=0, trace_time=0, trace_start_time=0, trace_end_time=0):
     """ Return job info dictionary """
     return {
         'nodes_required': nodes_required,
@@ -25,7 +25,9 @@
         'start_time': start_time,
         'end_time': end_time,
         'wall_time': wall_time,
-        'trace_time': trace_time
+        'trace_time': trace_time,
+        'trace_start_time': trace_start_time,
+        'trace_end_time': trace_end_time
     }

@@ -65,6 +67,8 @@ class Job:
         self.end_time = None  # Actual end time when executing or from telemetry
         self.wall_time = None  # end_time - start_time
         self.trace_time = None  # Time period for which traces are available
+        self.trace_start_time = None  # Start of the trace, relative to the job's running time
+        self.trace_end_time = None  # End of the trace, relative to the job's running time
         self.running_time = 0  # Current running time updated when simulating

         # If a job dict was given, override the values from the job_dict:
@@ -87,6 +91,8 @@
                 f"start_time={self.start_time}, end_time={self.end_time}, "
                 f"wall_time={self.wall_time}, "
                 f"trace_time={self.trace_time}, "
+                f"trace_start_time={self.trace_start_time}, "
+                f"trace_end_time={self.trace_end_time}, "
                 f"running_time={self.running_time}, state={self._state}, "
                 f"scheduled_nodes={self.scheduled_nodes}, power={self.power}, "
                 f"power_history={self.power_history})")
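A usage sketch against the extended job_dict signature (all values invented), matching the frontier.py case above where the first 30 s of a 90 s job lack telemetry:

# Usage sketch (invented values): 90 s job, first 30 s without telemetry.
info = job_dict(2, "demo", "acct",                   # nodes, name, account
                [0.5] * 4, [0.7] * 4, [], [],        # cpu/gpu/ntx/nrx traces
                "COMPLETED", None, 42,               # end_state, nodes, job_id
                start_time=-30, end_time=60,
                wall_time=90, trace_time=60,
                trace_start_time=30, trace_end_time=90)
assert info['trace_end_time'] - info['trace_start_time'] == info['trace_time']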
diff --git a/raps/workload.py b/raps/workload.py
index 6b20495a37d872f740c0e7ae5e6556255a163154..93b091ba9a5fa76da5edf080ee9294ccd5f15a36 100644
--- a/raps/workload.py
+++ b/raps/workload.py
@@ -85,7 +85,8 @@ class Workload:
             jobs.append(job_dict(nodes_required, name, account, cpu_trace, gpu_trace, net_tx, net_rx, \
                                  end_state, None, job_index, priority, partition,
-                                 time_to_next_job, time_limit, time_to_next_job, time_to_next_job + wall_time, wall_time, wall_time))
+                                 time_to_next_job, time_limit, time_to_next_job, time_to_next_job + wall_time, wall_time,
+                                 wall_time, 0, wall_time))

         return jobs

@@ -128,7 +129,9 @@ class Workload:
                 0,  # Start time / or None
                 len(gpu_trace) * config['TRACE_QUANTA'],  # End time / or None
                 len(gpu_trace) * config['TRACE_QUANTA'],  # Wall time
-                len(gpu_trace) * config['TRACE_QUANTA']  # Trace time
+                len(gpu_trace) * config['TRACE_QUANTA'],  # Trace time
+                0,  # Trace start time
+                len(gpu_trace) * config['TRACE_QUANTA']  # Trace end time
             )
             print(job_info)
             jobs.append(job_info)  # Add job to the list
@@ -170,7 +173,9 @@ class Workload:
                 0,  # Start time / or None
                 len(gpu_trace) * config['TRACE_QUANTA'],  # End time / or None
                 len(gpu_trace) * config['TRACE_QUANTA'],  # Wall time
-                len(gpu_trace) * config['TRACE_QUANTA']  # Trace time
+                len(gpu_trace) * config['TRACE_QUANTA'],  # Trace time
+                0,  # Trace start time
+                len(gpu_trace) * config['TRACE_QUANTA']  # Trace end time
             )
             jobs.append(job_info)  # Add job to the list
@@ -199,7 +204,8 @@ class Workload:
             f"Max Test {partition}", account, cpu_trace, gpu_trace, net_tx, net_rx,
             'COMPLETED', None, None, 100, partition,
             0, len(gpu_trace) * config['TRACE_QUANTA'] + 1,
-            0, 10800, len(gpu_trace) * config['TRACE_QUANTA'], len(gpu_trace) * config['TRACE_QUANTA']
+            0, 10800, len(gpu_trace) * config['TRACE_QUANTA'],
+            len(gpu_trace) * config['TRACE_QUANTA'], 0, len(gpu_trace) * config['TRACE_QUANTA']
         )
         jobs.append(job_info)
@@ -211,7 +217,8 @@
             f"OpenMxP {partition}", account, cpu_trace, gpu_trace, net_tx, net_rx,
             'COMPLETED', None, None, 100, partition,
             0, len(gpu_trace) * config['TRACE_QUANTA'] + 1,
-            10800, 14200, len(gpu_trace) * config['TRACE_QUANTA'], len(gpu_trace) * config['TRACE_QUANTA']
+            10800, 14200, len(gpu_trace) * config['TRACE_QUANTA'],
+            len(gpu_trace) * config['TRACE_QUANTA'], 0, len(gpu_trace) * config['TRACE_QUANTA']
         )
         jobs.append(job_info)
@@ -223,7 +230,8 @@
             f"HPL {partition}", account, cpu_trace, gpu_trace, net_tx, net_rx,
             'COMPLETED', None, None, 100, partition,
             0, len(gpu_trace) * config['TRACE_QUANTA'] + 1,
-            14200, 17800, len(gpu_trace) * config['TRACE_QUANTA'], len(gpu_trace) * config['TRACE_QUANTA']
+            14200, 17800, len(gpu_trace) * config['TRACE_QUANTA'],
+            len(gpu_trace) * config['TRACE_QUANTA'], 0, len(gpu_trace) * config['TRACE_QUANTA']
         )
         jobs.append(job_info)
@@ -235,7 +243,8 @@
             f"Idle Test {partition}", account, cpu_trace, gpu_trace, net_tx, net_rx,
             'COMPLETED', None, None, 100, partition,
             0, len(gpu_trace) * config['TRACE_QUANTA'] + 1,
-            17800, 21400, len(gpu_trace) * config['TRACE_QUANTA'], len(gpu_trace) * config['TRACE_QUANTA']
+            17800, 21400, len(gpu_trace) * config['TRACE_QUANTA'],
+            len(gpu_trace) * config['TRACE_QUANTA'], 0, len(gpu_trace) * config['TRACE_QUANTA']
         )
         jobs.append(job_info)
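Since Workload passes these arguments positionally, the tail of every call has to follow the extended order wall_time, trace_time, trace_start_time, trace_end_time; for synthetic jobs the trace covers the whole run, hence the repeated pattern. A sanity sketch with invented values:

# Sanity sketch (invented values): synthetic traces span the entire job.
TRACE_QUANTA = 15
gpu_trace = [0.5] * 240                        # 240 samples
wall_time = len(gpu_trace) * TRACE_QUANTA      # 3600 s
trace_time = wall_time                         # full coverage
trace_start_time, trace_end_time = 0, wall_time
assert trace_end_time - trace_start_time == trace_time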