Commit 848f0635 authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Merge branch 'bugfix-negative-util' into 'main'

Lassen updates from Matthias

See merge request !59
parents 0c3bea0c 674320e0
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -12,6 +12,7 @@
    "MISSING_RACKS": [44],
    "DOWN_NODES": [],
    "CPUS_PER_NODE": 2,
    "CORES_PER_CPU": 22,
    "GPUS_PER_NODE": 4,
    "CPU_PEAK_FLOPS": 396.8E9,
    "GPU_PEAK_FLOPS": 7.8E12,
+43 −36
Original line number Diff line number Diff line
@@ -36,19 +36,22 @@ from tqdm import tqdm

load_config_variables(['TRACE_QUANTA', 'CPUS_PER_NODE', 'GPUS_PER_NODE',
                       'POWER_GPU_IDLE', 'POWER_GPU_MAX', 'POWER_CPU_IDLE',
                       'POWER_CPU_MAX', 'POWER_MEM', 'POWER_NIC', 'POWER_NVME',
                       'POWER_CPU_MAX', 'POWER_MEM', 'POWER_NIC',
                       'POWER_NVME', 'POWER_CDU', 'POWER_SWITCH', 'CORES_PER_CPU',
                       'NICS_PER_NODE'], globals())


def load_data(path, **kwargs):
    """
    Loads data from the given file paths and returns job info.
    """
    nrows = 1E4
    nrows = 1E5
    alloc_df = pd.read_csv(os.path.join(path[0], 'final_csm_allocation_history_hashed.csv'), nrows=nrows)
    node_df = pd.read_csv(os.path.join(path[0], 'final_csm_allocation_node_history.csv'), nrows=nrows)
    step_df = pd.read_csv(os.path.join(path[0], 'final_csm_step_history.csv'), nrows=nrows)
    return load_data_from_df(alloc_df, node_df, step_df, **kwargs)


def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
    """
    Loads data from pandas DataFrames and returns the extracted job info.
@@ -57,7 +60,8 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
    reschedule = kwargs.get('reschedule')
    fastforward = kwargs.get('fastforward')

    if fastforward: print(f"fast-forwarding {fastforward} seconds")
    if fastforward:
        print(f"fast-forwarding {fastforward} seconds")

    allocation_df['begin_time'] = pd.to_datetime(allocation_df['begin_time'], format='mixed', errors='coerce')
    allocation_df['end_time'] = pd.to_datetime(allocation_df['end_time'], format='mixed', errors='coerce')
@@ -93,22 +97,21 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
        gpu_min_power = nodes_required * POWER_GPU_IDLE
        gpu_max_power = nodes_required * POWER_GPU_MAX
        gpu_util = power_to_utilization(gpu_power_array, gpu_min_power, gpu_max_power)
        gpu_trace = gpu_util * GPUS_PER_NODE

        # Compute CPU power (assuming total energy minus gpu_energy is cpu_energy)
        total_energy = node_data['energy'].sum() # Joules
        cpu_energy = total_energy - gpu_energy 

        cpu_usage = node_data['cpu_usage'].sum() / 1E9 / nodes_required # seconds
        cpu_power = cpu_energy / cpu_usage if cpu_usage > 0 else 0
        #cpu_power = cpu_energy / wall_time 
        cpu_power -= nodes_required * (POWER_MEM + NICS_PER_NODE * POWER_NIC + POWER_NVME)
        cpu_power_array = np.array([cpu_power] * samples)

        cpu_min_power = nodes_required * POWER_CPU_IDLE
        cpu_max_power = nodes_required * POWER_CPU_MAX
        cpu_util = power_to_utilization(cpu_power_array, cpu_min_power, cpu_max_power)
        cpu_trace = cpu_util * CPUS_PER_NODE
        # GPU power can be 0:
        # Utilization is defined in the range of [0 to GPUS_PER_NODE].
        # gpu_util will be negative if power reports 0, which is smaller than POWER_GPU_IDLE
        # Therefore: gpu_util should be set to zero if it is smaller than 0.
        gpu_trace = np.maximum(0, gpu_util)

        # Compute CPU power from CPU usage time
        # CPU usage is reported per core, while we need it in the range [0 to CPUS_PER_NODE]
        cpu_usage = node_data['cpu_usage'].sum() / 1E9 / nodes_required / CORES_PER_CPU  # seconds
        cpu_usage_array = np.array([cpu_usage] * samples)
        cpu_util = cpu_usage_array / wall_time
        cpu_trace = cpu_util  # * CPUS_PER_NODE
        # TODO use total energy for validation
        # Only Node Energy and GPU Energy is reported!
        # total_energy = node_data['energy'].sum() # Joules

        if reschedule:  # Let the scheduler reschedule the jobs
            scheduled_nodes = None
@@ -116,23 +119,25 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
        else:
            scheduled_nodes = get_scheduled_nodes(row['allocation_id'], node_df)
            time_offset = compute_time_offset(row['begin_time'], earliest_begin_time)
            if fastforward: time_offset -= fastforward
            if fastforward:
                time_offset -= fastforward

        if time_offset >= 0:

            job_info = job_dict(nodes_required, \
                                row['hashed_user_id'], \
                                cpu_trace, gpu_trace, wall_time, \
                                row['exit_status'], \
                                scheduled_nodes, \
                                time_offset, \
                                job_id, \
            job_info = job_dict(nodes_required,
                                row['hashed_user_id'],
                                cpu_trace, gpu_trace, wall_time,
                                row['exit_status'],
                                scheduled_nodes,
                                time_offset,
                                job_id,
                                row.get('priority', 0))

            job_list.append(job_info)

    return job_list


def get_scheduled_nodes(allocation_id, node_df):
    """
    Gets the list of scheduled nodes for a given allocation.
@@ -143,6 +148,7 @@ def get_scheduled_nodes(allocation_id, node_df):
        return node_list
    return []


def compute_wall_time(begin_time, end_time):
    """
    Computes the wall time for the job.
@@ -150,6 +156,7 @@ def compute_wall_time(begin_time, end_time):
    wall_time = pd.to_datetime(end_time) - pd.to_datetime(begin_time)
    return int(wall_time.total_seconds())


def compute_time_offset(begin_time, reference_time):
    """
    Computes the time offset from a reference time.