Commit cd97807a authored by Maiterth, Matthias

Updated Fugaku data loader for the new engine loop.

Updated engine loop to work with single utilization values again, instead of expecting a list.
parent 60788743
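Note on the engine-loop change described above: a job's utilization trace may now be either a single average value (the Fugaku accounting data only provides one average figure per job) or a per-quantum list. A minimal sketch of that idea, using a hypothetical helper lookup_utilization (illustrative only, not the repository's get_utilization):

from typing import List, Union

def lookup_utilization(trace: Union[float, List[float]], index: int) -> float:
    # Per-quanta list: return the sample at `index`, clamped to the last entry.
    # Single average value: use it for every quantum. An empty list means idle.
    if isinstance(trace, list):
        if not trace:
            return 0.0
        return trace[min(index, len(trace) - 1)]
    return float(trace)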
+66 −17
@@ -47,6 +47,8 @@ def load_data_from_df(df, **kwargs):

    Returns:
    list: List of job dictionaries.
    int: Telemetry start in seconds (always 0).
    int: Telemetry end in seconds.
    """
    encrypt_bool = kwargs.get('encrypt')
    fastforward = kwargs.get('fastforward')
@@ -54,44 +56,84 @@ def load_data_from_df(df, **kwargs):
    validate = kwargs.get('validate')
    jid = kwargs.get('jid', '*')
    config = kwargs.get('config')
    min_time = kwargs.get('min_time', None)

    if fastforward: print(f"fast-forwarding {fastforward} seconds")

    job_list = []

    # Convert all times to datetime and find the min and max thereof for reference use.
    # Convert 'adt' (submit time) to datetime and find the earliest submission time
    df['adt'] = pd.to_datetime(df['adt'], errors='coerce')
    if not min_time:
        min_time = df['adt'].min()
    df['sdt'] = pd.to_datetime(df['sdt'], errors='coerce')
    df['edt'] = pd.to_datetime(df['edt'], errors='coerce')

    # We only have average power, therefore we set the telemetry start to the earliest job start time
    first_start_timestamp = df['sdt'].min()
    last_end_timestamp = df['edt'].max()
    telemetry_start_timestamp = first_start_timestamp
    telemetry_start = 0
    telemetry_end_timestamp = last_end_timestamp
    diff = telemetry_end_timestamp - telemetry_start_timestamp
    telemetry_end = int(diff.total_seconds())

    # Loop through the DataFrame rows to extract job information
    for _, row in tqdm(df.iterrows(), total=len(df), desc="Processing Jobs"):
        nodes_required = row['nnumr'] if 'nnumr' in df.columns else 0
        account = row['usr']
        name = row['jnam'] if 'jnam' in df.columns else 'unknown'
        account = row['usr']

        if validate:
            cpu_trace = row['avgpcon']
            gpu_trace = cpu_trace

        else:
            cpu_trace = row['perf1'] if 'perf1' in df.columns else 0  # Assuming some performance metric as cpu_trace
            # cpu_trace = row['perf1'] if 'perf1' in df.columns else 0  # Assuming some performance metric as cpu_trace
            cpu_trace = row['perf1'] / (row['perf1'] + row['perf6']) if 'perf1' in df.columns else 0  # Total Ops / (Total Ops + Idle Ops)
            gpu_trace = 0  # Set to 0 as GPU trace is not explicitly provided

        wall_time = row['duration'] if 'duration' in df.columns else 0
        # No network trace

        end_state = row['exit state'] if 'exit state' in df.columns else 'unknown'
        #scheduled_nodes = row['nnuma'] if 'nnuma' in df.columns else 0
        scheduled_nodes = None
        submit_time = row['adt'] if 'adt' in df.columns else min_time
        if arrival == 'poisson':  # Modify the arrival times according to a Poisson distribution
            time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME'])
        else:
            time_offset = (submit_time - min_time).total_seconds()  # Compute time offset in seconds
        scheduled_nodes = None  # Only nodes_required is in the trace

        job_id = row['jid'] if 'jid' in df.columns else 'unknown'
        priority = row['pri'] if 'pri' in df.columns else 0

        submit_timestamp = pd.to_datetime(row['adt']) if 'adt' in df.columns else -1  # Else job was submitted in the past
        diff = submit_timestamp - telemetry_start_timestamp
        submit_time = int(diff.total_seconds())

        time_limit = int(row['elpl']) if 'elpl' in df.columns else 24 * 60 * 60  # in seconds

        start_timestamp = pd.to_datetime(row['sdt']) if 'sdt' in df.columns else 0
        diff = start_timestamp - telemetry_start_timestamp
        start_time = int(diff.total_seconds())

        end_timestamp = pd.to_datetime(row['edt']) if 'edt' in df.columns else 0
        diff = end_timestamp - telemetry_start_timestamp
        end_time = int(diff.total_seconds())

        wall_time = end_time - start_time
        #duration = int(row['duration']) if 'duration' in df.columns else 0  # in seconds; recorded duration and wall_time do not match!
        #if (wall_time != duration):
        #    if abs(wall_time - duration) <= 1:  # offset is often 1
        #        wall_time = min(wall_time,duration)
        #    else:
        #        raise ValueError(f"Duration: {row}")  # Offset can be as large as 15 minutes! Removed.

        # We only have a single average value, set trace times as if we had all.
        trace_time = wall_time
        trace_start_time = start_time
        trace_end_time = end_time
        trace_missing_values = False  # Sane Choice?

        # Should we still have this?
        # if arrival == 'poisson':  # Modify the arrival times according to a Poisson distribution
        #     time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME'])
        # else:
        #     time_offset = (submit_time - min_time).total_seconds()  # Compute time offset in seconds
        # Removed from job_dict: time_offset=time_offset,

        # Create job dictionary
        job_info = job_dict(
            nodes_required=nodes_required,
@@ -101,17 +143,24 @@ def load_data_from_df(df, **kwargs):
            gpu_trace=gpu_trace,
            ntx_trace=[],
            nrx_trace=[],
            wall_time=wall_time,
            end_state=end_state,
            scheduled_nodes=scheduled_nodes,
            time_offset=time_offset,
            job_id=job_id,
            priority=priority
            priority=priority,
            submit_time=submit_time,
            time_limit=time_limit,
            start_time=start_time,
            end_time=end_time,
            wall_time=wall_time,
            trace_time=trace_time,
            trace_start_time=trace_start_time,
            trace_end_time=trace_end_time,
            trace_missing_values=trace_missing_values
        )

        job_list.append(job_info)

    return job_list
    return job_list, telemetry_start, telemetry_end


def node_index_to_name(index: int, config: dict):
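The timestamp handling added above converts every absolute time into seconds relative to the earliest job start, so telemetry_start is always 0. A small worked example of that convention, with made-up timestamps (values are illustrative, not from the trace):

import pandas as pd

telemetry_start_timestamp = pd.Timestamp("2021-04-01 00:00:00")  # earliest 'sdt' in the data
start_timestamp = pd.Timestamp("2021-04-01 01:30:00")            # a job's 'sdt'
end_timestamp = pd.Timestamp("2021-04-01 03:00:00")              # the job's 'edt'

start_time = int((start_timestamp - telemetry_start_timestamp).total_seconds())  # 5400
end_time = int((end_timestamp - telemetry_start_timestamp).total_seconds())      # 10800
wall_time = end_time - start_time                                                # 5400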
+5 −4
@@ -166,7 +166,8 @@ class Engine:
                                       {job.running_time} > {job.wall_time}\n\
                                       {len(job.cpu_trace)} vs. {job.running_time // self.config['TRACE_QUANTA']}\
                                    ")
                if job.running_time < job.trace_start_time or job.running_time >= job.trace_end_time:
                # job.running_time < job.trace_start_time or
                if job.running_time >= job.trace_end_time:
                    cpu_util = 0  # No values available therefore we assume IDLE == 0
                    gpu_util = 0
                    net_util = 0
@@ -177,7 +178,7 @@ class Engine:
                        raise Exception("Replay is using IDLE values! Something is wrong!")
                else:
                    time_quanta_index = int((job.running_time - job.trace_start_time) // self.config['TRACE_QUANTA'])
                    if time_quanta_index == len(job.cpu_trace):
                    if isinstance(job.cpu_trace, List) and time_quanta_index == len(job.cpu_trace):
                        # If the running time is past the last time step in the
                        # trace, use the last value in the trace. This can
                        # happen if the last valid timesteps is e.g. 17%15,
@@ -191,7 +192,7 @@ class Engine:
                    gpu_util = get_utilization(job.gpu_trace, time_quanta_index)
                    net_util = 0

                if len(job.ntx_trace) and len(job.nrx_trace):
                if isinstance(job.ntx_trace, List) and len(job.ntx_trace) and isinstance(job.nrx_trace, List) and len(job.nrx_trace):
                    net_tx = get_utilization(job.ntx_trace, time_quanta_index)
                    net_rx = get_utilization(job.nrx_trace, time_quanta_index)
                    net_util = network_utilization(net_tx, net_rx)
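How the two changed pieces fit together: the loader now returns the telemetry window alongside the job list, and the engine tolerates traces that are plain numbers rather than lists. A minimal usage sketch, assuming a DataFrame df with the Fugaku columns used above and a suitable config dict; the keyword values shown are illustrative, not the repository's defaults:

job_list, telemetry_start, telemetry_end = load_data_from_df(
    df,
    encrypt=False,
    validate=False,
    config=config,
)
print(f"{len(job_list)} jobs, telemetry covers "
      f"{telemetry_end - telemetry_start} seconds")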