Commit e7a6c96a authored by Maiterth, Matthias's avatar Maiterth, Matthias
Browse files

Rewrote engine, jobs and frontier dataloader to use trace_start_time and trace_end_time.

This replaces the nan/0 padded version of the trace files.
For replay, errors are raised if the simulation tries to access values outside of the recorded data.
For re-scheduled jobs, idle values are used if the re-scheduled part of the job has no associated telemetry.
parent 3e6a3d9b
Loading
Loading
Loading
Loading
+5 −11
Original line number Diff line number Diff line
@@ -100,7 +100,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar

    This means that the time between first_start_timestamp and telemetry_start
    has no associated values in the traces!
    The missing values after simulation_end can be ignored, as the simulatuion will have stoped before.
    The missing values after simulation_end can be ignored, as the simulation
    will have stopped before.

    However, the times before telemetry_start have to be padded to generate
    correct offsets within their data!
@@ -118,6 +119,9 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
    - trace_end_time (time offset in seconds after which the trace ends)
    has to be set for use within the simulation

    The values trace_start_time and trace_end_time are similar to
    telemetry_start and telemetry_stop, but are job specific.

    The returned values are these three:
        - The list of parsed jobs. (as a job_dict)
        - telemetry_start: int (in seconds)
@@ -159,7 +163,6 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
    diff = first_start_timestamp - telemetry_start_timestamp
    first_start = int(diff.total_seconds())  # negative seconds or 0


    num_jobs = len(jobs_df)
    if debug:
        print("num_jobs:", num_jobs)
@@ -234,23 +237,14 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
        if wall_time > trace_time:
            missing_trace_time = wall_time - trace_time
            if start_time < 0:
                #cpu_trace = np.concatenate((np.array([np.NaN] * missing_steps),cpu_trace))
                #gpu_trace = np.concatenate((np.array([np.NaN] * missing_steps),gpu_trace))
                #print(f"Job: {job_id} prepended {missing_steps} Values with idle power!")
                #print(f"{start_time} - {end_time}")
                trace_start_time = missing_trace_time
                trace_end_time = wall_time
            elif end_time > telemetry_end:
                #cpu_trace = np.concatenate((cpu_trace,np.array([np.NaN] * missing_steps)))
                #gpu_trace = np.concatenate((gpu_trace,np.array([np.NaN] * missing_steps)))
                #print(f"Job: {job_id} appended {missing_steps} Values with idle power!")
                #print(f"{start_time} - {end_time}")
                trace_start_time = 0
                trace_end_time = trace_time
            else:
                print(f"Job: {job_id} {start_time} - {end_time}!")
                raise ValueError("Missing values not at start nor end.")
            #trace_time = gpu_trace.size * config["TRACE_QUANTA"]  # Update trace_time to padded trace

        xnames = jobs_df.loc[jidx, 'xnames']
        # Don't replay any job with an empty set of xnames
