Commit 9a5f9efc authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Add support for processing single job in lassen dataloader

parent f84b0648
Loading
Loading
Loading
Loading
+11 −1
Original line number Diff line number Diff line
@@ -53,8 +53,10 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
    """
    Loads data from pandas DataFrames and returns the extracted job info.
    """
    jid = kwargs.get('jid', '*')
    reschedule = kwargs.get('reschedule')
    fastforward = kwargs.get('fastforward')

    if fastforward: print(f"fast-forwarding {fastforward} seconds")

    allocation_df['begin_time'] = pd.to_datetime(allocation_df['begin_time'], format='mixed', errors='coerce')
@@ -65,6 +67,14 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
    job_list = []

    for _, row in tqdm(allocation_df.iterrows(), total=len(allocation_df), desc="Processing Jobs"):
        job_id = row['primary_job_id']

        if not jid == '*':
            if int(jid) == int(job_id):
                print(f'Extracting {job_id} profile')
            else:
                continue

        node_data = node_df[node_df['allocation_id'] == row['allocation_id']]

        nodes_required = row['num_nodes']
@@ -116,7 +126,7 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
                                row['exit_status'], \
                                scheduled_nodes, \
                                time_offset, \
                                row['primary_job_id'], \
                                job_id, \
                                row.get('priority', 0))

            job_list.append(job_info)