Commit 111fbc40 authored by Maiterth, Matthias's avatar Maiterth, Matthias
Browse files

Reworked simulation loop.

Added start and end of the simulation
The simulation now has a start time and an end time
Everything runs related to the reference time zero.

The simulation loop and jobs are adjusted accordingly.
Additionally, a setup stage is added to prepare the running queue.

The data loader is rewritten to adjust the job's
related times.
This is reflected in the frontier data loader with extensive comments on the time layout.
(See below)

------
This is explanation on the frontier data loader:

These form the maximum extent of the simulation time.
telemetry_start and telemetry_end.

        [                                    ]
        ^                                    ^
        telemetry_start          telemetry_end

These values form the maximum extent of the simulation.
Telemetry start == 0! This means that any time before that is negative,
while anything after this is positive.
Next is the actual extent of the simulation:

        [                                   ]
            ^                   ^
            simulation_start    simulation_end

The start of the simulation simulation_start and telemetry_start are only
the same when fastforward is 0.
In general simulation_end and telemetry_end are the same, as this is the
last time step we can simulate.
Both simulation_start and _end are set in engine.py

Additionally, jobs can have started before telemetry_start,
and can have a recorded ending after simulation_end:
        [                                   ]
^                                                ^
first_start_timestamp           last_end_timestamp

This means that the time between first_start_timestamp and telemetry_start
has no associated values in the traces!
The missing values after simulation_end can be ignored, as the simulation will have stopped before.

However, the times before telemetry_start have to be padded to generate
correct offsets within their data!
Within the simulation a job's current time is specified as the difference
between its start_time and the current timestep of the simulation.

With this each job's
- submit_time
- time_limit
- start_time
- end_time
- wall_time (end_time-start_time, actual runtime in seconds)
- trace_time (length of each trace in seconds)
has to be set for use within the simulation
parent c25b3c8f
Loading
Loading
Loading
Loading
+10 −10
Original line number Diff line number Diff line
@@ -75,6 +75,7 @@ if args.replay:

    if args.fastforward:
        args.fastforward = convert_to_seconds(args.fastforward)
        timestep_start = args.fastforward

    td = Telemetry(**args_dict)

@@ -105,18 +106,16 @@ if args.replay:

    else:  # custom data loader
        print(*args.replay)
        jobs = td.load_data(args.replay)
        jobs, timestep_start, timestep_end = td.load_data(args.replay)
        td.save_snapshot(jobs, filename=DIR_NAME)

    # Set number of timesteps based on the last job running which we assume
    # is the maximum value of submit_time + wall_time of all the jobs
    if args.time:
        timesteps = convert_to_seconds(args.time)
    else:
        timesteps = int(max(job['wall_time'] + job['start_time'] for job in jobs)) + 1
        timestep_end = convert_to_seconds(args.time)
    elif not timestep_end:
        timestep_end = int(max(job['wall_time'] + job['start_time'] for job in jobs)) + 1

    print(f'Simulating {len(jobs)} jobs for {timesteps} seconds')
    time.sleep(1)

else:  # Synthetic jobs
    wl = Workload(config)
@@ -124,14 +123,14 @@ else: # Synthetic jobs

    if args.verbose:
        for job_vector in jobs:
            job = Job(job_vector, 0)  # What does 0 stand for here?
            job = Job(job_vector)
            print('jobid:', job.id, '\tlen(gpu_trace):', len(job.gpu_trace), '\twall_time(s):', job.wall_time)
        time.sleep(2)

    if args.time:
        timesteps = convert_to_seconds(args.time)
        timestep_end = convert_to_seconds(args.time)
    else:
        timesteps = 88200  # 24 hours
        timestep_end = 88200  # 24 hours

    DIR_NAME = create_casename()

@@ -157,7 +156,8 @@ if args.plot or args.output:
if args.verbose:
    print(jobs)

