Commit 57f5fb5f authored by Wes Brewer
Browse files

Remove globals() from fugaku, lassen, and marconi100 dataloaders

parent dc485350
Loading
Loading
Loading
Loading
+3 −2
Original line number Diff line number Diff line
@@ -10,7 +10,7 @@
    The --reschedule will compute submit times from Poisson distribution, instead of using
    the submit times given in F-Data.

    python main.py --system fugaku -f /path/to/21_04.parquet --reschedule --validate --reschedule
    python main.py --system fugaku -f /path/to/21_04.parquet --reschedule --validate

"""
import pandas as pd
@@ -53,6 +53,7 @@ def load_data_from_df(df, **kwargs):
    reschedule = kwargs.get('reschedule')
    validate = kwargs.get('validate')
    jid = kwargs.get('jid', '*')
    config = kwargs.get('config')

    if fastforward: print(f"fast-forwarding {fastforward} seconds")

@@ -81,7 +82,7 @@ def load_data_from_df(df, **kwargs):
        scheduled_nodes = None
        submit_time = row['adt'] if 'adt' in df.columns else earliest_submit_time
        if reschedule: # Let the scheduler reschedule the jobs
            time_offset = next_arrival(1/JOB_ARRIVAL_TIME)
            time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME'])
        else:
            time_offset = (submit_time - earliest_submit_time).total_seconds()  # Compute time offset in seconds

+6 −7
Original line number Diff line number Diff line
@@ -16,7 +16,7 @@ Usage Instructions:
    # to analyze dataset
    python -m raps.telemetry -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen -v

    # to simulate the dataset
    # to simulate the dataset as submitted
    python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen

    # to reschedule
@@ -55,7 +55,6 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
    Loads data from pandas DataFrames and returns the extracted job info.
    """
    config = kwargs.get('config')
    globals().update(config)
    jid = kwargs.get('jid', '*')
    reschedule = kwargs.get('reschedule')
    fastforward = kwargs.get('fastforward')
@@ -85,7 +84,7 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
        nodes_required = row['num_nodes']

        wall_time = compute_wall_time(row['begin_time'], row['end_time'])
        samples = math.ceil(wall_time / TRACE_QUANTA)
        samples = math.ceil(wall_time / config['TRACE_QUANTA'])

        # Compute GPU power
        gpu_energy = node_data['gpu_energy'].sum()  # Joules
@@ -95,8 +94,8 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
        #gpu_power = gpu_energy / wall_time
        gpu_power_array = np.array([gpu_power] * samples)

        gpu_min_power = nodes_required * POWER_GPU_IDLE
        gpu_max_power = nodes_required * POWER_GPU_MAX
        gpu_min_power = nodes_required * config['POWER_GPU_IDLE']
        gpu_max_power = nodes_required * config['POWER_GPU_MAX']
        gpu_util = power_to_utilization(gpu_power_array, gpu_min_power, gpu_max_power)
        # GPU power can be 0:
        # Utilization is defined in the range of [0 to GPUS_PER_NODE].
@@ -106,7 +105,7 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):

        # Compute CPU power from CPU usage time
        # CPU usage is reported per core, while we need it in the range [0 to CPUS_PER_NODE]
        cpu_usage = node_data['cpu_usage'].sum() / 1E9 / nodes_required / CORES_PER_CPU  # seconds
        cpu_usage = node_data['cpu_usage'].sum() / 1E9 / nodes_required / config['CORES_PER_CPU'] # seconds
        cpu_usage_array = np.array([cpu_usage] * samples)
        cpu_util = cpu_usage_array / wall_time
        cpu_trace = cpu_util  # * CPUS_PER_NODE
@@ -122,7 +121,7 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):

        if reschedule:  # Let the scheduler reschedule the jobs
            scheduled_nodes = None
            time_offset = next_arrival(1/JOB_ARRIVAL_TIME)
            time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME'])
        else:
            scheduled_nodes = get_scheduled_nodes(row['allocation_id'], node_df)
            time_offset = compute_time_offset(row['begin_time'], earliest_begin_time)
+11 −12
Original line number Diff line number Diff line
@@ -57,7 +57,6 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
        The list of parsed jobs.
    """
    config = kwargs.get('config')
    globals().update(config)
    min_time = kwargs.get('min_time', None)
    reschedule = kwargs.get('reschedule')
    fastforward = kwargs.get('fastforward')
@@ -104,10 +103,10 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
        else:                
            cpu_power = jobs_df.loc[jidx, 'cpu_power_consumption']
            cpu_power_array = cpu_power.tolist()
            cpu_min_power = nodes_required * POWER_CPU_IDLE * CPUS_PER_NODE
            cpu_max_power = nodes_required * POWER_CPU_MAX * CPUS_PER_NODE
            cpu_min_power = nodes_required * config['POWER_CPU_IDLE'] * config['CPUS_PER_NODE']
            cpu_max_power = nodes_required * config['POWER_CPU_MAX'] * config['CPUS_PER_NODE']
            cpu_util = power_to_utilization(cpu_power_array, cpu_min_power, cpu_max_power)
            cpu_trace = cpu_util * CPUS_PER_NODE
            cpu_trace = cpu_util * config['CPUS_PER_NODE']
                
            node_power = (jobs_df.loc[jidx, 'node_power_consumption']).tolist()
            mem_power = (jobs_df.loc[jidx, 'mem_power_consumption']).tolist()
@@ -119,18 +118,18 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
            mem_power = mem_power[:min_length]
                
            gpu_power = (node_power - cpu_power - mem_power
                - ([nodes_required * NICS_PER_NODE * POWER_NIC] * len(node_power))
                - ([nodes_required * POWER_NVME] * len(node_power)))
                - ([nodes_required * config['NICS_PER_NODE'] * config['POWER_NIC']] * len(node_power))
                - ([nodes_required * config['POWER_NVME']] * len(node_power)))
            gpu_power_array = gpu_power.tolist()
            gpu_min_power = nodes_required * POWER_GPU_IDLE * GPUS_PER_NODE
            gpu_max_power = nodes_required * POWER_GPU_MAX * GPUS_PER_NODE
            gpu_min_power = nodes_required * config['POWER_GPU_IDLE'] * config['GPUS_PER_NODE']
            gpu_max_power = nodes_required * config['POWER_GPU_MAX'] * config['GPUS_PER_NODE']
            gpu_util = power_to_utilization(gpu_power_array, gpu_min_power, gpu_max_power)
            gpu_trace = gpu_util * GPUS_PER_NODE
            gpu_trace = gpu_util * config['GPUS_PER_NODE']
            
        priority = int(jobs_df.loc[jidx, 'priority'])
            
        # wall_time = jobs_df.loc[i, 'run_time']
        wall_time = gpu_trace.size * TRACE_QUANTA # seconds
        wall_time = gpu_trace.size * config['TRACE_QUANTA'] # seconds
        end_state = jobs_df.loc[jidx, 'job_state']
        time_start = jobs_df.loc[jidx+1, 'start_time']
        diff = time_start - time_zero
@@ -139,13 +138,13 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
            time_offset = max(diff.total_seconds(), 0)
        else:
            # When extracting out a single job, run one iteration past the end of the job
            time_offset = UI_UPDATE_FREQ
            time_offset = config['UI_UPDATE_FREQ']

        if fastforward: time_offset -= fastforward

        if reschedule: # Let the scheduler reschedule the jobs
            scheduled_nodes = None
            time_offset = next_arrival(1/JOB_ARRIVAL_TIME)
            time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME'])
        else: # Prescribed replay
            scheduled_nodes = (jobs_df.loc[jidx, 'nodes']).tolist()