Commit 3338f6eb authored by Maiterth, Matthias's avatar Maiterth, Matthias
Browse files

Update lassen dataloader for new engine/scheduler loop

Next: fix npz files.
parent a35d1f28
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -13,6 +13,7 @@
    "DOWN_NODES": [],
    "CPUS_PER_NODE": 2,
    "CORES_PER_CPU": 22,
    "CPU_FREQUENCY": 2400000000,
    "GPUS_PER_NODE": 4,
    "CPU_PEAK_FLOPS": 396.8E9,
    "GPU_PEAK_FLOPS": 7.8E12,
+76 −62
Original line number Diff line number Diff line
@@ -26,24 +26,21 @@ Usage Instructions:
    python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen -ff 37d -t 1d
"""
import math
import numpy as np
import os
import uuid
import numpy as np
import pandas as pd
from tqdm import tqdm

try:
from ..job import job_dict
from ..utils import power_to_utilization, next_arrival

except:
    pass


def load_data(path, **kwargs):
    """
    Loads data from the given file paths and returns job info.
    """
    nrows = 1E4
    nrows = 1E5
    alloc_df = pd.read_csv(os.path.join(path[0], 'final_csm_allocation_history_hashed.csv'), nrows=nrows)
    node_df = pd.read_csv(os.path.join(path[0], 'final_csm_allocation_node_history.csv'), nrows=nrows)
    step_df = pd.read_csv(os.path.join(path[0], 'final_csm_step_history.csv'), nrows=nrows)
@@ -56,25 +53,30 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
    """
    config = kwargs.get('config')
    jid = kwargs.get('jid', '*')
    validate = kwargs.get('validate')
    arrival = kwargs.get('arrival')
    fastforward = kwargs.get('fastforward')
    verbose = kwargs.get('verbose')
    min_time = kwargs.get('min_time', None)

    if fastforward:
        print(f"fast-forwarding {fastforward} seconds")

    allocation_df['job_submit_time'] = pd.to_datetime(allocation_df['job_submit_time'], format='mixed', errors='coerce')
    allocation_df['begin_time'] = pd.to_datetime(allocation_df['begin_time'], format='mixed', errors='coerce')
    allocation_df['end_time'] = pd.to_datetime(allocation_df['end_time'], format='mixed', errors='coerce')

    if not min_time:
        min_time = pd.to_datetime(allocation_df['begin_time']).min()
    telemetry_start_timestamp = allocation_df['begin_time'].min()
    telemetry_start_time = 0
    telemetry_end_timestamp = allocation_df['end_time'].max()
    diff = telemetry_end_timestamp - telemetry_start_timestamp
    telemetry_end_time = int(math.ceil(diff.total_seconds()))

    job_list = []

    for _, row in tqdm(allocation_df.iterrows(), total=len(allocation_df), desc="Processing Jobs"):

        account = row['hashed_user_id']
        job_id = row['primary_job_id']
        allocation_id = row['allocation_id']
        nodes_required = row['num_nodes']
        end_state = row['exit_status']
        name = str(uuid.uuid4())[:6]  # This generates a random 6 char identifier....

        if not jid == '*':
            if int(jid) == int(job_id):
