Commit 01ea5634 authored by Brewer, Wes
Browse files

Modify fugaku dataloader so that it works with `-m raps.telemetry` without error

parent 77b3f670
Loading
Loading
Loading
Loading
+9 −2
Original line number Diff line number Diff line
@@ -22,7 +22,8 @@ def load_data(path, **kwargs):

def load_data_from_df(df, **kwargs):
    """
-    Processes DataFrame to extract relevant job information.
+    Processes DataFrame to extract relevant job information and computes the time offset
+    based on the earliest submission time.

    Parameters:
    df (pd.DataFrame): DataFrame containing job information.
@@ -32,6 +33,10 @@ def load_data_from_df(df, **kwargs):
    """
    job_list = []
    
+    # Convert 'adt' (submit time) to datetime and find the earliest submission time
+    df['adt'] = pd.to_datetime(df['adt'], errors='coerce')
+    earliest_submit_time = df['adt'].min()
+
    # Loop through the DataFrame rows to extract job information
    for _, row in df.iterrows():
        nodes_required = row['nnumr'] if 'nnumr' in df.columns else 0
@@ -41,7 +46,8 @@ def load_data_from_df(df, **kwargs):
        wall_time = row['duration'] if 'duration' in df.columns else 0
        end_state = row['exit state'] if 'exit state' in df.columns else 'unknown'
        scheduled_nodes = row['nnuma'] if 'nnuma' in df.columns else 0
-        time_offset = row['adt'] if 'adt' in df.columns else pd.Timestamp(0)  # Submission time
+        submit_time = row['adt'] if 'adt' in df.columns else earliest_submit_time
+        time_offset = (submit_time - earliest_submit_time).total_seconds()  # Compute time offset in seconds
        job_id = row['jid'] if 'jid' in df.columns else 'unknown'
        priority = row['pri'] if 'pri' in df.columns else 0
        
@@ -63,5 +69,6 @@ def load_data_from_df(df, **kwargs):
    
    return job_list

+
# Sample usage:
# fugaku_jobs = load_data(['/path/to/21_04.parquet'])