Commit f0fe1ab7 authored by Maiterth, Matthias's avatar Maiterth, Matthias

Merge branch 'new-replay-provoking-error' into 'scheduler-replay'

Rewrite of the zero/NaN padded telemetry to use

See merge request !75
parents 1d3013a1 e7a6c96a
+8 −5
@@ -71,12 +71,14 @@ sc = Engine(
)
layout_manager = LayoutManager(args.layout, engine=sc, debug=args.debug, **config)

if args.replay:

timestep_start = 0
if args.fastforward:
    args.fastforward = convert_to_seconds(args.fastforward)
    timestep_start = args.fastforward


if args.replay:

    td = Telemetry(**args_dict)

    # Try to extract date from given name to use as case directory
@@ -106,8 +108,9 @@ if args.replay:

    else:  # custom data loader
        print(*args.replay)
        jobs, timestep_start, timestep_end = td.load_data(args.replay)
        jobs, timestep_start_from_data, timestep_end = td.load_data(args.replay)
        td.save_snapshot(jobs, filename=DIR_NAME)
        timestep_start += timestep_start_from_data

    # Set the number of timesteps based on the last running job, which we
    # assume ends at the maximum value of submit_time + wall_time over all jobs
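For context, a minimal sketch of the computation this comment describes (hypothetical helper, not part of the change; the field names follow job_dict):

def last_timestep(jobs):
    # A job's latest possible end is its submission time plus its wall time.
    return max(job['submit_time'] + job['wall_time'] for job in jobs)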
+17 −17
@@ -100,7 +100,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar

    This means that the time between first_start_timestamp and telemetry_start
    has no associated values in the traces!
    The missing values after simulation_end can be ignored, as the simulation will have stopped before.
    The missing values after simulation_end can be ignored, as the simulation
    will have stopped before.

    However, the times before telemetry_start have to be padded to generate
    correct offsets within their data!
@@ -114,8 +115,13 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
    - end_time
    - wall_time (end_time-start_time, actual runtime in seconds)
    - trace_time (length of each trace in seconds)
    - trace_start_time (time offset in seconds after which the trace starts)
    - trace_end_time (time offset in seconds after which the trace ends)
    have to be set for use within the simulation.

    The trace_start_time and trace_end_time values are analogous to
    telemetry_start and telemetry_stop, but job-specific.

    The returned values are these three:
        - The list of parsed jobs. (as a job_dict)
        - telemetry_start: int (in seconds)
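To make the offset rules above concrete, here is a hypothetical helper distilling them (all times in seconds relative to telemetry_start; a sketch, not part of the change itself):

def trace_window(start_time, wall_time, trace_time):
    # Job started before telemetry began: values are missing at the front,
    # so the trace covers only the tail of the run.
    if wall_time > trace_time and start_time < 0:
        return wall_time - trace_time, wall_time
    # Otherwise the trace is aligned with the start of the run.
    return 0, trace_time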
@@ -157,7 +163,6 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
    diff = first_start_timestamp - telemetry_start_timestamp
    first_start = int(diff.total_seconds())  # negative seconds or 0


    num_jobs = len(jobs_df)
    if debug:
        print("num_jobs:", num_jobs)
@@ -191,7 +196,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
            cpu_power_array = cpu_power.values
            cpu_min_power = nodes_required * config['POWER_CPU_IDLE'] * config['CPUS_PER_NODE']
            cpu_max_power = nodes_required * config['POWER_CPU_MAX'] * config['CPUS_PER_NODE']
            cpu_util = power_to_utilization(cpu_power_array, cpu_min_power, cpu_max_power)
            cpu_util = power_to_utilization(cpu_power_array, cpu_min_power, cpu_max_power)  # Can be negative, as cpu_power_array[i] may be smaller than cpu_min_power
            cpu_trace = cpu_util * config['CPUS_PER_NODE']

            gpu_power = jobprofile_df[jobprofile_df['allocation_id'] \
@@ -222,28 +227,24 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
        diff = end_time_timestamp - telemetry_start_timestamp
        end_time = diff.total_seconds()


        wall_time = end_time - start_time
        if np.isnan(wall_time):
            wall_time = 0

        trace_time = gpu_trace.size * config['TRACE_QUANTA']  # seconds
        trace_start_time = 0
        trace_end_time = trace_time
        if wall_time > trace_time:
            missing_steps = int(wall_time - trace_time)
            missing_trace_time = wall_time - trace_time
            if start_time < 0:
                cpu_trace = np.concatenate((np.array([0] * missing_steps),cpu_trace))
                gpu_trace = np.concatenate((np.array([0] * missing_steps),gpu_trace))
                print(f"Job: {job_id} prepended {missing_steps} Values with idle power!")
                print(f"{start_time} - {end_time}")
                trace_start_time = missing_trace_time
                trace_end_time = wall_time
            elif end_time > telemetry_end:
                cpu_trace = np.concatenate((cpu_trace,np.array([0] * missing_steps)))
                gpu_trace = np.concatenate((gpu_trace,np.array([0] * missing_steps)))
                print(f"Job: {job_id} appended {missing_steps} Values with idle power!")
                print(f"{start_time} - {end_time}")
                trace_start_time = 0
                trace_end_time = trace_time
            else:
                print(f"Job: {job_id} {start_time} - {end_time}!")
                raise ValueError("Missing values not at start nor end.")
            trace_time = wall_time  # Pretend to have a full trace; this may not be needed!

        xnames = jobs_df.loc[jidx, 'xnames']
        # Don't replay any job with an empty set of xnames
@@ -277,15 +278,14 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
            print("Job starts after last recorded telemetry entry:",job_id, "start:", start_time,"end:",end_time, " Telemetry: ", len(gpu_trace), "entries.")
            continue  # SKIP!



        if gpu_trace.size > 0 and (jid == job_id or jid == '*'):  # and time_submit >= 0:
            job_info = job_dict(nodes_required, name, account, cpu_trace, gpu_trace, [], [],
                                end_state, scheduled_nodes,
                                job_id, priority,  # partition missing
                                submit_time=submit_time, time_limit=time_limit,
                                start_time=start_time, end_time=end_time,
                                wall_time=wall_time, trace_time=trace_time)
                                wall_time=wall_time, trace_time=trace_time,
                                trace_start_time=trace_start_time, trace_end_time=trace_end_time)
            jobs.append(job_info)

    return jobs, telemetry_start, telemetry_end
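A worked numeric example of the prepend branch above, with hypothetical values:

# A job that ran for 3600 s but whose trace only covers the last 3000 s
# (it started 600 s before telemetry began) gets the gap padded with zeros:
wall_time, trace_time, start_time = 3600, 3000, -600
missing_steps = int(wall_time - trace_time)                  # 600 zero entries prepended
trace_start_time, trace_end_time = missing_steps, wall_time  # (600, 3600)
trace_time = wall_time                                       # pretend the trace is full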
+42 −19
@@ -148,16 +148,42 @@ class Engine:
        cpu_utils = []
        gpu_utils = []
        net_utils = []
        if self.debug:
                print(f"Current Time: {self.current_time}")

        for job in self.running:

            if self.debug:
                print(f"JobID: {job.id}")
            if job.state == JobState.RUNNING:
                job.running_time = self.current_time - job.start_time
                if job.running_time > job.trace_time:
                    raise ValueError(f"Trace Ended before job ended!\n\
                                       {job.running_time} > {job.trace_time}\n\
                                       {len(job.cpu_trace)} vs. {self.running_time // self.config['TRACE_QUANTA']}\

                if job.running_time > job.wall_time:
                    raise Exception(f"Job should have ended already!\n\
                                       {job.running_time} > {job.wall_time}\n\
                                       {len(job.cpu_trace)} vs. {job.running_time // self.config['TRACE_QUANTA']}\
                                      ")
                time_quanta_index = int(job.running_time // self.config['TRACE_QUANTA'])
                if job.running_time < job.trace_start_time or job.running_time > job.trace_end_time:
                    cpu_util = 0  # No values available, therefore we assume IDLE == 0
                    gpu_util = 0
                    net_util = 0
                    if self.debug:
                        print("No Values in trace, using IDLE.")
                    if self.scheduler.policy == PolicyType.REPLAY:
                        print(f"{job.running_time} < {job.trace_start_time} or {job.running_time} > {job.trace_end_time}")
                        raise Exception("Replay is using IDLE values! Something is wrong!")
                else:
                    time_quanta_index = int((job.running_time - job.trace_start_time) // self.config['TRACE_QUANTA'])
                    if time_quanta_index == len(job.cpu_trace):
                        # If the running time is past the last time step in
                        # the trace, use the last value in the trace. This can
                        # happen when, e.g., the last valid timestep is 17 with
                        # a TRACE_QUANTA of 15: the last trace value was
                        # recorded at second 15, and the next possible value at
                        # second 30 was never recorded because the job ended
                        # before. Every other error condition is covered by
                        # trace_start_time and trace_end_time!
                        time_quanta_index -= 1
                    cpu_util = get_utilization(job.cpu_trace, time_quanta_index)
                    gpu_util = get_utilization(job.gpu_trace, time_quanta_index)
                    net_util = 0
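The index arithmetic above, isolated as a hypothetical helper for clarity (a sketch of the shown logic, with the decrement expressed as a clamp):

def trace_index(running_time, trace_start_time, trace_quanta, trace_len):
    # Shift into trace-relative time, quantize into trace steps, and clamp
    # to the last recorded entry.
    idx = int((running_time - trace_start_time) // trace_quanta)
    return min(idx, trace_len - 1)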
@@ -188,7 +214,6 @@ class Engine:
                    job.power_history.append(jobs_power[i] * len(job.scheduled_nodes))
            del _running_jobs


        # Update the power array UI component
        rack_power, rect_losses = self.power_manager.compute_rack_power()
        sivoc_losses = self.power_manager.compute_sivoc_losses()
@@ -252,41 +277,39 @@ class Engine:
        self.current_time += 1
        return tick_data

    def prepare_system_state(self, all_jobs:List, timestep_start):
    def prepare_system_state(self, all_jobs:List, timestep_start, replay:bool):
        # Modifies Jobs object
        self.current_time = timestep_start

        #keep only jobs that have not yet ended
        # Keep only jobs that have not yet ended
        all_jobs[:] = [job for job in all_jobs if job['end_time'] >= timestep_start]

        all_jobs.sort(key=lambda j: j['submit_time'])

        self.add_running_jobs_to_queue(all_jobs)
        # Now process job queue one by one (needed to get the start_time right!)
        for job in self.queue:
        for job in self.queue[:]:  # Operate over a slice copy so jobs can be removed from the queue once placed.
            self.scheduler.schedule([job], self.running, job.start_time, sorted=True)
        if len(self.queue) != len(self.running):
            self.queue.remove(job)
        if replay and len(self.queue) != 0:
            raise ValueError(f"Something went wrong! Not all jobs could be placed!\nPotential confligt in queue:\n{self.queue}")
        self.queue = []  # Emptying the queue is needed, as scheduling jobs one by one does not empty it!
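Why the slice copy above matters, shown in isolation (illustrative only, not from the diff):

queue = ['a', 'b', 'c']
for item in queue[:]:    # iterate over a copy ...
    queue.remove(item)   # ... so removals do not skip elements
assert queue == []       # iterating the list itself would leave 'b' behind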

    def run_simulation(self, jobs, timestep_start, timestep_end, autoshutdown=False):
        """Generator that yields after each simulation tick."""
        self.timesteps = timestep_end - timestep_start  # Where is this used?

        # Place jobs that are currently running, onto the system.
        self.prepare_system_state(jobs, timestep_start)

        if self.scheduler.policy == PolicyType.REPLAY:
            replay = True
        else:
            replay = False

        # Place jobs that are currently running, onto the system.
        self.prepare_system_state(jobs, timestep_start, replay)

        for timestep in range(timestep_start, timestep_end):
            completed_jobs, newly_downed_nodes = self.prepare_timestep(replay)

            # Identify eligible jobs and add them to the queue.
            #self.queue += self.eligible_jobs(jobs, self.current_time)
            #jobs = self.add_eligible_jobs_to_queue(jobs)
            self.add_eligible_jobs_to_queue(jobs)
            # Schedule jobs that are now in the queue.
            self.scheduler.schedule(self.queue, self.running, self.current_time, sorted=False)
+8 −2
@@ -4,7 +4,7 @@ def job_dict(nodes_required, name, account, \
             cpu_trace, gpu_trace, ntx_trace, nrx_trace, \
             end_state, scheduled_nodes, job_id, priority=0, partition=0,
             submit_time=0, time_limit=0, start_time=0, end_time=0,
             wall_time=0, trace_time=0):
             wall_time=0, trace_time=0, trace_start_time=0, trace_end_time=0):
    """ Return job info dictionary """
    return {
        'nodes_required': nodes_required,
@@ -25,7 +25,9 @@ def job_dict(nodes_required, name, account, \
        'start_time': start_time,
        'end_time': end_time,
        'wall_time': wall_time,
        'trace_time': trace_time
        'trace_time': trace_time,
        'trace_start_time': trace_start_time,
        'trace_end_time': trace_end_time
    }
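A hypothetical call showing the two new fields (all other arguments are dummy values, following the signature above):

info = job_dict(4, 'demo', 'acct', [], [], [], [],
                'COMPLETED', None, 42,
                wall_time=3600, trace_time=3000,
                trace_start_time=600, trace_end_time=3600)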


@@ -65,6 +67,8 @@ class Job:
        self.end_time = None      # Actual end time when executing or from telemetry
        self.wall_time = None     # end_time - start_time
        self.trace_time = None    # Time period for which traces are available
        self.trace_start_time = None  # Start time of the trace, relative to the job's running time
        self.trace_end_time = None    # End time of the trace, relative to the job's running time
        self.running_time = 0     # Current running time updated when simulating

        # If a job dict was given, override the values from the job_dict:
@@ -87,6 +91,8 @@ class Job:
                f"start_time={self.start_time}, end_time={self.end_time}, "
                f"wall_time={self.wall_time}, "
                f"trace_time={self.trace_time}, "
                f"trace_start_time={self.trace_start_time}, "
                f"trace_end_time={self.trace_end_time}, "
                f"running_time={self.running_time}, state={self._state}, "
                f"scheduled_nodes={self.scheduled_nodes}, power={self.power}, "
                f"power_history={self.power_history})")
+16 −7
@@ -85,7 +85,8 @@ class Workload:

            jobs.append(job_dict(nodes_required, name, account, cpu_trace, gpu_trace, net_tx, net_rx, \
                        end_state, None, job_index, priority, partition,
                        time_to_next_job, time_limit, time_to_next_job, time_to_next_job + wall_time, wall_time, wall_time))
                        time_to_next_job, time_limit, time_to_next_job, time_to_next_job + wall_time, wall_time,
                        wall_time, 0, wall_time))

        return jobs

@@ -128,7 +129,9 @@ class Workload:
                0,                               # Start time / or None
                len(gpu_trace) * config['TRACE_QUANTA'],  # End time / or None
                len(gpu_trace) * config['TRACE_QUANTA'],  # Wall time
                len(gpu_trace) * config['TRACE_QUANTA']   # Trace time
                len(gpu_trace) * config['TRACE_QUANTA'],  # Trace time
                0,                                        # Trace start time
                len(gpu_trace) * config['TRACE_QUANTA']   # Trace end time
            )
            print(job_info)
            jobs.append(job_info)  # Add job to the list
@@ -170,7 +173,9 @@ class Workload:
                0,                               # Start time / or None
                len(gpu_trace) * config['TRACE_QUANTA'],  # End time / or None
                len(gpu_trace) * config['TRACE_QUANTA'],  # Wall time
                len(gpu_trace) * config['TRACE_QUANTA']   # Trace time
                len(gpu_trace) * config['TRACE_QUANTA'],  # Trace time
                0,                                        # Trace start time
                len(gpu_trace) * config['TRACE_QUANTA']   # Trace end time
            )
            jobs.append(job_info)  # Add job to the list

@@ -199,7 +204,8 @@ class Workload:
                f"Max Test {partition}", account, cpu_trace, gpu_trace, net_tx, net_rx,
                'COMPLETED', None, None, 100, partition,
                0, len(gpu_trace) * config['TRACE_QUANTA'] + 1,
                0, 10800, len(gpu_trace) * config['TRACE_QUANTA'], len(gpu_trace) * config['TRACE_QUANTA']
                0, 10800, len(gpu_trace) * config['TRACE_QUANTA'],
                len(gpu_trace) * config['TRACE_QUANTA'], 0, len(gpu_trace) * config['TRACE_QUANTA']
            )
            jobs.append(job_info)

@@ -211,7 +217,8 @@ class Workload:
                f"OpenMxP {partition}", account, cpu_trace, gpu_trace, net_tx, net_rx,
                'COMPLETED', None, None, 100, partition,
                0, len(gpu_trace) * config['TRACE_QUANTA'] + 1,
                10800, 14200, len(gpu_trace) * config['TRACE_QUANTA'], len(gpu_trace) * config['TRACE_QUANTA']
                10800, 14200, len(gpu_trace) * config['TRACE_QUANTA'],
                len(gpu_trace) * config['TRACE_QUANTA'], 0, len(gpu_trace) * config['TRACE_QUANTA']
            )
            jobs.append(job_info)

@@ -223,7 +230,8 @@ class Workload:
                f"HPL {partition}", account, cpu_trace, gpu_trace, net_tx, net_rx,
                'COMPLETED', None, None, 100, partition,
                0, len(gpu_trace) * config['TRACE_QUANTA'] + 1,
                14200, 17800, len(gpu_trace) * config['TRACE_QUANTA'], len(gpu_trace) * config['TRACE_QUANTA']
                14200, 17800, len(gpu_trace) * config['TRACE_QUANTA'],
                len(gpu_trace) * config['TRACE_QUANTA'], 0, len(gpu_trace) * config['TRACE_QUANTA']
            )
            jobs.append(job_info)

@@ -235,7 +243,8 @@ class Workload:
                f"Idle Test {partition}", account, cpu_trace, gpu_trace, net_tx, net_rx,
                'COMPLETED', None, None, 100, partition,
                0, len(gpu_trace) * config['TRACE_QUANTA'] + 1,
                17800, 21400, len(gpu_trace) * config['TRACE_QUANTA'], len(gpu_trace) * config['TRACE_QUANTA']
                17800, 21400, len(gpu_trace) * config['TRACE_QUANTA'],
                len(gpu_trace) * config['TRACE_QUANTA'], 0, len(gpu_trace) * config['TRACE_QUANTA']
            )
            jobs.append(job_info)
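For these synthetic workloads the trace always covers the whole run, which is why every call above passes 0 and the full trace length as the new window arguments. Illustrative, with dummy values:

TRACE_QUANTA = 15                  # assumed quantum in seconds
gpu_trace = [0.5] * 240            # dummy one-hour trace
trace_time = len(gpu_trace) * TRACE_QUANTA
trace_start_time, trace_end_time = 0, trace_time   # (0, 3600)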