@@ -84,55 +86,57 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):

        node_data = node_df[node_df['allocation_id'] == row['allocation_id']]

        nodes_required = row['num_nodes']

        wall_time = compute_wall_time(row['begin_time'], row['end_time'])
        samples = math.ceil(wall_time / config['TRACE_QUANTA'])

        if validate:
            # Validate should represent the node power and not split it according to cpu and gpu.
            # Not sure if this is correct.
            cpu_power = (node_data['energy'].sum() / nodes_required) / wall_time
            cpu_trace = cpu_power
            gpu_trace = 0  # = cpu_trace  # Is this correct?
        else:
            # Compute GPU power
        gpu_energy = node_data['gpu_energy'].sum()  # Joules
        # divide by nodes_required to get average gpu_usage per node
        gpu_usage = node_data['gpu_usage'].sum() / 1E6 / nodes_required  # seconds
        gpu_power = gpu_energy / gpu_usage if gpu_usage > 0 else 0
        #gpu_power = gpu_energy / wall_time
        gpu_power_array = np.array([gpu_power] * samples)

        gpu_min_power = nodes_required * config['POWER_GPU_IDLE']
        gpu_max_power = nodes_required * config['POWER_GPU_MAX']
        gpu_util = power_to_utilization(gpu_power_array, gpu_min_power, gpu_max_power)
        # GPU power can be 0:
        # Utilization is defined in the range of [0 to GPUS_PER_NODE].
        # gpu_util will be negative if power reports 0, which is smaller than POWER_GPU_IDLE
        # Therefore: gpu_util should be set to zero if it is smaller than 0.
        gpu_trace = np.maximum(0, gpu_util)
            gpu_power = (node_data['gpu_energy'].sum() / nodes_required) / wall_time
            gpu_min_power = config['POWER_GPU_IDLE']
            gpu_max_power = config['POWER_GPU_MAX']
            gpu_util = power_to_utilization(gpu_power,gpu_min_power,gpu_max_power)
            gpu_trace = gpu_util

            # Compute CPU power from CPU usage time
            # CPU usage is reported per core, while we need it in the range [0 to CPUS_PER_NODE]
        cpu_usage = node_data['cpu_usage'].sum() / 1E9 / nodes_required / config['CORES_PER_CPU'] # seconds
        cpu_usage_array = np.array([cpu_usage] * samples)
        cpu_util = cpu_usage_array / wall_time
        cpu_trace = cpu_util  # * CPUS_PER_NODE
            cpu_util = node_data['cpu_usage'].sum() / nodes_required / wall_time / config['CPU_FREQUENCY'] / config['CORES_PER_CPU']
            cpu_trace = cpu_util
            # TODO use total energy for validation
            # Only Node Energy and GPU Energy is reported!
            # total_energy = node_data['energy'].sum() # Joules

        # Network utilization - since values are given in octets / quarter of a byte, multiply by 4 to get bytes
        ib_tx = 4 * node_data['ib_tx'].values[0] if node_data['ib_tx'].values.size > 0 else []
        ib_rx = 4 * node_data['ib_rx'].values[0] if node_data['ib_rx'].values.size > 0 else []
        ib_tx = 4 * node_data['ib_tx'].sum() if node_data['ib_tx'].values.size > 0 else []
        ib_rx = 4 * node_data['ib_rx'].sum() if node_data['ib_rx'].values.size > 0 else []

        net_tx, net_rx = generate_network_sequences(ib_tx, ib_rx, samples, lambda_poisson=0.3)

        # no priorities defined!
        priority = row.get('priority', 0)
        partition = row.get('partition', "0")

        if arrival == 'poisson':  # Modify the submit times according to Poisson process
            scheduled_nodes = None
            time_submit = next_arrival(1/config['JOB_ARRIVAL_TIME'])
            time_start = None  # Scheduler will determine start time
            submit_time = next_arrival(1 / config['JOB_ARRIVAL_TIME'])
            start_time = None  # Scheduler will determine start time
            end_time = None  # Scheduler will determine end time
        else:  # Prescribed replay
            scheduled_nodes = get_scheduled_nodes(row['allocation_id'], node_df)
            time_submit = compute_time_offset(row['job_submit_time'], min_time)
            time_start = compute_time_offset(row['begin_time'], min_time)
            if fastforward:
                time_submit -= fastforward
                time_start -= fastforward
            submit_time = compute_time_offset(row['job_submit_time'], telemetry_start_timestamp)
            start_time = compute_time_offset(row['begin_time'], telemetry_start_timestamp)
            end_time = compute_time_offset(row['end_time'], telemetry_start_timestamp)
            time_limit = row['time_limit']

            trace_time = wall_time
            trace_start_time = start_time
            trace_end_time = end_time
            trace_missing_values = False

        if verbose:
            print('ib_tx, ib_rx, samples:', ib_tx, ib_rx, samples)
@@ -140,22 +144,32 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
            print('rx:', net_rx)
            print('scheduled_nodes:', nodes_required, scheduled_nodes)

        if time_submit >= 0:

        if wall_time >= 0:
            job_info = job_dict(nodes_required,
                                row['hashed_user_id'],
                                row['hashed_user_group_id'],
                                cpu_trace, gpu_trace, net_tx, net_rx, wall_time,
                                row['exit_status'],
                                name,
                                account,
                                cpu_trace,
                                gpu_trace,
                                net_tx,
                                net_rx,
                                end_state,
                                scheduled_nodes,
                                time_submit,
                                job_id,
                                row.get('priority', 0),
                                time_start)
                                priority,
                                partition,
                                submit_time=submit_time,
                                time_limit=time_limit,
                                start_time=start_time,
                                end_time=end_time,
                                wall_time=wall_time,
                                trace_time=trace_time,
                                trace_start_time=trace_start_time,
                                trace_end_time=trace_end_time,
                                trace_missing_values=trace_missing_values)

            job_list.append(job_info)

    return job_list
    return job_list, telemetry_start_time, telemetry_end_time


def get_scheduled_nodes(allocation_id, node_df):