Commit 202799f1 authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Refactor job representation to use dict instead of list

parent 73ccb6bf
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -131,7 +131,7 @@ if args.replay:
    if args.time:
        timesteps = convert_to_seconds(args.time)
    else:
        timesteps = int(max(job[4] + job[7] for job in jobs)) + 1
        timesteps = int(max(job['wall_time'] + job['submit_time'] for job in jobs)) + 1

    print(f'Running simulation for {timesteps} seconds')
    time.sleep(1)
+4 −13
Original line number Diff line number Diff line
@@ -2,7 +2,7 @@ import numpy as np
import pandas as pd

from ..config import load_config_variables
from ..utils import power_to_utilization, next_arrival, encrypt
from ..utils import power_to_utilization, next_arrival, encrypt, job_dict

load_config_variables([
    'CPUS_PER_NODE',
@@ -135,18 +135,9 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
                scheduled_nodes.append(indices)

        if gpu_trace.size > 0 and (jid == job_id or jid == '*'):
            jobs.append([
                nodes_required,
                name,
                cpu_trace,
                gpu_trace,
                wall_time,
                end_state,
                scheduled_nodes,
                time_offset,
                job_id,
                0 # priority (not supported for Frontier at the moment)
            ])
            job_info = job_dict(nodes_required, name, cpu_trace, gpu_trace, wall_time, 
                                end_state, scheduled_nodes, time_offset, job_id)
            jobs.append(job_info)

    return jobs

+4 −14
Original line number Diff line number Diff line
@@ -2,7 +2,7 @@ import uuid
import pandas as pd

from ..config import load_config_variables
from ..utils import power_to_utilization, next_arrival
from ..utils import power_to_utilization, next_arrival, job_dict

load_config_variables([
    'CPUS_PER_NODE',
@@ -130,19 +130,9 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
        else: # Prescribed replay
            scheduled_nodes = (jobs_df.loc[i, 'nodes']).tolist()
            
        # if gpu_trace.size > 0 and (jid == job_id or jid == '*'):
        if (gpu_trace.size > 0):
            jobs.append([
                nodes_required,
                name,
                cpu_trace,
                gpu_trace,
                wall_time,
                end_state,
                scheduled_nodes,
                time_offset,
                job_id, 
                priority
            ])
            job_info = job_dict(nodes_required, name, cpu_trace, gpu_trace, wall_time,
                                end_state, scheduled_nodes, time_offset, job_id, priority)
            jobs.append(job_info)

    return jobs
+13 −66
Original line number Diff line number Diff line
@@ -85,76 +85,23 @@ class Job:
    """Represents a job to be scheduled and executed in the distributed computing system.

    Each job consists of various attributes such as the number of nodes required for execution,
    CPU and GPU utilization, wall time, and other relevant parameters. The job can transition
    through different states during its lifecycle, including PENDING, RUNNING, COMPLETED,
    CANCELLED, FAILED, or TIMEOUT.

    Attributes:
    - nodes_required (int): The number of nodes required for job execution.
    - name (str): A unique identifier for the job.
    - cpu_trace (list[float]): CPU utilization trace over time.
    - gpu_trace (list[float]): GPU utilization trace over time.
    - wall_time (int): The expected duration of the job's execution.
    - end_state (str): The final state of the job (e.g., "SUCCESS", "FAILURE").
    - requested_nodes (list[int]): The specific nodes requested by the job, if any.
    - submit_time (int): The time at which the job was submitted to the scheduler.
    - id (int): A unique identifier assigned to the job.
    - start_time (Optional[int]): The time at which the job started execution.
    - end_time (Optional[int]): The time at which the job completed execution.
    - running_time (int): The total time the job has been running.
    - _state (JobState): The current state of the job.

    Methods:
    - __lt__(self, other): Compares two jobs based on their wall time.
    CPU and GPU utilization, wall time, and other relevant parameters (see utils.job_dict). 
    The job can transition through different states during its lifecycle, including PENDING, 
    RUNNING, COMPLETED, CANCELLED, FAILED, or TIMEOUT.
    """
    _id_counter = 0

    def __init__(self, vector, current_time, state=JobState.PENDING):
        """Initialize a Job instance.

        Args:
            vector: A list representing job parameters.
            current_time: The current simulation time.
            state: Initial state of the job.

        Attributes:
            nodes_required: Number of nodes required for the job.
            name: Name of the job.
            cpu_trace: CPU utilization trace.
            gpu_trace: GPU utilization trace.
            wall_time: Wall time of the job.
            end_state: End state of the job.
            requested_nodes: Requested nodes for the job.
            submit_time: Submission time of the job.
            id: Unique identifier of the job.
            start_time: Start time of the job.
            end_time: End time of the job.
            running_time: Running time of the job.
            _state: Current state of the job.
            scheduled_nodes: Nodes scheduled for the job.
            power: Power consumption of the job.
            power_history: History of power consumption during the job.
        """
        self.nodes_required = vector[0]
        self.name = vector[1]
        self.cpu_trace = vector[2]
        self.gpu_trace = vector[3]
        self.wall_time = vector[4]
        self.end_state = vector[5]
        self.requested_nodes = vector[6]
        self.submit_time = vector[7]
        if vector[8]:
            self.id = vector[8]
        else:
            self.id = Job._get_next_id()
        self.priority = vector[9]
    def __init__(self, job_dict, current_time, state=JobState.PENDING):
        """Initialize a Job from a job-attribute dict (see utils.job_dict).

        Args:
            job_dict: Mapping of job attribute names to values; every
                key/value pair is adopted verbatim as an instance
                attribute. Should include an 'id' entry — a falsy or
                missing id triggers auto-generation.
            current_time: The current simulation time (accepted for
                interface compatibility with callers; not stored here).
            state: Initial JobState of the job (defaults to PENDING).
        """
        # Adopt every supplied attribute directly onto the instance.
        for key, value in job_dict.items():
            setattr(self, key, value)
        # getattr with a default avoids AttributeError when the dict
        # carried no 'id' key; falsy ids also get a fresh one.
        if not getattr(self, 'id', None):
            self.id = Job._get_next_id()
        # Runtime bookkeeping, initialized to "not yet started".
        self.start_time = None
        self.end_time = None
        self.running_time = 0
        self.scheduled_nodes = []
        self.power = 0
        self.power_history = []
        self._state = state

    def __repr__(self):
        """Return a string representation of the job."""
@@ -278,8 +225,8 @@ class Scheduler:
    
    def schedule(self, jobs):
        """Schedule jobs."""
        for job_vector in jobs:
            job = Job(job_vector, self.current_time)
        for job_info in jobs:
            job = Job(job_info, self.current_time)
            self.add_job(job)

        while self.queue:
@@ -506,7 +453,7 @@ class Scheduler:
                job = jobs.pop(0)
                self.schedule([job])
                if jobs:
                    time_to_next_job = job[7]  # Update time to next job based on the next job's scheduled time
                    time_to_next_job = job['submit_time']  # Update time to next job based on the next job's scheduled time
                else:
                    time_to_next_job = float('inf')  # No more jobs, set to infinity or some large number to avoid triggering again
            yield self.tick()
+1 −1
Original line number Diff line number Diff line
@@ -68,7 +68,7 @@ if __name__ == "__main__":
    args_dict = vars(args)
    td = Telemetry(**args_dict)
    jobs = td.load_data(args.replay)
    timesteps = int(max(job[4] + job[7] for job in jobs))
    timesteps = int(max(job['wall_time'] + job['time_offset'] for job in jobs))

    dt_list = []
    wt_list = []
Loading