Commit d8c397dc authored by Hines, Jesse
Browse files

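Add `min_time` option to dataloaders

Needed so the server can use the dataloaders properly: it can now pass `min_time` as the reference time from which job time offsets are computed. When `min_time` is not given, the earliest timestamp in the data is used, as before.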
parent 879b2a49
Loading
Loading
Loading
Loading
+5 −3
Original line number Diff line number Diff line
@@ -54,6 +54,7 @@ def load_data_from_df(df, **kwargs):
    validate = kwargs.get('validate')
    jid = kwargs.get('jid', '*')
    config = kwargs.get('config')
    min_time = kwargs.get('min_time', None)

    if fastforward: print(f"fast-forwarding {fastforward} seconds")

@@ -61,7 +62,8 @@ def load_data_from_df(df, **kwargs):
    
    # Convert 'adt' (submit time) to datetime and find the earliest submission time
    df['adt'] = pd.to_datetime(df['adt'], errors='coerce')
    earliest_submit_time = df['adt'].min()
    if not min_time:
        min_time = df['adt'].min()

    # Loop through the DataFrame rows to extract job information
    for _, row in tqdm(df.iterrows(), total=len(df), desc="Processing Jobs"):
@@ -80,11 +82,11 @@ def load_data_from_df(df, **kwargs):
        end_state = row['exit state'] if 'exit state' in df.columns else 'unknown'
        #scheduled_nodes = row['nnuma'] if 'nnuma' in df.columns else 0
        scheduled_nodes = None
        submit_time = row['adt'] if 'adt' in df.columns else earliest_submit_time
        submit_time = row['adt'] if 'adt' in df.columns else min_time
        if reschedule: # Let the scheduler reschedule the jobs
            time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME'])
        else:
            time_offset = (submit_time - earliest_submit_time).total_seconds()  # Compute time offset in seconds
            time_offset = (submit_time - min_time).total_seconds()  # Compute time offset in seconds

        job_id = row['jid'] if 'jid' in df.columns else 'unknown'
        priority = row['pri'] if 'pri' in df.columns else 0
+5 −3
Original line number Diff line number Diff line
@@ -59,6 +59,7 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
    reschedule = kwargs.get('reschedule')
    fastforward = kwargs.get('fastforward')
    verbose = kwargs.get('verbose')
    min_time = kwargs.get('min_time', None)

    if fastforward:
        print(f"fast-forwarding {fastforward} seconds")
@@ -66,8 +67,9 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
    allocation_df['begin_time'] = pd.to_datetime(allocation_df['begin_time'], format='mixed', errors='coerce')
    allocation_df['end_time'] = pd.to_datetime(allocation_df['end_time'], format='mixed', errors='coerce')

    earliest_begin_time = pd.to_datetime(allocation_df['begin_time']).min()
    print(earliest_begin_time)
    if not min_time:
        min_time = pd.to_datetime(allocation_df['begin_time']).min()

    job_list = []

    for _, row in tqdm(allocation_df.iterrows(), total=len(allocation_df), desc="Processing Jobs"):
@@ -124,7 +126,7 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
            time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME'])
        else:
            scheduled_nodes = get_scheduled_nodes(row['allocation_id'], node_df)
            time_offset = compute_time_offset(row['begin_time'], earliest_begin_time)
            time_offset = compute_time_offset(row['begin_time'], min_time)
            if fastforward:
                time_offset -= fastforward