Commit a35d1f28 authored by Maiterth, Matthias

Updated adastra data loader for the new engine/scheduler loop.

parent cd97807a
+74 −51
@@ -5,16 +5,16 @@
 
 
     # to simulate the dataset
-    python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastra
+    python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastraMI250
 
-    # to replay with different arrival distribution
-    python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastra --arrival poisson
+    # to replay with different scheduling policy
+    python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastraMI250 --policy priority --backfill easy
 
     # to fast-forward 60 days and replay for 1 day
-    python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastra -ff 60d -t 1d
+    python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastraMI250 -ff 60d -t 1d
 
     # to analyze dataset
-    python -m raps.telemetry -f /path/to/AdastaJobsMI250_15days.parquet --system adastra -v
+    python -m raps.telemetry -f /path/to/AdastaJobsMI250_15days.parquet --system adastraMI250 -v
 
 """
 import uuid
@@ -52,55 +52,53 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
     -------
     list
         The list of parsed jobs.
+    telemetry_start
+    telemetry_end
     """
+    count_jobs_notOK = 0
     config = kwargs.get('config')
-    min_time = kwargs.get('min_time', None)
     arrival = kwargs.get('arrival')
-    fastforward = kwargs.get('fastforward')
     validate = kwargs.get('validate')
     jid = kwargs.get('jid', '*')
 
-    if fastforward: print(f"fast-forwarding {fastforward} seconds")
-
     # Sort jobs dataframe based on values in time_start column, adjust indices after sorting
     jobs_df = jobs_df.sort_values(by='start_time')
     jobs_df = jobs_df.reset_index(drop=True)
 
-    # Take earliest time as baseline reference
-    # We can use the start time of the first job.
-    if min_time:
-        time_zero = min_time
-    else:
-        time_zero = jobs_df['start_time'].min()
+    # We only have average power, therefore use the first start time as the start time for the telemetry
+    telemetry_start_timestamp = jobs_df['start_time'].min()
+    telemetry_end_timestamp = jobs_df['end_time'].max()
+
+    telemetry_start_time = 0
+    diff = telemetry_end_timestamp - telemetry_start_timestamp
+    telemetry_end_time = int(diff.total_seconds())
 
     num_jobs = len(jobs_df)
-    print("time_zero:", time_zero, "num_jobs", num_jobs)
+    print("First start time:", telemetry_start_timestamp, "num_jobs", num_jobs)
 
     jobs = []
 
     # Map dataframe to job state. Add results to jobs list
     for jidx in tqdm(range(num_jobs - 1), total=num_jobs, desc="Processing Jobs"):
 
+        account = jobs_df.loc[jidx, 'user_id']  # or 'group_id'
         job_id = jobs_df.loc[jidx, 'job_id']
 
         if not jid == '*':
            if int(jid) == int(job_id):
                print(f'Extracting {job_id} profile')
            else:
                continue
+        nodes_required = jobs_df.loc[jidx, 'num_nodes_alloc']
 
-        nodes_required = jobs_df.loc[jidx, 'num_nodes_alloc']
         name = str(uuid.uuid4())[:6]
-        wall_time = jobs_df.loc[jidx, 'run_time']
-        account = jobs_df.loc[jidx, 'user_id']
 
+        wall_time = int(jobs_df.loc[jidx, 'run_time'])
+        if wall_time <= 0:
+            print("error wall_time", wall_time)
+            continue
+        if nodes_required <= 0:
+            print("error nodes_required", nodes_required)
+            continue
         #wall_time = gpu_trace.size * TRACE_QUANTA # seconds
 
         if validate:
 
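The telemetry bookkeeping above boils down to taking the earliest job start as time zero and expressing the window length in integer seconds. A standalone sketch of that arithmetic with a made-up two-job frame (not the Adastra data):

    import pandas as pd

    # Made-up two-job frame; the real loader reads these columns from parquet.
    jobs_df = pd.DataFrame({
        'start_time': pd.to_datetime(['2024-01-01 00:00', '2024-01-01 06:00']),
        'end_time':   pd.to_datetime(['2024-01-01 04:00', '2024-01-02 00:00']),
    })

    telemetry_start_timestamp = jobs_df['start_time'].min()
    telemetry_end_timestamp = jobs_df['end_time'].max()

    telemetry_start_time = 0  # replay clock starts at zero
    telemetry_end_time = int(
        (telemetry_end_timestamp - telemetry_start_timestamp).total_seconds())

    print(telemetry_end_time)  # 86400: one day from first start to last end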
@@ -117,9 +115,11 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
             cpu_min_power = config['POWER_CPU_IDLE'] * config['CPUS_PER_NODE']
             cpu_max_power = config['POWER_CPU_MAX'] * config['CPUS_PER_NODE']
 
-
-            cpu_util = (cpu_watts/float(config['POWER_CPU_IDLE']) - config['CPUS_PER_NODE']) /  ((float(config['POWER_CPU_MAX']) / float(config['POWER_CPU_IDLE'])) -1.0)    #power_to_utilization(cpu_power_array, cpu_min_power, cpu_max_power)
+            cpu_util = (cpu_watts / float(config['POWER_CPU_IDLE']) - config['CPUS_PER_NODE']) \
+                     / ((float(config['POWER_CPU_MAX']) / float(config['POWER_CPU_IDLE'])) - 1.0)
+            # power_to_utilization(cpu_power_array, cpu_min_power, cpu_max_power)
+            # print("cpu_watts",cpu_watts,"cpu_util",cpu_util)
 
             cpu_trace = np.maximum(0, cpu_util)
 
             node_power = (jobs_df.loc[jidx, 'node_power_consumption']).tolist()
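The cpu_util expression is a closed-form inversion of a linear power model: a node draws CPUS_PER_NODE * POWER_CPU_IDLE watts when idle, and utilization scales power linearly up to POWER_CPU_MAX. Solving that model for utilization yields exactly the quotient in the patch. A minimal sketch with invented wattages (the helper name here is illustrative, not the repo's commented-out power_to_utilization):

    def utilization_from_watts(watts, p_idle, p_max, units_per_node):
        # Invert watts = units * p_idle + util * (p_max - p_idle).
        # Multiplying the patch's quotient through by p_idle gives the
        # equivalent form (watts - units * p_idle) / (p_max - p_idle).
        return (watts / p_idle - units_per_node) / (p_max / p_idle - 1.0)

    # Invented numbers: 180 W measured on a node with one 90 W idle /
    # 280 W max CPU -> about 0.47 of one CPU's dynamic range in use.
    print(utilization_from_watts(180.0, 90.0, 280.0, 1))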
@@ -131,31 +131,19 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
             cpu_power = cpu_power[:min_length]
             mem_power = mem_power[:min_length]
 
-            gpu_power = (node_power - cpu_power - mem_power
+            gpu_power = (node_power - cpu_power - mem_power \
                          - ([config['NICS_PER_NODE'] * config['POWER_NIC']]))
             gpu_power_array = gpu_power.tolist()
             gpu_watts = sum(gpu_power_array) / (wall_time * nodes_required)
             gpu_min_power = config['POWER_GPU_IDLE'] * config['GPUS_PER_NODE']
             gpu_max_power = config['POWER_GPU_MAX'] * config['GPUS_PER_NODE']
-            gpu_util = (gpu_watts/float(config['POWER_GPU_IDLE']) - config['GPUS_PER_NODE']) /  ((float(config['POWER_GPU_MAX']) / float(config['POWER_GPU_IDLE'])) -1.0)    #power_to_utilization(cpu_power_array, cpu_min_power, cpu_max_power)
-            gpu_trace = np.maximum(0, gpu_util) #gpu_util * GPUS_PER_NODE
-
-        priority = int(jobs_df.loc[jidx, 'priority'])
+            gpu_util = (gpu_watts / float(config['POWER_GPU_IDLE']) - config['GPUS_PER_NODE']) \
+                     / ((float(config['POWER_GPU_MAX']) / float(config['POWER_GPU_IDLE'])) - 1.0)
+            # power_to_utilization(gpu_power_array, gpu_min_power, gpu_max_power)
+            # print("gpu_watts",gpu_watts,"gpu_util",gpu_util)
+            gpu_trace = np.maximum(0, gpu_util)
 
         end_state = jobs_df.loc[jidx, 'job_state']
-        time_start = jobs_df.loc[jidx, 'start_time']
-        time_end = jobs_df.loc[jidx, 'end_time']
-        diff = time_start - time_zero
-
-        if jid == '*':
-            time_offset = max(diff.total_seconds(), 0)
-        else:
-            # When extracting out a single job, run one iteration past the end of the job
-            time_offset = config['UI_UPDATE_FREQ']
-
-        if fastforward:
-            time_offset -= fastforward
-
         if arrival == 'poisson':  # Modify the arrival times of the jobs according to Poisson distribution
             scheduled_nodes = None
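GPU power is not measured directly here; the hunk above derives it as the residual after subtracting CPU, memory, and NIC draw from node power. A toy numpy version of that attribution, with invented wattages and a hypothetical one-NIC config:

    import numpy as np

    # Invented per-interval node telemetry in watts; the loader pulls
    # these series out of the dataframe instead.
    node_power = np.array([900.0, 1100.0, 1000.0])
    cpu_power = np.array([200.0, 250.0, 220.0])
    mem_power = np.array([100.0, 100.0, 100.0])

    NICS_PER_NODE, POWER_NIC = 1, 25.0  # hypothetical config constants

    # Whatever is unaccounted for is attributed to the GPUs.
    gpu_power = node_power - cpu_power - mem_power - NICS_PER_NODE * POWER_NIC
    print(gpu_power)  # [575. 725. 655.]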
@@ -163,15 +151,50 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
         else:  # Prescribed replay
             scheduled_nodes = (jobs_df.loc[jidx, 'nodes']).tolist()
 
-        if time_offset >= 0 and wall_time > 0:
-            job_info = job_dict(nodes_required, name, account, cpu_trace, gpu_trace, [],[],wall_time,
-                                end_state, scheduled_nodes, time_offset, job_id, priority)
+        priority = int(jobs_df.loc[jidx, 'priority'])
+
+        submit_timestamp = jobs_df.loc[jidx, 'submit_time']
+        diff = submit_timestamp - telemetry_start_timestamp
+        submit_time = int(diff.total_seconds())
+
+        time_limit = jobs_df.loc[jidx, 'time_limit']  # in seconds
+
+        start_timestamp = jobs_df.loc[jidx, 'start_time']
+        diff = start_timestamp - telemetry_start_timestamp
+        start_time = int(diff.total_seconds())
+
+        end_timestamp = jobs_df.loc[jidx, 'end_time']
+        diff = end_timestamp - telemetry_start_timestamp
+        end_time = int(diff.total_seconds())
+
+        if wall_time != end_time - start_time:
+            print("wall_time != end_time - start_time")
+            print(f"{wall_time} != {end_time - start_time}")
+            print(jobs_df.loc[jidx])
+
+        trace_time = wall_time
+        trace_start_time = start_time
+        trace_end_time = end_time
+
+        if wall_time > 0:
+            job_info = job_dict(nodes_required, name, account, cpu_trace, gpu_trace, [], [],
+                                end_state, scheduled_nodes, job_id, priority,
+                                submit_time=submit_time,
+                                time_limit=time_limit,
+                                start_time=start_time,
+                                end_time=end_time,
+                                wall_time=wall_time,
+                                trace_time=trace_time,
+                                trace_start_time=trace_start_time,
+                                trace_end_time=trace_end_time,
+                                trace_missing_values=True)
             jobs.append(job_info)
+        else:
+            count_jobs_notOK += 1
 
-    return jobs
+    print("jobs not added:", count_jobs_notOK)
+    return jobs, telemetry_start_time, telemetry_end_time
 
 def xname_to_index(xname: str, config: dict):
     """
+1 −1
@@ -174,7 +174,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwargs):
     # Map dataframe to job state. Add results to jobs list
     for jidx in tqdm(range(num_jobs - 1), total=num_jobs, desc="Processing Jobs"):
 
-        user = jobs_df.loc[jidx, 'user']
+        # user = jobs_df.loc[jidx, 'user']
         account = jobs_df.loc[jidx, 'account']
         job_id = jobs_df.loc[jidx, 'job_id']
         allocation_id = jobs_df.loc[jidx, 'allocation_id']
+3 −5
@@ -10,8 +10,9 @@
     The '--arrival poisson' option computes submit times from a Poisson distribution instead of
     using the submit times given in F-Data.
 
-    python main.py --system fugaku -f /path/to/21_04.parquet --arrival poisson --validate
-
     python main.py --system fugaku -f /path/to/21_04.parquet
+    python main.py --system fugaku -f /path/to/21_04.parquet --validate
+    python main.py --system fugaku -f /path/to/21_04.parquet --policy priority --backfill easy
 """
 import pandas as pd
 from tqdm import tqdm
@@ -51,14 +52,11 @@ def load_data_from_df(df, **kwargs):
     int: Telemetry End (in seconds)
     """
     encrypt_bool = kwargs.get('encrypt')
-    fastforward = kwargs.get('fastforward')
     arrival = kwargs.get('arrival')
     validate = kwargs.get('validate')
     jid = kwargs.get('jid', '*')
     config = kwargs.get('config')
 
-    if fastforward: print(f"fast-forwarding {fastforward} seconds")
-
     job_list = []
 
     # Convert all times to datetime and find the min and max thereof for reference use.
+3 −9
@@ -11,8 +11,9 @@
     # to simulate the dataset
     python main.py -f /path/to/job_table.parquet --system marconi100
 
-    # to replay using modified arrival times
-    python main.py -f /path/to/job_table.parquet --system marconi100 --arrival poisson
+    # to replay using different schedulers
+    python main.py -f /path/to/job_table.parquet --system marconi100 --policy fcfs --backfill easy
+    python main.py -f /path/to/job_table.parquet --system marconi100 --policy priority --backfill firstfit
 
     # to fast-forward 60 days and replay for 1 day
     python main.py -f /path/to/job_table.parquet --system marconi100 -ff 60d -t 1d
@@ -73,13 +74,6 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
     jobs_df = jobs_df.sort_values(by='start_time')
     jobs_df = jobs_df.reset_index(drop=True)
 
-    # Take earliest time as baseline reference
-    # We can use the start time of the first job.
-    if min_time:
-        time_zero = min_time
-    else:
-        time_zero = jobs_df['start_time'].min()
-
     # Dataset has one value from start to finish.
     # Therefore we set telemetry start and end equal to job start and end.
     first_start_timestamp = jobs_df['start_time'].min()
+2 −2
@@ -84,13 +84,13 @@ if __name__ == "__main__":
 
     if args.replay[0].endswith(".npz"):
         print(f"Loading {args.replay[0]}...")
-        jobs = td.load_snapshot(args.replay[0])
+        jobs, _, _ = td.load_snapshot(args.replay[0])
         if args.arrival == "poisson":
             for job in tqdm(jobs, desc="Updating requested_nodes"):
                 job['requested_nodes'] = None
                 job['submit_time'] = next_arrival(1 / config['JOB_ARRIVAL_TIME'])
     else:
-        jobs = td.load_data(args.replay)
+        jobs, _, _ = td.load_data(args.replay)
 
     timesteps = int(max(job['wall_time'] + job['submit_time'] for job in jobs))
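The final line sizes the simulation horizon as the latest completion time, under the rough assumption that each job runs for wall_time starting from its submit offset. A quick check with made-up job records:

    # Made-up job records containing the two fields the expression reads.
    jobs = [
        {'submit_time': 0,    'wall_time': 3600},
        {'submit_time': 1800, 'wall_time': 7200},
    ]

    # A job submitted at t needs at least t + wall_time seconds of
    # simulated time, so the horizon is the maximum of those sums.
    timesteps = int(max(job['wall_time'] + job['submit_time'] for job in jobs))
    print(timesteps)  # 9000 seconds covers both jobs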