Commit 98b2ea22 authored by Maiterth, Matthias's avatar Maiterth, Matthias

Updated the Lassen dataloader to cut the region of interest to the simulated time window.

parent 3338f6eb
@@ -22,8 +22,8 @@ Usage Instructions:
    # to modify the submit times of the telemetry according to Poisson distribution
    python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen --arrival poisson

-    # to fast-forward 37 days and replay for 1 day
-    python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen -ff 37d -t 1d
+    # to fast-forward 365 days and replay for 1 day; this replayed day has 2250 jobs, of which 1650 are executed
+    python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen -ff 365d -t 1d
"""
import math
import os
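For reference, the -ff and -t flags take duration strings such as 365d or 1d. Below is a minimal sketch of the suffix parsing that convert_to_seconds (imported from ..utils in the next hunk) is assumed to perform; the helper name comes from this diff, but the body is purely illustrative and the real implementation may differ.

    # Hypothetical sketch of duration-string parsing ("37d", "12h", "30m", "45s");
    # the real convert_to_seconds lives in ..utils.
    def convert_to_seconds(duration):
        units = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
        return int(duration[:-1]) * units[duration[-1]]

    assert convert_to_seconds('1d') == 86400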
@@ -31,19 +31,20 @@ import uuid
import numpy as np
import pandas as pd
from tqdm import tqdm
+from datetime import timedelta

from ..job import job_dict
-from ..utils import power_to_utilization, next_arrival
+from ..utils import power_to_utilization, next_arrival, convert_to_seconds


def load_data(path, **kwargs):
    """
    Loads data from the given file paths and returns job info.
    """
-    nrows = 1E5
-    alloc_df = pd.read_csv(os.path.join(path[0], 'final_csm_allocation_history_hashed.csv'), nrows=nrows)
-    node_df = pd.read_csv(os.path.join(path[0], 'final_csm_allocation_node_history.csv'), nrows=nrows)
-    step_df = pd.read_csv(os.path.join(path[0], 'final_csm_step_history.csv'), nrows=nrows)
+    nrows = None
+    alloc_df = pd.read_csv(os.path.join(path[0], 'final_csm_allocation_history_hashed.csv'), nrows=nrows, low_memory=False)
+    node_df = pd.read_csv(os.path.join(path[0], 'final_csm_allocation_node_history.csv'), nrows=nrows, low_memory=False)
+    step_df = pd.read_csv(os.path.join(path[0], 'final_csm_step_history.csv'), nrows=nrows, low_memory=False)
    return load_data_from_df(alloc_df, node_df, step_df, **kwargs)
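A hedged usage sketch of the loader above: the dataset path matches the docstring, and the keyword names mirror the kwargs read in load_data_from_df; whether arrival is forwarded this way is an assumption.

    # Illustrative call only; fastforward is an int in seconds, time a duration string.
    jobs = load_data(['/path/to/LAST/Lassen-Supercomputer-Job-Dataset'],
                     fastforward=365 * 86400, time='1d', arrival='poisson')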


@@ -56,23 +57,44 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
    validate = kwargs.get('validate')
    arrival = kwargs.get('arrival')
    verbose = kwargs.get('verbose')
+    fastforward = kwargs.get('fastforward')  # int in seconds

-    allocation_df['job_submit_time'] = pd.to_datetime(allocation_df['job_submit_time'], format='mixed', errors='coerce')
-    allocation_df['begin_time'] = pd.to_datetime(allocation_df['begin_time'], format='mixed', errors='coerce')
-    allocation_df['end_time'] = pd.to_datetime(allocation_df['end_time'], format='mixed', errors='coerce')
+    allocation_df['job_submit_timestamp'] = pd.to_datetime(allocation_df['job_submit_time'], format='mixed', errors='coerce')
+    allocation_df['begin_timestamp'] = pd.to_datetime(allocation_df['begin_time'], format='mixed', errors='coerce')
+    allocation_df['end_timestamp'] = pd.to_datetime(allocation_df['end_time'], format='mixed', errors='coerce')

-    telemetry_start_timestamp = allocation_df['begin_time'].min()
+    # The full dataset is too large to replay in one go; trim it to the
+    # window given by fastforward and the requested simulation time.
+    if fastforward is None:
+        fastforward = 0
+    fastforward_timedelta = timedelta(seconds=fastforward)
+    time_to_simulate = kwargs.get('time')  # duration string such as '1d', or None
+    if time_to_simulate is None:
+        time_to_simulate_timedelta = timedelta(seconds=31536000)  # default: one year
+    else:
+        time_to_simulate_timedelta = timedelta(seconds=convert_to_seconds(time_to_simulate))
+
+    telemetry_start_timestamp = allocation_df['begin_timestamp'].min()
    telemetry_start_time = 0
-    telemetry_end_timestamp = allocation_df['end_time'].max()
+    telemetry_end_timestamp = allocation_df['end_timestamp'].max()
    diff = telemetry_end_timestamp - telemetry_start_timestamp
    telemetry_end_time = int(math.ceil(diff.total_seconds()))

+    simulation_start_timestamp = telemetry_start_timestamp + fastforward_timedelta
+    simulation_end_timestamp = simulation_start_timestamp + time_to_simulate_timedelta
+
+    # With >1.4M jobs in the dataset, filter to the simulation window before building the job structs.
+    allocation_df = allocation_df[allocation_df['end_timestamp'] >= simulation_start_timestamp]  # job must not have ended before the window starts
+    allocation_df = allocation_df[allocation_df['job_submit_timestamp'] < simulation_end_timestamp]  # job must have been submitted before the window ends
+
    job_list = []

    for _, row in tqdm(allocation_df.iterrows(), total=len(allocation_df), desc="Processing Jobs"):

        account = row['hashed_user_id']
-        job_id = row['primary_job_id']
+        job_id = int(row['primary_job_id'])
        allocation_id = row['allocation_id']
        nodes_required = row['num_nodes']
        end_state = row['exit_status']
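The two filters introduced above implement a standard interval-overlap test: a job belongs in the replay iff it ends at or after the window start and was submitted before the window end. A self-contained sketch with toy timestamps (all values illustrative):

    import pandas as pd
    from datetime import timedelta

    toy = pd.DataFrame({
        'job_submit_timestamp': pd.to_datetime(['2019-01-01', '2020-01-05']),
        'end_timestamp': pd.to_datetime(['2019-01-02', '2020-01-06']),
    })
    window_start = pd.Timestamp('2020-01-01')
    window_end = window_start + timedelta(days=30)
    # Keeps only the second job; the first ended before the window opened.
    in_window = toy[(toy['end_timestamp'] >= window_start)
                    & (toy['job_submit_timestamp'] < window_end)]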
@@ -86,7 +108,7 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):

        node_data = node_df[node_df['allocation_id'] == row['allocation_id']]

-        wall_time = compute_wall_time(row['begin_time'], row['end_time'])
+        wall_time = compute_wall_time(row['begin_timestamp'], row['end_timestamp'])
        samples = math.ceil(wall_time / config['TRACE_QUANTA'])

        if validate:
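For intuition on the samples computation above: wall_time is in seconds and config['TRACE_QUANTA'] is the trace sampling period, so the number of trace samples is the wall time divided by the quantum, rounded up. A worked example with an assumed illustrative quantum:

    import math
    TRACE_QUANTA = 15        # assumed value; the real one comes from config
    wall_time = 600          # seconds
    samples = math.ceil(wall_time / TRACE_QUANTA)  # -> 40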
@@ -128,9 +150,9 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
            end_time = None  # Scheduler will determine end time
        else:  # Prescribed replay
            scheduled_nodes = get_scheduled_nodes(row['allocation_id'], node_df)
-            submit_time = compute_time_offset(row['job_submit_time'], telemetry_start_timestamp)
-            start_time = compute_time_offset(row['begin_time'], telemetry_start_timestamp)
-            end_time = compute_time_offset(row['end_time'], telemetry_start_timestamp)
+            submit_time = compute_time_offset(row['job_submit_timestamp'], telemetry_start_timestamp)
+            start_time = compute_time_offset(row['begin_timestamp'], telemetry_start_timestamp)
+            end_time = compute_time_offset(row['end_timestamp'], telemetry_start_timestamp)
            time_limit = row['time_limit']

            trace_time = wall_time
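compute_time_offset is assumed to convert an absolute timestamp into seconds relative to the telemetry start; a minimal re-implementation under that assumption (the real helper is defined elsewhere in this module and may differ):

    # Hypothetical equivalent, for illustration only.
    def compute_time_offset(timestamp, reference_timestamp):
        return int((timestamp - reference_timestamp).total_seconds())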
@@ -208,7 +230,7 @@ def adjust_bursts(burst_intervals, total, intervals):
    if adjustment != 0:
        for i in range(len(bursts)):
            if bursts[i] > 0:
-                bursts[i] += adjustment
+                bursts[i] += adjustment % (2**64 - 1)  # ** (power), not ^: in Python, ^ is bitwise XOR
                break  # Apply adjustment only once where it won't cause a negative

    return bursts
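A worked example of the correction step above, assuming adjustment is the rounding shortfall between the requested total and the summed bursts:

    bursts = [30, 0, 40, 28]           # rounded bursts summing to 98
    adjustment = 100 - sum(bursts)     # target total 100 -> adjustment = 2
    for i in range(len(bursts)):
        if bursts[i] > 0:
            bursts[i] += adjustment    # bursts becomes [32, 0, 40, 28]
            break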