layout_manager.run(jobs, timesteps=timesteps)
print(f'Simulating {len(jobs)} jobs for {timestep_end - timestep_start} seconds')
layout_manager.run(jobs, timestep_start=timestep_start, timestep_end=timestep_end)

engine_stats = get_engine_stats(sc)
job_stats = get_job_stats(sc)
+134 −52
Original line number Diff line number Diff line
@@ -57,20 +57,78 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
    -------
    list
        The list of parsed jobs.

    telemetry_start
        the first timestep in which the simulation can be executed.

    telemetry_end
        the last timestep in which the simulation can be executed.
    ----
    Explanation regarding times:

    The loaded dataframe contains
    a first timestamp with associated data
    and a last timestamp with associated data

    These form the maximum extent of the simulation time.
    telemetry_start and telemetry_end.

            [                                    ]
            ^                                    ^
            telemetry_start          telemetry_end

    These values form the maximum extent of the simulation.
    Telemetry start == 0! This means that any time before that is negative,
    while anything after this is positive.
    Next is the actual extent of the simulation:

            [                                   ]
                ^                   ^
                simulation_start    simulation_end

    The start of the simulation simulation_start and telemetry_start are only
    the same when fastforward is 0.
    In general simulation_end and telemetry_end are the same, as this is the
    last time step we can simulate.
    Both simulation_start and _end are set in engine.py

    Additionally, jobs can have started before telemetry_start,
    and can have a recorded ending after simulation_end:
            [                                   ]
    ^                                                ^
    first_start_timestamp           last_end_timestamp

    This means that the time between first_start_timestamp and telemetry_start
    has no associated values in the traces!
    The missing values after simulation_end can be ignored, as the simulation will have stopped before.

    However, the times before telemetry_start have to be padded to generate
    correct offsets within their data!
    Within the simulation a job's current time is specified as the difference
    between its start_time and the current timestep of the simulation.

    With this each job's
    - submit_time
    - time_limit
    - start_time
    - end_time
    - wall_time (end_time-start_time, actual runtime in seconds)
    - trace_time (length of each trace in seconds)
    has to be set for use within the simulation

    The returned values are these three:
        - The list of parsed jobs. (as a job_dict)
        - telemetry_start: int (in seconds)
        - telemetry_end: int (in seconds)

    The implementation follows:
    """
    config = kwargs.get('config')
    encrypt_bool = kwargs.get('encrypt')
    fastforward = kwargs.get('fastforward')
    arrival = kwargs.get('arrival')
    validate = kwargs.get('validate')
    jid = kwargs.get('jid', '*')

    if fastforward:
        print(f"fast-forwarding {fastforward} seconds")
    else:
        fastforward = 0

    min_time = kwargs.get('min_time', None)
    debug = kwargs.get('debug')

    # Sort jobs dataframe based on values in time_start column, adjust indices after sorting
    jobs_df = jobs_df[jobs_df['time_start'].notna()]
@@ -85,17 +143,27 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
    jobprofile_df = jobprofile_df.sort_values(by='timestamp')
    jobprofile_df = jobprofile_df.reset_index(drop=True)

    # Take earliest time as baseline reference
    if min_time:
        time_zero = min_time
    else:
        time_zero = jobs_df['time_snapshot'].min()  # Earliets time snapshot within the day!
    first_start_time = jobs_df['time_start'].min()
    diff = time_zero - first_start_time  # Check if fast forward makes sense!
    fastforward += diff.total_seconds()
    #telemetry_start_timestamp = jobs_df['time_snapshot'].min()  # Earliets time snapshot within the day!
    telemetry_start_timestamp = jobprofile_df['timestamp'].min()  # Earliest time snapshot within the day!
    #telemetry_end_timestamp = jobs_df['time_snapshot'].max()  # This time has nothing to do with the jobs!
    telemetry_end_timestamp = jobprofile_df['timestamp'].max()  # Latest time snapshot within the day!

    # Time that can be simulated # Take earliest time as baseline reference
    telemetry_start = 0  # second 0 of the simulation
    diff = telemetry_end_timestamp - telemetry_start_timestamp
    telemetry_end = int(diff.total_seconds())

    first_start_timestamp = jobs_df['time_start'].min()
    diff = first_start_timestamp - telemetry_start_timestamp
    first_start = int(diff.total_seconds())  # negative seconds or 0


    num_jobs = len(jobs_df)
    print("time_zero:", time_zero, "num_jobs", num_jobs)
    if debug:
        print("num_jobs:", num_jobs)
        print("telemetry_start:", telemetry_start, "simulation_fin", telemetry_end)
        print("telemetry_start_timestamp:", telemetry_start_timestamp, "telemetry_end_timestamp", telemetry_end_timestamp)
        print("first_start_timestamp:",first_start_timestamp, "last start timestamp:", jobs_df['time_start'].max())

    jobs = []
    # Map dataframe to job state. Add results to jobs list
@@ -112,13 +180,13 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
            name = encrypt(name)

        if validate:
            cpu_power = jobprofile_df[jobprofile_df['allocation_id']
            cpu_power = jobprofile_df[jobprofile_df['allocation_id'] \
                                      == allocation_id]['mean_node_power']
            cpu_trace = cpu_power.values
            gpu_trace = cpu_trace

        else:
            cpu_power = jobprofile_df[jobprofile_df['allocation_id']
            cpu_power = jobprofile_df[jobprofile_df['allocation_id'] \
                                      == allocation_id]['sum_cpu0_power']
            cpu_power_array = cpu_power.values
            cpu_min_power = nodes_required * config['POWER_CPU_IDLE'] * config['CPUS_PER_NODE']
@@ -126,7 +194,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
            cpu_util = power_to_utilization(cpu_power_array, cpu_min_power, cpu_max_power)
            cpu_trace = cpu_util * config['CPUS_PER_NODE']

            gpu_power = jobprofile_df[jobprofile_df['allocation_id']
            gpu_power = jobprofile_df[jobprofile_df['allocation_id'] \
                                      == allocation_id]['sum_gpu_power']
            gpu_power_array = gpu_power.values

@@ -139,41 +207,43 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
        cpu_trace[np.isnan(cpu_trace)] = 0
        gpu_trace[np.isnan(gpu_trace)] = 0


        time_submit_timestamp = jobs_df.loc[jidx, 'time_submission']
        diff = time_submit_timestamp - time_zero
        # time_submit = max(diff.total_seconds(), 0)
        time_submit = diff.total_seconds()
        # Times:
        submit_timestamp = jobs_df.loc[jidx, 'time_submission']
        diff = submit_timestamp - telemetry_start_timestamp
        submit_time = diff.total_seconds()

        time_limit = jobs_df.loc[jidx, 'time_limit']  # timelimit in seconds

        time_start_timestamp = jobs_df.loc[jidx, 'time_start']
        diff = time_start_timestamp - time_zero
        # time_start = max(diff.total_seconds(), 0)
        time_start = diff.total_seconds()
        start_timestamp = jobs_df.loc[jidx, 'time_start']
        diff = start_timestamp - telemetry_start_timestamp
        start_time = diff.total_seconds()

        end_time_timestamp = jobs_df.loc[jidx, 'time_end']
        diff = end_time_timestamp - telemetry_start_timestamp
        end_time = diff.total_seconds()

        time_end_timestamp = jobs_df.loc[jidx, 'time_end']
        diff = time_end_timestamp - time_zero
        time_end = diff.total_seconds()

        wall_time = time_end - time_start
        wall_time = end_time - start_time
        if np.isnan(wall_time):
            wall_time = 0

        trace_time = gpu_trace.size * config['TRACE_QUANTA']  # seconds
        if wall_time > trace_time:
            missing_steps = int(wall_time - trace_time)
            if start_time < 0:
                cpu_trace = np.concatenate((np.array([cpu_min_power] * missing_steps),cpu_trace))
                gpu_trace = np.concatenate((np.array([cpu_min_power] * missing_steps),gpu_trace))
                print(f"Job: {job_id} prepended {missing_steps} Values with idle power!")
                print(f"{start_time} - {end_time}")
            elif end_time > telemetry_end:
                cpu_trace = np.concatenate((cpu_trace,np.array([cpu_min_power] * missing_steps)))
            gpu_trace = np.concatenate((cpu_trace,np.array([cpu_min_power] * missing_steps)))
            wall_time = trace_time  # Pretending to have a full trace
            print(f"Job: {job_id} extended {missing_steps} Values with idle power!")
            #raise ValueError(f"Job: {job_id} {wall_time} > {trace_time}")


        if fastforward:
            time_submit -= fastforward
            time_start -= fastforward
            time_end -= fastforward
                gpu_trace = np.concatenate((gpu_trace,np.array([cpu_min_power] * missing_steps)))
                print(f"Job: {job_id} appended {missing_steps} Values with idle power!")
                print(f"{start_time} - {end_time}")
            else:
                print(f"Job: {job_id} {start_time} - {end_time}!")
                raise ValueError("Missing values not at start nor end.")
            trace_time = wall_time  # Pretending to have a full trace, This may not be needed!

        xnames = jobs_df.loc[jidx, 'xnames']
        # Don't replay any job with an empty set of xnames
@@ -182,9 +252,9 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar

        if arrival == 'poisson':  # Modify the arrival times of the jobs according to Poisson distribution
            scheduled_nodes = None
            time_offset = next_arrival(1 / config['JOB_ARRIVAL_TIME'])
            time_start = None  # ?
            time_end = None  # ?
            submit_time = next_arrival(1 / config['JOB_ARRIVAL_TIME'])
            start_time = None  # ?
            end_time = None  # ?
            priority = aging_boost(nodes_required)

        else:  # Prescribed replay
@@ -194,19 +264,31 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
                indices = xname_to_index(xname, config)
                scheduled_nodes.append(indices)

        # Throw out jobs that are not valid!
        if gpu_trace.size == 0:
            print("ignoring job b/c zero trace:", jidx, time_submit, time_start, nodes_required)
            print("ignoring job b/c zero trace:", jidx, submit_time, start_time, nodes_required)
            continue  # SKIP!
        if end_time < telemetry_start:
            # raise ValueError("Job ends before frist recorded telemetry entry:",job_id, "start:", start_time,"end:",end_time, " Telemetry: ", len(gpu_trace), "entries.")
            print("Job ends before frist recorded telemetry entry:",job_id, "start:", start_time,"end:",end_time, " Telemetry: ", len(gpu_trace), "entries.")
            continue  # SKIP!
        if start_time > telemetry_end:
            # raise ValueError("Job starts after last recorded telemetry entry:",job_id, "start:", start_time,"end:",end_time, " Telemetry: ", len(gpu_trace), "entries.")
            print("Job starts after last recorded telemetry entry:",job_id, "start:", start_time,"end:",end_time, " Telemetry: ", len(gpu_trace), "entries.")
            continue  # SKIP!



        if gpu_trace.size > 0 and (jid == job_id or jid == '*') and time_submit >= 0:
        if gpu_trace.size > 0 and (jid == job_id or jid == '*'):  # and time_submit >= 0:
            job_info = job_dict(nodes_required, name, account, cpu_trace, gpu_trace, [], [],
                                end_state, scheduled_nodes,
                                job_id, priority,  # partition missing
                                submit_time=time_submit, time_limit=time_limit,
                                start_time=time_start, end_time=time_end,
                                submit_time=submit_time, time_limit=time_limit,
                                start_time=start_time, end_time=end_time,
                                wall_time=wall_time, trace_time=trace_time)
            jobs.append(job_info)

    return jobs
    return jobs, telemetry_start, telemetry_end


def xname_to_index(xname: str, config: dict):
+62 −22
Original line number Diff line number Diff line
@@ -66,10 +66,36 @@ class Engine:
        )
        print(f"Using scheduler: {scheduler_type}")

    def eligible_jobs(self, jobs_to_submit: List):

    def add_running_jobs_to_queue(self, jobs_to_submit: List):
        """
        Modifies jobs_to_submit
        and self.queue

        This is a preparatory step and should only be called before the main
        loop of run_simulation.
        Adds running jobs to the queue, and removes them from the jobs_to_submit.
        jobs_to_submit still holds the jobs that need to be submitted in the future.
        """
        # Build a list of jobs whose start_time is <= current_time.
        eligible = [job for job in jobs_to_submit if job['start_time'] < self.current_time]
        # Remove those jobs from jobs_to_submit:
        jobs_to_submit[:] = [job for job in jobs_to_submit if job['start_time'] >= self.current_time]
        # Convert them to Job instances and build list of eligible jobs.
        eligible_jobs_list = []
        for job_data in eligible:
            job_instance = Job(job_data)
            eligible_jobs_list.append(job_instance)
        self.queue += eligible_jobs_list


    def add_eligible_jobs_to_queue(self, jobs_to_submit: List):
        """
        Returns list of eligible jobs and:
        modifies the jobs_to_submit removing them from the passed list (Mutable)!
        Modifies jobs_to_submit
        and self.queue

        Adds eligible jobs to the queue, and removes them from the jobs_to_submit.
        jobs_to_submit still holds the jobs that need to be submitted in the future.
        """
        # Build a list of jobs whose submit_time is <= current_time.
        eligible = [job for job in jobs_to_submit if job['submit_time'] <= self.current_time]
@@ -78,14 +104,16 @@ class Engine:
        # Convert them to Job instances and build list of eligible jobs.
        eligible_jobs_list = []
        for job_data in eligible:
            job_instance = Job(job_data, self.current_time)  # current_time is not used in Job()
            job_instance = Job(job_data)
            eligible_jobs_list.append(job_instance)
        return eligible_jobs_list
        self.queue += eligible_jobs_list

    def prepare_timestep(self, replay:bool = True):
        completed_jobs = [job for job in self.running if job.end_time is not None and job.end_time <= self.current_time]

        for job in completed_jobs:
            job.state = JobState.COMPLETED

            self.running.remove(job)
            self.jobs_completed += 1
            job_stats = job.statistics()
@@ -109,7 +137,6 @@ class Engine:
                              - len(self.resource_manager.available_nodes) \
                              - len(self.resource_manager.down_nodes)


        return completed_jobs, newly_downed_nodes


@@ -122,8 +149,6 @@ class Engine:
        gpu_utils = []
        net_utils = []
        for job in self.running:
            if job.end_time <= self.current_time:
                job.state = JobState.COMPLETED

            if job.state == JobState.RUNNING:
                job.running_time = self.current_time - job.start_time
@@ -132,7 +157,7 @@ class Engine:
                                       {job.running_time} > {job.trace_time}\n\
                                       {len(job.cpu_trace)} vs. {self.running_time // self.config['TRACE_QUANTA']}\
                                      ")
                time_quanta_index = (self.current_time - job.start_time) // self.config['TRACE_QUANTA']
                time_quanta_index = int(job.running_time // self.config['TRACE_QUANTA'])
                cpu_util = get_utilization(job.cpu_trace, time_quanta_index)
                gpu_util = get_utilization(job.gpu_trace, time_quanta_index)
                net_util = 0
@@ -145,9 +170,11 @@ class Engine:
                else:
                    net_utils.append(0)

                scheduled_nodes.append(job.scheduled_nodes)
                scheduled_nodes.append(job.scheduled_nodes)  # ?
                cpu_utils.append(cpu_util)
                gpu_utils.append(gpu_util)
            else:
                raise ValueError(f"Job is in running list, but state is not RUNNING: job.state == {job.state}")

        if len(scheduled_nodes) > 0:
            self.flops_manager.update_flop_state(scheduled_nodes, cpu_utils, gpu_utils)
@@ -225,27 +252,42 @@ class Engine:
        self.current_time += 1
        return tick_data

    def run_simulation(self, jobs, timesteps, autoshutdown=False):
        """Generator that yields after each simulation tick."""
        self.timesteps = timesteps
    def prepare_system_state(self, all_jobs:List, timestep_start):
        # Modifies Jobs object
        self.current_time = timestep_start

        # Sort pending jobs by submit_time.
        jobs_to_submit = sorted(jobs, key=lambda j: j['submit_time'])
        #keep only jobs that have not yet ended
        all_jobs[:] = [job for job in all_jobs if job['end_time'] >= timestep_start]

        # Missing preparation:
        # Remove Jobs that have already ended.
        # Place jobs that are currently running.
        all_jobs.sort(key=lambda j: j['submit_time'])

        self.add_running_jobs_to_queue(all_jobs)
        # Now process job queue one by one (needed to get the start_time right!)
        for job in self.queue:
            self.scheduler.schedule([job], self.running, job.start_time, sorted=True)
        if len(self.queue) != len(self.running):
            raise ValueError(f"Something went wrong! Not all jobs could be placed!\nPotential confligt in queue:\n{self.queue}")
        self.queue = []  # Empty queue needed as addition one by one does not empty the queue!

    def run_simulation(self, jobs, timestep_start, timestep_end, autoshutdown=False):
        """Generator that yields after each simulation tick."""
        self.timesteps = timestep_end - timestep_start  # Where is this used?

        # Place jobs that are currently running, onto the system.
        self.prepare_system_state(jobs, timestep_start)

        if self.scheduler.policy == PolicyType.REPLAY:
            replay = True
        else:
            replay = False

        for timestep in range(timesteps):
        for timestep in range(timestep_start,timestep_end):
            completed_jobs, newly_downed_nodes = self.prepare_timestep(replay)

            # Identify eligible jobs and add them to the queue.
            self.queue += self.eligible_jobs(jobs_to_submit)
            #self.queue += self.eligible_jobs(jobs, self.current_time)
            #jobs = self.add_eligible_jobs_to_queue(jobs)
            self.add_eligible_jobs_to_queue(jobs)
            # Schedule jobs that are now in the queue.
            self.scheduler.schedule(self.queue, self.running, self.current_time, sorted=False)

@@ -257,8 +299,6 @@ class Engine:
            if self.debug and timestep % self.config['UI_UPDATE_FREQ'] == 0:
                print(".", end="", flush=True)



            tick_data = self.tick()
            tick_data.completed = completed_jobs
            yield tick_data
+13 −9
Original line number Diff line number Diff line
@@ -49,11 +49,12 @@ class Job:
    """
    _id_counter = 0

    def __init__(self, job_dict, current_time, state=JobState.PENDING, account=None):
    def __init__(self, job_dict, state=JobState.PENDING, account=None):
        # # current_time unused!
        # Initializations:
        self.power = 0
        self.scheduled_nodes = []
        self.scheduled_nodes = []  # Explicit list of requested nodes
        self.nodes_required = 0  # If scheduled_nodes is set this can be derived.
        self.power_history = []
        self._state = state
        self.account = account
@@ -73,6 +74,9 @@ class Job:
        if not self.id:
            self.id = Job._get_next_id()

        if self.scheduled_nodes and self.nodes_required == 0:
            self.nodes_required = len(self.scheduled_nodes)

    def __repr__(self):
        """Return a string representation of the job."""
        return (f"Job(id={self.id}, name={self.name}, account={self.account}, "
+2 −0
Original line number Diff line number Diff line
@@ -8,3 +8,5 @@ class PolicyType(Enum):
    PRIORITY = 'priority'
    FUGAKU_PTS = 'fugaku_pts'
    REPLAY = 'replay'
    SJF = 'sjf'
    LJF = 'ljf'
Loading