Commit 24f2507c authored by Maiterth, Matthias's avatar Maiterth, Matthias
Browse files

Added fixes to make sure the incentive strucutre work can proceed.

Fixes are: Missing data in datasets mitigated in
1. dataloader
2. engine

Removed burst factor in lassen as this throws errors.
parent 0e5e2fbd
Loading
Loading
Loading
Loading
+5 −3
Original line number Diff line number Diff line
@@ -232,10 +232,13 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
            wall_time = 0

        trace_time = gpu_trace.size * config['TRACE_QUANTA']  # seconds



        trace_start_time = 0
        trace_end_time = trace_time
        if wall_time > trace_time:
            missing_trace_time = wall_time - trace_time
            missing_trace_time = int(wall_time - trace_time)
            if start_time < 0:
                trace_start_time = missing_trace_time
                trace_end_time = wall_time
@@ -243,8 +246,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
                trace_start_time = 0
                trace_end_time = trace_time
            else:
                print(f"Job: {job_id} {start_time} - {end_time}!")
                raise ValueError("Missing values not at start nor end.")
                print(f"Job: {job_id} {end_state} {start_time} - {end_time},Trace: {trace_start_time} - {trace_end_time} Missing: {missing_trace_time}!")

        xnames = jobs_df.loc[jidx, 'xnames']
        # Don't replay any job with an empty set of xnames
+6 −6
Original line number Diff line number Diff line
@@ -225,12 +225,12 @@ def adjust_bursts(burst_intervals, total, intervals):
    bursts = np.round(bursts).astype(int)
    adjustment = total - np.sum(bursts)

    # Distribute adjustment across non-zero elements to avoid negative values
    if adjustment != 0:
        for i in range(len(bursts)):
            if bursts[i] > 0:
                bursts[i] += adjustment % (2^64-1)
                break  # Apply adjustment only once where it won't cause a negative
    ## Distribute adjustment across non-zero elements to avoid negative values
    #if adjustment != 0:
    #    for i in range(len(bursts)):
    #        if bursts[i] > 0:
    #            bursts[i] += adjustment % (2^64-1)  # This can overflow!
    #            break  # Apply adjustment only once where it won't cause a negative

    return bursts

+25 −25
Original line number Diff line number Diff line
@@ -115,6 +115,10 @@ class Engine:
            job_instance = Job(job_data)
            eligible_jobs_list.append(job_instance)
        self.queue += eligible_jobs_list
        if eligible_jobs_list != []:
            return True
        else:
            return False

    def prepare_timestep(self, replay:bool = True):
        completed_jobs = [job for job in self.running if job.end_time is not None and job.end_time <= self.current_time]
@@ -171,19 +175,8 @@ class Engine:
                                       {job.running_time} > {job.wall_time}\n\
                                       {len(job.cpu_trace)} vs. {job.running_time // self.config['TRACE_QUANTA']}\
                                    ")
                # job.running_time < job.trace_start_time or
                if job.running_time >= job.trace_end_time:
                    cpu_util = 0  # No values available therefore we assume IDLE == 0
                    gpu_util = 0
                    net_util = 0
                    if self.debug:
                        print("No Values in trace, using IDLE.")
                    if self.scheduler.policy == PolicyType.REPLAY and not job.trace_missing_values:
                        print(f"{job.running_time} < {job.trace_start_time} or {job.running_time} > {job.trace_end_time}")
                        raise Exception("Replay is using IDLE values! Something is wrong!")
                else:

                time_quanta_index = int((job.running_time - job.trace_start_time) // self.config['TRACE_QUANTA'])
                    if isinstance(job.cpu_trace, List) and time_quanta_index == len(job.cpu_trace):
                # If the running time is past the last time step in the
                # trace, use the last value in the trace. This can
                # happen if the last valid timesteps is e.g. 17%15,
@@ -192,9 +185,16 @@ class Engine:
                # job ended before.
                # For every other error condition trace_start_ and
                # _end_time are used!
                        time_quanta_index -= 1

                if time_quanta_index < len(job.cpu_trace):
                    cpu_util = get_utilization(job.cpu_trace, time_quanta_index)
                else:
                    cpu_util = get_utilization(job.cpu_trace, len(job.cpu_trace) - 1)

                if time_quanta_index < len(job.gpu_trace):
                    gpu_util = get_utilization(job.gpu_trace, time_quanta_index)
                else:
                    gpu_util = get_utilization(job.gpu_trace, len(job.gpu_trace) - 1)
                net_util = 0

                if isinstance(job.ntx_trace,List) and len(job.ntx_trace) and isinstance(job.nrx_trace,List) and len(job.nrx_trace):
@@ -336,9 +336,9 @@ class Engine:
            completed_jobs, newly_downed_nodes = self.prepare_timestep(replay)

            # 2. Identify eligible jobs and add them to the queue.
            self.add_eligible_jobs_to_queue(jobs)
            has_new_additions = self.add_eligible_jobs_to_queue(jobs)
            # 3. Schedule jobs that are now in the queue.
            self.scheduler.schedule(self.queue, self.running, self.current_time, sorted=False)
            self.scheduler.schedule(self.queue, self.running, self.current_time, sorted=(not has_new_additions))

            # Stop the simulation if no more jobs are running or in the queue or in the job list.
            if autoshutdown and not self.queue and not self.running and not self.replay and not all_jobs and not jobs:
+4 −0
Original line number Diff line number Diff line
python main.py -f ~/data/marconi100/job_table.parquet --system marconi100 -ff 4381000 -t 61000 -o --policy replay
python main.py -f ~/data/marconi100/job_table.parquet --system marconi100 -ff 4381000 -t 61000 -o --policy fcfs
python main.py -f ~/data/marconi100/job_table.parquet --system marconi100 -ff 4381000 -t 61000 -o --policy fcfs --backfill easy
python main.py -f ~/data/marconi100/job_table.parquet --system marconi100 -ff 4381000 -t 61000 -o --policy priority --backfill firstfit