Commit 73d13113 authored by Maiterth, Matthias's avatar Maiterth, Matthias
Browse files

Changes to lassen loader.

Check with Wes and Tapasya whether this makes sense.
Some values seem odd: either GPU utilization is reported as zero, or
the scaling factors are off!
We have to verify the units for each column.
parent 582f25d0
Loading
Loading
Loading
Loading
+29 −23
Original line number Diff line number Diff line
@@ -40,16 +40,18 @@ load_config_variables(['TRACE_QUANTA', 'CPUS_PER_NODE', 'GPUS_PER_NODE',
                       'POWER_NVME', 'POWER_CDU', 'POWER_SWITCH', 'CORES_PER_CPU',
                       'NICS_PER_NODE'], globals())


def load_data(path, **kwargs):
    """
    Loads data from the given file paths and returns job info.

    Reads three CSV files from the directory ``path[0]``:
    'final_csm_allocation_history_hashed.csv',
    'final_csm_allocation_node_history.csv' and
    'final_csm_step_history.csv'. The resulting DataFrames are passed,
    together with ``**kwargs``, to ``load_data_from_df``, whose return value
    is returned unchanged.

    Only the first element of ``path`` is used; further entries are ignored.
    """
    # NOTE(review): nrows is assigned twice in a row; the second assignment
    # wins, so every read_csv call below is limited to the first 1E5 rows.
    # This listing looks like a rendered diff, so the two lines may be the
    # old and new version of a single statement - confirm which one is meant.
    nrows = None # 1E4
    nrows = 1E5
    # NOTE(review): the same row limit is applied to each file independently,
    # so rows kept from one file may refer to allocations that were cut off in
    # another - verify that the truncated frames are still consistent.
    alloc_df = pd.read_csv(os.path.join(path[0], 'final_csm_allocation_history_hashed.csv'), nrows=nrows)
    node_df = pd.read_csv(os.path.join(path[0], 'final_csm_allocation_node_history.csv'), nrows=nrows)
    step_df = pd.read_csv(os.path.join(path[0], 'final_csm_step_history.csv'), nrows=nrows)
    return load_data_from_df(alloc_df, node_df, step_df, **kwargs)


def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
    """
    Loads data from pandas DataFrames and returns the extracted job info.
@@ -58,7 +60,8 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
    reschedule = kwargs.get('reschedule')
    fastforward = kwargs.get('fastforward')

    if fastforward: print(f"fast-forwarding {fastforward} seconds")
    if fastforward:
        print(f"fast-forwarding {fastforward} seconds")

    allocation_df['begin_time'] = pd.to_datetime(allocation_df['begin_time'], format='mixed', errors='coerce')
    allocation_df['end_time'] = pd.to_datetime(allocation_df['end_time'], format='mixed', errors='coerce')
@@ -91,24 +94,23 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
        #gpu_power = gpu_energy / wall_time
        gpu_power_array = np.array([gpu_power] * samples)

        #GPU power can be 0:
        gpu_min_power = nodes_required * POWER_GPU_IDLE
        gpu_max_power = nodes_required * POWER_GPU_MAX
        gpu_util = power_to_utilization(gpu_power_array, gpu_min_power, gpu_max_power)

        # Utilization is defined in the range of [0 to 1].
        # GPU power can be 0:
        # Utilization is defined in the range of [0 to GPUS_PER_NODE].
        # gpu_util will be negative if power reports 0, which is smaller than POWER_GPU_IDLE
        # Therefore: gpu_util should be set to zero if it is smaller than 0.
        gpu_trace = np.maximum(0,gpu_util) * GPUS_PER_NODE
        gpu_trace = np.maximum(0,gpu_util)

        # Compute CPU power from GPU usage time
        # Only Node Power and GPU power is reported!
        # Compute CPU power from CPU usage time
        # CPU usage is reported per core, while we need it in the range [0 to CPUS_PER_NODE]
        cpu_usage = node_data['cpu_usage'].sum() / 1E9 / nodes_required / CORES_PER_CPU  # seconds
        cpu_usage_array = np.array([cpu_usage] * samples)
        cpu_util = cpu_usage_array / wall_time
        cpu_trace = cpu_util * CPUS_PER_NODE

        cpu_trace = cpu_util  # * CPUS_PER_NODE
        # TODO use total energy for validation
        # Only Node Energy and GPU Energy is reported!
        # total_energy = node_data['energy'].sum() # Joules

        if reschedule:  # Let the scheduler reschedule the jobs
@@ -117,23 +119,25 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
        else:
            scheduled_nodes = get_scheduled_nodes(row['allocation_id'], node_df)
            time_offset = compute_time_offset(row['begin_time'], earliest_begin_time)
            if fastforward: time_offset -= fastforward
            if fastforward:
                time_offset -= fastforward

        if time_offset >= 0:

            job_info = job_dict(nodes_required, \
                                row['hashed_user_id'], \
                                cpu_trace, gpu_trace, wall_time, \
                                row['exit_status'], \
                                scheduled_nodes, \
                                time_offset, \
                                job_id, \
            job_info = job_dict(nodes_required,
                                row['hashed_user_id'],
                                cpu_trace, gpu_trace, wall_time,
                                row['exit_status'],
                                scheduled_nodes,
                                time_offset,
                                job_id,
                                row.get('priority', 0))

            job_list.append(job_info)

    return job_list


def get_scheduled_nodes(allocation_id, node_df):
    """
    Gets the list of scheduled nodes for a given allocation.
@@ -144,6 +148,7 @@ def get_scheduled_nodes(allocation_id, node_df):
        return node_list
    return []


def compute_wall_time(begin_time, end_time):
    """
    Computes the wall time for the job.
@@ -151,6 +156,7 @@ def compute_wall_time(begin_time, end_time):
    wall_time = pd.to_datetime(end_time) - pd.to_datetime(begin_time)
    return int(wall_time.total_seconds())


def compute_time_offset(begin_time, reference_time):
    """
    Computes the time offset from a reference time.