Commit 7dccd9b1 authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Lassen dataloader seems to be working now

parent a61c6226
Loading
Loading
Loading
Loading
+30 −28
Original line number Diff line number Diff line
@@ -4,17 +4,14 @@ import os
import pandas as pd
from ..config import load_config_variables
from ..job import job_dict
from ..utils import power_to_utilization
from ..utils import power_to_utilization, next_arrival
from tqdm import tqdm
import time

load_config_variables(['TRACE_QUANTA', 
                       'CPUS_PER_NODE', 
                       'GPUS_PER_NODE', 
                       'POWER_GPU_IDLE',
                       'POWER_GPU_MAX',
                       'POWER_CPU_IDLE',
                       'POWER_CPU_MAX'], globals())
load_config_variables(['TRACE_QUANTA', 'CPUS_PER_NODE', 'GPUS_PER_NODE', 
                       'POWER_GPU_IDLE', 'POWER_GPU_MAX', 'POWER_CPU_IDLE',
                       'POWER_CPU_MAX', 'POWER_MEM', 'POWER_NIC', 'POWER_NVME',
                       'NICS_PER_NODE'], globals())

def load_data(path, **kwargs):
    """
@@ -30,6 +27,8 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
    """
    Loads data from pandas DataFrames and returns the extracted job info.
    """
    reschedule = kwargs.get('reschedule')

    allocation_df['begin_time'] = pd.to_datetime(allocation_df['begin_time'], format='mixed', errors='coerce')
    allocation_df['end_time'] = pd.to_datetime(allocation_df['end_time'], format='mixed', errors='coerce')

@@ -37,57 +36,60 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
    job_list = []

    for _, row in tqdm(allocation_df.iterrows(), total=len(allocation_df), desc="Processing Jobs"):
        start_row = time.time()  # Start timing each row
        node_data = node_df[node_df['allocation_id'] == row['allocation_id']]
        nodes_required = row['num_nodes']

        wall_time_start = time.time()  # Timing wall_time calculation
        wall_time = compute_wall_time(row['begin_time'], row['end_time'])
        #print(f"Wall time calculation took: {time.time() - wall_time_start} seconds")
        samples = math.ceil(wall_time / TRACE_QUANTA)

        # Compute GPU power
        gpu_start = time.time()  # Timing GPU power calculation
        gpu_energy = node_data['gpu_energy'].sum() # Joules
        gpu_usage = node_data['gpu_usage'].sum()   # microseconds
        gpu_power = gpu_energy / (gpu_usage / 1E6) / nodes_required if gpu_usage > 0 else 0
        # divide by nodes_required to get average gpu_usage per node
        #gpu_usage = node_data['gpu_usage'].sum() / 1E6 / nodes_required # seconds
        #gpu_power = gpu_energy / gpu_usage if gpu_usage > 0 else 0
        gpu_power = gpu_energy / wall_time
        if gpu_power == 0:
            continue
        else:
            gpu_power_array = np.array([gpu_power] * samples)

        gpu_min_power = nodes_required * POWER_GPU_IDLE * GPUS_PER_NODE
        gpu_max_power = nodes_required * POWER_GPU_MAX * GPUS_PER_NODE
        print(gpu_power_array)
        print(gpu_min_power, gpu_max_power)
        gpu_min_power = nodes_required * POWER_GPU_IDLE
        gpu_max_power = nodes_required * POWER_GPU_MAX
        gpu_util = power_to_utilization(gpu_power_array, gpu_min_power, gpu_max_power)
        gpu_trace = gpu_util * GPUS_PER_NODE

        # Compute CPU power (assuming total energy minus gpu_energy is cpu_energy)
        cpu_start = time.time()  # Timing CPU power calculation
        total_energy = node_data['energy'].sum() # Joules
        cpu_usage = node_data['cpu_usage'].sum() # nanoseconds
        # divide by nodes_required to get average cpu_usage per node
        #cpu_usage = node_data['cpu_usage'].sum() / 1E9 / nodes_required # seconds
        cpu_energy = total_energy - gpu_energy 
        cpu_power = cpu_energy / (cpu_usage / 1E9) / nodes_required if cpu_usage > 0 else 0

        #cpu_power = cpu_energy / cpu_usage if cpu_usage > 0 else 0
        cpu_power = cpu_energy / wall_time 
        cpu_power -= nodes_required * (POWER_MEM + NICS_PER_NODE * POWER_NIC + POWER_NVME)
        cpu_power_array = np.array([cpu_power] * samples)

        cpu_min_power = nodes_required * POWER_CPU_IDLE * CPUS_PER_NODE
        cpu_max_power = nodes_required * POWER_CPU_MAX * CPUS_PER_NODE
        cpu_min_power = nodes_required * POWER_CPU_IDLE
        cpu_max_power = nodes_required * POWER_CPU_MAX
        cpu_util = power_to_utilization(cpu_power_array, cpu_min_power, cpu_max_power)
        cpu_trace = cpu_util * CPUS_PER_NODE

        #print(f"CPU power calculation took: {time.time() - cpu_start} seconds")
        if reschedule: # Let the scheduler reschedule the jobs
            scheduled_nodes = None
            time_offset = next_arrival()
        else:
            scheduled_nodes = get_scheduled_nodes(row['allocation_id'], node_df)
            time_offset = compute_time_offset(row['begin_time'], earliest_begin_time)

        job_info = job_dict(nodes_required, \
                            row['primary_job_id'], \
                            cpu_trace, gpu_trace, wall_time, \
                            row['exit_status'], \
                            get_scheduled_nodes(row['allocation_id'], node_df), \
                            compute_time_offset(row['begin_time'], earliest_begin_time), \
                            scheduled_nodes, \
                            time_offset, \
                            row['primary_job_id'], \
                            row.get('priority', 0))
        job_list.append(job_info)
        #print(f"Processing this row took: {time.time() - start_row} seconds")

    return job_list