+13 −29
Original line number Diff line number Diff line
@@ -163,30 +163,9 @@ class Engine:
                                       {job.running_time} > {job.wall_time}\n\
                                       {len(job.cpu_trace)} vs. {job.running_time // self.config['TRACE_QUANTA']}\
                                      ")
                # Next: compute the time_quanta_index:
                # If the running time is past the last time step in the trace,
                # use the last value in the trace. This can happen if the end
                # time valid timesteps is e.g. 17%15, the last trace value is
                # 15%15 and the next possible trace value 30%15 but was not
                # recorded because the job ended before. Instead of using a
                # additional padding or raisinig an error, the last valid value
                # is used.
                #if int(job.trace_time // self.config['TRACE_QUANTA']) \
                #   <= int(job.running_time // self.config['TRACE_QUANTA']):
                #    # Make sure only the last interval is missing if any:
                #    if job.running_time - job.trace_time < self.config['TRACE_QUANTA']:
                #        time_quanta_index = len(job.cpu_trace) - 1
                #    else:
                #        raise Exception(f"Job is not padded correctly!\n\
                #                        {job.running_time} > {job.trace_time}\n\
                #                        {len(job.cpu_trace)} vs. {job.running_time // self.config['TRACE_QUANTA']}\
                #                        ")
                #else:
                #    time_quanta_index = int(job.running_time // self.config['TRACE_QUANTA'])

                if job.running_time < job.trace_start_time or job.running_time > job.trace_end_time:
                    cpu_util = 0  # get_utilization(job.cpu_trace, time_quanta_index)
                    gpu_util = 0  # get_utilization(job.gpu_trace, time_quanta_index)
                    cpu_util = 0  # No values available therefore we assume IDLE == 0
                    gpu_util = 0
                    net_util = 0
                    if self.debug:
                        print("No Values in trace, using IDLE.")
@@ -195,6 +174,16 @@ class Engine:
                        raise Exception("Replay is using IDLE values! Something is wrong!")
                else:
                    time_quanta_index = int((job.running_time - job.trace_start_time) // self.config['TRACE_QUANTA'])
                    if time_quanta_index == len(job.cpu_trace):
                        # If the running time is past the last time step in the
                        # trace, use the last value in the trace. This can
                        # happen if the last valid timestep is e.g. 17%15:
                        # the last trace value is 15%15 and the next possible
                        # trace value 30%15 was not recorded because the
                        # job ended before.
                        # For every other error condition, trace_start_time and
                        # trace_end_time are used!
                        time_quanta_index -= 1
                    cpu_util = get_utilization(job.cpu_trace, time_quanta_index)
                    gpu_util = get_utilization(job.gpu_trace, time_quanta_index)
                    net_util = 0
@@ -225,7 +214,6 @@ class Engine:
                    job.power_history.append(jobs_power[i] * len(job.scheduled_nodes))
            del _running_jobs


        # Update the power array UI component
        rack_power, rect_losses = self.power_manager.compute_rack_power()
        sivoc_losses = self.power_manager.compute_sivoc_losses()
@@ -293,7 +281,7 @@ class Engine:
        # Modifies Jobs object
        self.current_time = timestep_start

        #keep only jobs that have not yet ended
        # Keep only jobs that have not yet ended
        all_jobs[:] = [job for job in all_jobs if job['end_time'] >= timestep_start]

        all_jobs.sort(key=lambda j: j['submit_time'])
@@ -306,7 +294,6 @@ class Engine:
        if replay and len(self.queue) != 0:
            raise ValueError(f"Something went wrong! Not all jobs could be placed!\nPotential confligt in queue:\n{self.queue}")


    def run_simulation(self, jobs, timestep_start, timestep_end, autoshutdown=False):
        """Generator that yields after each simulation tick."""
        self.timesteps = timestep_end - timestep_start  # Where is this used?
@@ -319,13 +306,10 @@ class Engine:
        # Place jobs that are currently running, onto the system.
        self.prepare_system_state(jobs, timestep_start, replay)


        for timestep in range(timestep_start,timestep_end):
            completed_jobs, newly_downed_nodes = self.prepare_timestep(replay)

            # Identify eligible jobs and add them to the queue.
            #self.queue += self.eligible_jobs(jobs, self.current_time)
            #jobs = self.add_eligible_jobs_to_queue(jobs)
            self.add_eligible_jobs_to_queue(jobs)
            # Schedule jobs that are now in the queue.
            self.scheduler.schedule(self.queue, self.running, self.current_time, sorted=False)
+2 −2
Original line number Diff line number Diff line
@@ -67,8 +67,8 @@ class Job:
        self.end_time = None      # Actual end time when executing or from telemetry
        self.wall_time = None     # end_time - start_time
        self.trace_time = None    # Time period for which traces are available
        self.trace_start_time = None    # Time period for which traces are available
        self.trace_end_time = None    # Time period for which traces are available
        self.trace_start_time = None  # Relative start time of the trace (to running time)
        self.trace_end_time = None    # Relative end time of the trace
        self.running_time = 0     # Current running time updated when simulating

        # If a job dict was given, override the values from the job_dict: