Commit 70c7832b authored by Maiterth, Matthias

Added a job flag for missing telemetry and updated the Marconi data loader; not yet fully tested.

parent dd60f8f1
+2 −1
@@ -260,7 +260,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar

        else:  # Prescribed replay
            scheduled_nodes = []
            priority = 0  # not used for replay
            # priority = 0  # not used for replay
            priority = aging_boost(nodes_required)
            for xname in xnames:
                indices = xname_to_index(xname, config)
                scheduled_nodes.append(indices)
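
Note: the replay branch now seeds priority via aging_boost(nodes_required) instead of a constant 0. aging_boost() is not shown in this diff; below is a minimal sketch of the idea, assuming priority simply scales with allocation width (the repository's actual helper may weight aging differently):

def aging_boost(nodes_required: int, base: int = 0, factor: int = 10) -> int:
    """Hypothetical stand-in for the loader's aging_boost(): give wider
    allocations a larger starting priority so they are not starved while
    waiting for enough free nodes during prescribed replay."""
    return base + factor * int(nodes_required)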
+90 −26
@@ -23,6 +23,7 @@
"""
import uuid
import random
import numpy as np
import pandas as pd
from tqdm import tqdm

@@ -60,11 +61,13 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
    config = kwargs.get('config')
    min_time = kwargs.get('min_time', None)
    arrival = kwargs.get('arrival')
    fastforward = kwargs.get('fastforward')
    validate = kwargs.get('validate')
    jid = kwargs.get('jid', '*')
    debug = kwargs.get('debug')

    if fastforward: print(f"fast-forwarding {fastforward} seconds")
    #fastforward = kwargs.get('fastforward')
    #if fastforward:
    #    print(f"fast-forwarding {fastforward} seconds")

    # Sort jobs dataframe based on values in time_start column, adjust indices after sorting
    jobs_df = jobs_df.sort_values(by='start_time')
@@ -77,16 +80,36 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
    else:
        time_zero = jobs_df['start_time'].min()

    # The dataset has a single value covering each job from start to finish,
    # so telemetry start/end are set to the earliest job start and the latest job end.
    first_start_timestamp = jobs_df['start_time'].min()
    telemetry_start_timestamp = first_start_timestamp

    last_end_timestamp = jobs_df['end_time'].max()
    telemetry_end_timestamp = last_end_timestamp

    telemetry_start = 0
    diff = telemetry_end_timestamp - telemetry_start_timestamp
    telemetry_end = int(diff.total_seconds())

    num_jobs = len(jobs_df)
    print("time_zero:", time_zero, "num_jobs", num_jobs)

    if debug:
        print("num_jobs:", num_jobs)
        print("telemetry_start:", telemetry_start, "simulation_fin", telemetry_end)
        print("telemetry_start_timestamp:", telemetry_start_timestamp, "telemetry_end_timestamp", telemetry_end_timestamp)
        print("first_start_timestamp:",first_start_timestamp, "last start timestamp:", jobs_df['time_start'].max())

    jobs = []

    # Map dataframe to job state. Add results to jobs list
    for jidx in tqdm(range(num_jobs), total=num_jobs, desc="Processing Jobs"):

        account = jobs_df.loc[jidx, 'user_id'] # or 'group_id'
        account = jobs_df.loc[jidx, 'user_id']  # or 'group_id'?
        job_id = jobs_df.loc[jidx, 'job_id']
        # allocation_id =
        nodes_required = jobs_df.loc[jidx, 'num_nodes_alloc']
        end_state = jobs_df.loc[jidx, 'job_state']

        if not jid == '*':
            if int(jid) == int(job_id):
@@ -95,7 +118,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
                continue
        nodes_required = jobs_df.loc[jidx, 'num_nodes_alloc']

        name = str(uuid.uuid4())[:6]
        name = str(uuid.uuid4())[:6]  # random 6-character identifier

        if validate:
            cpu_power = jobs_df.loc[jidx, 'node_power_consumption'] / jobs_df.loc[jidx, 'num_nodes_alloc']
@@ -129,25 +152,57 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
            gpu_trace = gpu_util * config['GPUS_PER_NODE']

        priority = int(jobs_df.loc[jidx, 'priority'])
        partition = int(jobs_df.loc[jidx, 'partition'])

        submit_timestamp = jobs_df.loc[jidx, 'submit_time']
        diff = submit_timestamp - telemetry_start_timestamp
        submit_time = int(diff.total_seconds())

        time_limit = jobs_df.loc[jidx, 'time_limit']

        start_timestamp = jobs_df.loc[jidx, 'start_time']
        diff = start_timestamp - telemetry_start_timestamp
        start_time = int(diff.total_seconds())

        end_timestamp = jobs_df.loc[jidx, 'end_time']
        diff = end_timestamp - telemetry_start_timestamp
        end_time = int(diff.total_seconds())

        run_time = jobs_df.loc[jidx, 'run_time']
        # Check for NaN before the int() cast; int(np.nan) raises ValueError
        wall_time = 0 if np.isnan(run_time) else int(run_time)
        if wall_time != (end_time - start_time):
            print(f"Warning: wall_time != end_time - start_time "
                  f"({wall_time} != {end_time - start_time})")

        trace_time = gpu_trace.size * config['TRACE_QUANTA']  # seconds
        trace_start_time = 0
        trace_end_time = trace_time
        trace_missing_values = False  # initialize so the flag is always defined
        if wall_time > trace_time:
            missing_trace_time = wall_time - trace_time
            if start_time < 0:
                # Job started before telemetry began; trace covers the tail of the job
                trace_start_time = missing_trace_time
                trace_end_time = wall_time
            elif end_time > telemetry_end:
                # Job ended after telemetry stopped; trace covers the head of the job
                trace_start_time = 0
                trace_end_time = trace_time
            else:
                # Telemetry missing at the end
                trace_start_time = 0
                trace_end_time = trace_time
                trace_missing_values = True

        wall_time = gpu_trace.size * config['TRACE_QUANTA'] # seconds
        end_state = jobs_df.loc[jidx, 'job_state']

        time_start = jobs_df.loc[jidx, 'start_time']
        time_start = time_start - time_zero

        time_submit = jobs_df.loc[jidx, 'submit_time']
        time_submit = time_submit - time_zero
        # What does this do?
        #if jid == '*':
        #    # submit_time = max(submit_time.total_seconds(), 0)
        #    submit_timestamp = jobs_df.loc[jidx, 'submit_time']
        #    diff = submit_timestamp - telemetry_start_timestamp
        #    submit_time = diff.total_seconds()

        if jid == '*':
            time_submit = max(time_submit.total_seconds(), 0)
        else:
            # When extracting out a single job, run one iteration past the end of the job
            time_submit = config['UI_UPDATE_FREQ']

        if fastforward:
            time_start -= fastforward
            time_submit -= fastforward
        #else:
        #    # When extracting out a single job, run one iteration past the end of the job
        #    submit_time = config['UI_UPDATE_FREQ']

        if arrival == 'poisson':  # Modify the arrival times according to Poisson distribution
            scheduled_nodes = None
@@ -156,12 +211,21 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
        else:  # Prescribed replay
            scheduled_nodes = (jobs_df.loc[jidx, 'nodes']).tolist()

        if gpu_trace.size > 0 and time_submit >= 0:
            job_info = job_dict(nodes_required, name, account, cpu_trace, gpu_trace, [], [], wall_time,
                                end_state, scheduled_nodes, time_submit, job_id, priority, time_start)
        if gpu_trace.size > 0 and (jid == '*' or int(jid) == int(job_id)):  # and time_submit >= 0:

            job_info = job_dict(nodes_required, name, account, cpu_trace, gpu_trace, [], [],
                                end_state, scheduled_nodes,
                                job_id, priority, partition,
                                submit_time=submit_time, time_limit=time_limit,
                                start_time=start_time, end_time=end_time,
                                wall_time=wall_time, trace_time=trace_time,
                                trace_start_time=trace_start_time,
                                trace_end_time=trace_end_time,
                                trace_missing_values=trace_missing_values)

            jobs.append(job_info)

    return jobs
    return jobs, telemetry_start, telemetry_end


def node_index_to_name(index: int, config: dict):
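
The trace-window bookkeeping added above decides where a job's telemetry sits inside its wall time and flags traces that are shorter than the job. A standalone sketch of that decision, reusing the diff's variable names (all times in seconds relative to telemetry start):

def trace_window(wall_time, trace_time, start_time, end_time, telemetry_end):
    """Mirror of the loader's logic: when the trace is shorter than the
    job, attribute the gap to whichever end of the job falls outside the
    telemetry window; otherwise flag the tail values as missing."""
    trace_start_time, trace_end_time = 0, trace_time
    trace_missing_values = False
    if wall_time > trace_time:
        missing_trace_time = wall_time - trace_time
        if start_time < 0:              # job began before telemetry started
            trace_start_time = missing_trace_time
            trace_end_time = wall_time
        elif end_time > telemetry_end:  # job ended after telemetry stopped
            pass                        # window stays [0, trace_time)
        else:                           # telemetry missing at the end
            trace_missing_values = True
    return trace_start_time, trace_end_time, trace_missing_values

# Example: a 3600 s job with a 3000 s trace that started 600 s before
# telemetry began:
# trace_window(3600, 3000, -600, 3000, 86400) -> (600, 3600, False)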
+2 −2
@@ -167,13 +167,13 @@ class Engine:
                                       {job.running_time} > {job.wall_time}\n\
                                       {len(job.cpu_trace)} vs. {job.running_time // self.config['TRACE_QUANTA']}\
                                      ")
                if job.running_time < job.trace_start_time or job.running_time > job.trace_end_time:
                if job.running_time < job.trace_start_time or job.running_time >= job.trace_end_time:
                    cpu_util = 0  # No values available therefore we assume IDLE == 0
                    gpu_util = 0
                    net_util = 0
                    if self.debug:
                        print("No Values in trace, using IDLE.")
                    if self.scheduler.policy == PolicyType.REPLAY:
                    if self.scheduler.policy == PolicyType.REPLAY and not job.trace_missing_values:
                        print(f"{job.running_time} < {job.trace_start_time} or {job.running_time} > {job.trace_end_time}")
                        raise Exception("Replay is using IDLE values! Something is wrong!")
                else:
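
With the tightened bound (>= on trace_end_time) and the trace_missing_values exemption, replay now only aborts when idle fallbacks appear for a job whose trace should be complete. A minimal sketch of the guard in isolation; the job attribute names match the diff, but the indexing into the trace is an assumption:

def lookup_util(job, quanta, is_replay, debug=False):
    """Hypothetical per-tick lookup: outside [trace_start_time,
    trace_end_time) fall back to idle (0), which replay mode tolerates
    only when the trace was flagged incomplete at load time."""
    t = job.running_time
    if t < job.trace_start_time or t >= job.trace_end_time:
        if is_replay and not job.trace_missing_values:
            raise RuntimeError("Replay is using IDLE values! Something is wrong!")
        if debug:
            print("No values in trace, using IDLE.")
        return 0  # no telemetry for this tick; assume idle
    return job.gpu_trace[(t - job.trace_start_time) // quanta]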
+4 −2
@@ -4,7 +4,7 @@ def job_dict(nodes_required, name, account, \
             cpu_trace, gpu_trace, ntx_trace, nrx_trace, \
             end_state, scheduled_nodes, job_id, priority=0, partition=0,
             submit_time=0, time_limit=0, start_time=0, end_time=0,
             wall_time=0, trace_time=0, trace_start_time=0,trace_end_time=0):
             wall_time=0, trace_time=0, trace_start_time=0, trace_end_time=0, trace_missing_values=False):
    """ Return job info dictionary """
    return {
        'nodes_required': nodes_required,
@@ -27,7 +27,9 @@ def job_dict(nodes_required, name, account, \
        'wall_time': wall_time,
        'trace_time': trace_time,
        'trace_start_time': trace_start_time,
        'trace_end_time': trace_end_time
        'trace_end_time': trace_end_time,
        'trace_missing_values': trace_missing_values
    }
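
A quick, hypothetical smoke test for the extended signature: callers that predate this commit omit the new keyword and should still get trace_missing_values == False back (all values below are made up):

job = job_dict(nodes_required=4, name="ab12cd", account="user42",
               cpu_trace=[], gpu_trace=[], ntx_trace=[], nrx_trace=[],
               end_state="COMPLETED", scheduled_nodes=[0, 1, 2, 3],
               job_id=1001, priority=100, partition=0,
               wall_time=3600, trace_time=3000,
               trace_start_time=0, trace_end_time=3000)
assert job['trace_missing_values'] is False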