Commit c25b3c8f authored by Maiterth, Matthias

Added a replay scheduler for testing, and reworked the Job class and the Engine tick

Modified Job so that each job now has (relations sketched below):
submit_time
time_limit
start_time
end_time
wall_time
trace_time
and
running_time
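
A minimal runnable sketch of how these fields relate, based on the workloads diff below (the TRACE_QUANTA value and the concrete numbers are illustrative only):

    # Sketch of the new Job time fields; all values are hypothetical.
    TRACE_QUANTA = 20                               # assumed trace quantum, seconds
    submit_time, start_time, end_time = 0, 30, 150  # seconds relative to time_zero
    time_limit = 3600                               # limit set at submission
    wall_time = end_time - start_time               # 120 s of actual runtime
    trace_time = 6 * TRACE_QUANTA                   # seconds covered by 6 trace samples
    assert wall_time <= trace_time                  # otherwise traces get padded with idle power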

Modified engine:
tick:
introduced a new prepare_timestep function consisting of two tasks moved out of tick:
1. moving newly downed nodes (node-failure simulation) out of tick
2. moving completed jobs (and freeing their nodes) out of tick
prepare_timestep is now called at the beginning of each run_simulation loop; a sketch follows.
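
In sketch form, the reworked loop (names taken from the engine diff below; surrounding bookkeeping elided):

    # Inside Engine.run_simulation, per the engine diff below:
    replay = self.scheduler.policy == PolicyType.REPLAY
    for timestep in range(timesteps):
        # Housekeeping moved out of tick():
        completed_jobs, newly_downed_nodes = self.prepare_timestep(replay)
        # Queue eligible jobs, then simulate the timestep:
        self.queue += self.eligible_jobs(jobs_to_submit)
        tick_data = self.tick()
        tick_data.completed = completed_jobs  # tick() no longer collects completions
        yield tick_data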

Added the replay scheduler.
Modified the default scheduler so it can use the replay policy, matching the replay scheduler's functionality.

Finally, modified workloads.py to match the new job description.
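
The key semantic change there: wall_time no longer means trace length. Both sides of this sketch are taken from the workloads diff below:

    # Before: wall_time was derived from the trace length.
    wall_time = gpu_trace.size * config['TRACE_QUANTA']  # seconds
    # After: wall_time is the telemetry runtime; trace length is tracked separately.
    wall_time = time_end - time_start
    trace_time = gpu_trace.size * config['TRACE_QUANTA']  # seconds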

Note: this is not yet fully tested; it currently breaks everything except Frontier telemetry.
Next: testing and fixing the related issues!
parent 567ff14d
+2 −2
@@ -113,7 +113,7 @@ if args.replay:
    if args.time:
        timesteps = convert_to_seconds(args.time)
    else:
        timesteps = int(max(job['wall_time'] + job['submit_time'] for job in jobs)) + 1
        timesteps = int(max(job['wall_time'] + job['start_time'] for job in jobs)) + 1

    print(f'Simulating {len(jobs)} jobs for {timesteps} seconds')
    time.sleep(1)
@@ -124,7 +124,7 @@ else: # Synthetic jobs

    if args.verbose:
        for job_vector in jobs:
            job = Job(job_vector, 0)
            job = Job(job_vector, 0)  # 0 is the current_time argument, which Job() currently ignores
            print('jobid:', job.id, '\tlen(gpu_trace):', len(job.gpu_trace), '\twall_time(s):', job.wall_time)
        time.sleep(2)

+45 −13
@@ -67,6 +67,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar

    if fastforward:
        print(f"fast-forwarding {fastforward} seconds")
    else:
        fastforward = 0

    min_time = kwargs.get('min_time', None)

@@ -87,7 +89,10 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
    if min_time:
        time_zero = min_time
    else:
        time_zero = jobs_df['time_snapshot'].min()
        time_zero = jobs_df['time_snapshot'].min()  # Earliest time snapshot within the day!
    first_start_time = jobs_df['time_start'].min()
    diff = time_zero - first_start_time  # Check if fast forward makes sense!
    fastforward += diff.total_seconds()

    num_jobs = len(jobs_df)
    print("time_zero:", time_zero, "num_jobs", num_jobs)
@@ -134,19 +139,41 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
        cpu_trace[np.isnan(cpu_trace)] = 0
        gpu_trace[np.isnan(gpu_trace)] = 0

        wall_time = gpu_trace.size * config['TRACE_QUANTA']  # seconds

        time_submit = jobs_df.loc[jidx, 'time_submission']
        diff = time_submit - time_zero
        time_submit = max(diff.total_seconds(), 0)
        time_submit_timestamp = jobs_df.loc[jidx, 'time_submission']
        diff = time_submit_timestamp - time_zero
        # time_submit = max(diff.total_seconds(), 0)
        time_submit = diff.total_seconds()

        time_limit = jobs_df.loc[jidx, 'time_limit']  # timelimit in seconds

        time_start_timestamp = jobs_df.loc[jidx, 'time_start']
        diff = time_start_timestamp - time_zero
        # time_start = max(diff.total_seconds(), 0)
        time_start = diff.total_seconds()

        time_end_timestamp = jobs_df.loc[jidx, 'time_end']
        diff = time_end_timestamp - time_zero
        time_end = diff.total_seconds()

        wall_time = time_end - time_start
        if np.isnan(wall_time):
            wall_time = 0

        trace_time = gpu_trace.size * config['TRACE_QUANTA']  # seconds
        if wall_time > trace_time:
            missing_steps = int(wall_time - trace_time)
            cpu_trace = np.concatenate((cpu_trace, np.array([cpu_min_power] * missing_steps)))
            # Pad the GPU trace with idle GPU power (gpu_min_power assumed to exist alongside cpu_min_power)
            gpu_trace = np.concatenate((gpu_trace, np.array([gpu_min_power] * missing_steps)))
            wall_time = trace_time  # Pretending to have a full trace
            print(f"Job: {job_id} extended {missing_steps} Values with idle power!")
            #raise ValueError(f"Job: {job_id} {wall_time} > {trace_time}")

        time_start = jobs_df.loc[jidx, 'time_start']
        diff = time_start - time_zero
        time_start = max(diff.total_seconds(), 0)

        if fastforward:
            time_start -= fastforward
            time_submit -= fastforward
            time_start -= fastforward
            time_end -= fastforward

        xnames = jobs_df.loc[jidx, 'xnames']
        # Don't replay any job with an empty set of xnames
@@ -156,7 +183,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
        if arrival == 'poisson':  # Modify the arrival times of the jobs according to Poisson distribution
            scheduled_nodes = None
            time_offset = next_arrival(1 / config['JOB_ARRIVAL_TIME'])
            time_start = None
            time_start = None  # Set later by the scheduler (see ResourceManager)
            time_end = None    # Set later by the scheduler (see ResourceManager)
            priority = aging_boost(nodes_required)

        else:  # Prescribed replay
@@ -170,8 +198,12 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
            print("ignoring job b/c zero trace:", jidx, time_submit, time_start, nodes_required)

        if gpu_trace.size > 0 and (jid == job_id or jid == '*') and time_submit >= 0:
            job_info = job_dict(nodes_required, name, account, cpu_trace, gpu_trace, [], [], wall_time,
                                end_state, scheduled_nodes, time_submit, job_id, priority, time_start)
            job_info = job_dict(nodes_required, name, account, cpu_trace, gpu_trace, [], [],
                                end_state, scheduled_nodes,
                                job_id, priority,  # partition missing
                                submit_time=time_submit, time_limit=time_limit,
                                start_time=time_start, end_time=time_end,
                                wall_time=wall_time, trace_time=trace_time)
            jobs.append(job_info)

    return jobs
+49 −19
@@ -5,6 +5,7 @@ import pandas as pd
import sys

from .job import Job, JobState
from .policy import PolicyType
from .network import network_utilization
from .utils import summarize_ranges, expand_ranges, get_utilization
from .utils import sum_values, min_value, max_value
@@ -77,18 +78,30 @@ class Engine:
        # Convert them to Job instances and build list of eligible jobs.
        eligible_jobs_list = []
        for job_data in eligible:
            job_instance = Job(job_data, self.current_time)
            job_instance = Job(job_data, self.current_time)  # current_time is not used in Job()
            eligible_jobs_list.append(job_instance)
        return eligible_jobs_list

    def tick(self):
        """Simulate a timestep."""
    def prepare_timestep(self, replay: bool = True):
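        """Per-timestep housekeeping moved out of tick(): remove completed jobs
        and free their nodes; unless replaying, also simulate node failures.
        Returns (completed_jobs, newly_downed_nodes)."""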
        completed_jobs = [job for job in self.running if job.end_time is not None and job.end_time <= self.current_time]

        for job in completed_jobs:
            self.running.remove(job)
            self.jobs_completed += 1
            job_stats = job.statistics()
            if self.accounts:
                self.accounts.update_account_statistics(job_stats)
            self.job_history_dict.append(job_stats.__dict__)
            # Free the nodes via the resource manager.
            self.resource_manager.free_nodes_from_job(job)

        if not replay:
            # Simulate node failure
            newly_downed_nodes = self.resource_manager.node_failure(self.config['MTBF'])
            for node in newly_downed_nodes:
                self.power_manager.set_idle(node)
        else:
            newly_downed_nodes = []

        # Update active/free nodes
        self.num_free_nodes = len(self.resource_manager.available_nodes)
@@ -96,17 +109,29 @@ class Engine:
                              - len(self.resource_manager.available_nodes) \
                              - len(self.resource_manager.down_nodes)


        return completed_jobs, newly_downed_nodes


    def tick(self):
        """Simulate a timestep."""

        # Update running time for all running jobs
        scheduled_nodes = []
        cpu_utils = []
        gpu_utils = []
        net_utils = []
        for job in self.running:
            if job.end_time == self.current_time:
            if job.end_time <= self.current_time:
                job.state = JobState.COMPLETED

            if job.state == JobState.RUNNING:
                job.running_time = self.current_time - job.start_time
                if job.running_time > job.trace_time:
                    raise ValueError(
                        f"Trace ended before job ended! "
                        f"{job.running_time} > {job.trace_time}; "
                        f"{len(job.cpu_trace)} trace entries vs. "
                        f"{job.running_time // self.config['TRACE_QUANTA']} expected")
                time_quanta_index = (self.current_time - job.start_time) // self.config['TRACE_QUANTA']
                cpu_util = get_utilization(job.cpu_trace, time_quanta_index)
                gpu_util = get_utilization(job.gpu_trace, time_quanta_index)
@@ -136,15 +161,6 @@ class Engine:
                    job.power_history.append(jobs_power[i] * len(job.scheduled_nodes))
            del _running_jobs

        for job in completed_jobs:
            self.running.remove(job)
            self.jobs_completed += 1
            job_stats = job.statistics()
            if self.accounts:
                self.accounts.update_account_statistics(job_stats)
            self.job_history_dict.append(job_stats.__dict__)
            # Free the nodes via the resource manager.
            self.resource_manager.free_nodes_from_job(job)

        # Update the power array UI component
        rack_power, rect_losses = self.power_manager.compute_rack_power()
@@ -192,7 +208,7 @@ class Engine:

        tick_data = TickData(
            current_time=self.current_time,
            completed=completed_jobs,
            completed=None,
            running=self.running,
            queue=self.queue,
            down_nodes=expand_ranges(self.down_nodes[1:]),
@@ -216,7 +232,17 @@ class Engine:
        # Sort pending jobs by submit_time.
        jobs_to_submit = sorted(jobs, key=lambda j: j['submit_time'])

        # Missing preparation:
        # Remove jobs that have already ended.
        # Place jobs that are currently running.

        replay = self.scheduler.policy == PolicyType.REPLAY

        for timestep in range(timesteps):
            completed_jobs, newly_downed_nodes = self.prepare_timestep(replay)

            # Identify eligible jobs and add them to the queue.
            self.queue += self.eligible_jobs(jobs_to_submit)
@@ -231,7 +257,11 @@ class Engine:
            if self.debug and timestep % self.config['UI_UPDATE_FREQ'] == 0:
                print(".", end="", flush=True)

            yield self.tick()


            tick_data = self.tick()
            tick_data.completed = completed_jobs
            yield tick_data

    def get_job_history_dict(self):
        return self.job_history_dict
+29 −12
from enum import Enum

def job_dict(nodes_required, name, account, cpu_trace, gpu_trace, ntx_trace, nrx_trace, \
             wall_time, end_state, scheduled_nodes, time_offset, job_id, priority=0, partition=0, start_time=0):
def job_dict(nodes_required, name, account,
             cpu_trace, gpu_trace, ntx_trace, nrx_trace,
             end_state, scheduled_nodes, job_id, priority=0, partition=0,
             submit_time=0, time_limit=0, start_time=0, end_time=0,
             wall_time=0, trace_time=0):
    """ Return job info dictionary """
    return {
        'nodes_required': nodes_required,
@@ -11,14 +14,18 @@ def job_dict(nodes_required, name, account, cpu_trace, gpu_trace, ntx_trace, nrx
        'gpu_trace': gpu_trace,
        'ntx_trace': ntx_trace,
        'nrx_trace': nrx_trace,
        'wall_time': wall_time,
        'end_state': end_state,
        'requested_nodes': scheduled_nodes,
        'submit_time': time_offset,
        'id': job_id,
        'priority': priority,
        'partition': partition,
        'start_time': start_time
        # Times:
        'submit_time': submit_time,
        'time_limit': time_limit,
        'start_time': start_time,
        'end_time': end_time,
        'wall_time': wall_time,
        'trace_time': trace_time
    }


@@ -36,22 +43,29 @@ class Job:
    """Represents a job to be scheduled and executed in the distributed computing system.

    Each job consists of various attributes such as the number of nodes required for execution,
    CPU and GPU utilization, wall time, and other relevant parameters (see utils.job_dict).
    CPU and GPU utilization, trace time, and other relevant parameters (see utils.job_dict).
    The job can transition through different states during its lifecycle, including PENDING,
    RUNNING, COMPLETED, CANCELLED, FAILED, or TIMEOUT.
    """
    _id_counter = 0

    def __init__(self, job_dict, current_time, state=JobState.PENDING, account=None):
        # Note: current_time is unused!
        # Initializations:
        self.start_time = None
        self.end_time = None
        self.running_time = 0
        self.power = 0
        self.scheduled_nodes = []
        self.power_history = []
        self._state = state
        self.account = account
        # Times:
        self.submit_time = None  # Actual submit time
        self.time_limit = None   # Time limit set at submission
        self.start_time = None   # Actual start time when executing or from telemetry
        self.end_time = None     # Actual end time when executing or from telemetry
        self.wall_time = None    # end_time - start_time
        self.trace_time = None   # Time period for which traces are available
        self.running_time = 0    # Current running time updated when simulating

        # If a job dict was given, override the values from the job_dict:
        for key, value in job_dict.items():
            setattr(self, key, value)
@@ -63,10 +77,13 @@ class Job:
        """Return a string representation of the job."""
        return (f"Job(id={self.id}, name={self.name}, account={self.account}, "
                f"nodes_required={self.nodes_required}, "
                f"cpu_trace={self.cpu_trace}, gpu_trace={self.gpu_trace}, wall_time={self.wall_time}, "
                f"cpu_trace={self.cpu_trace}, gpu_trace={self.gpu_trace}, "
                f"end_state={self.end_state}, requested_nodes={self.requested_nodes}, "
                f"submit_time={self.submit_time}, start_time={self.start_time}, "
                f"end_time={self.end_time}, running_time={self.running_time}, state={self._state}, "
                f"submit_time={self.submit_time}, time_limit={self.time_limit}, "
                f"start_time={self.start_time}, end_time={self.end_time}, "
                f"wall_time={self.wall_time}, "
                f"trace_time={self.trace_time}, "
                f"running_time={self.running_time}, state={self._state}, "
                f"scheduled_nodes={self.scheduled_nodes}, power={self.power}, "
                f"power_history={self.power_history})")

+2 −1
@@ -25,7 +25,8 @@ class ResourceManager:
            job.scheduled_nodes = self.available_nodes[:job.nodes_required]
            self.available_nodes = self.available_nodes[job.nodes_required:]

        # Set job start and end times
        # Set job start and end times according to simulation
        # This overrides actual times from telemetry and sets the state to 'running'
        job.start_time = current_time
        job.end_time = current_time + job.wall_time
        job.state = JobState.RUNNING  # Mark job as running