Commit 43a74c7f authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Merge branch 'refactor-job-vector' into 'main'

Refactor job vector

See merge request !55
parents 73ccb6bf a3b4d975
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -131,7 +131,7 @@ if args.replay:
    if args.time:
        timesteps = convert_to_seconds(args.time)
    else:
        timesteps = int(max(job[4] + job[7] for job in jobs)) + 1
        timesteps = int(max(job['wall_time'] + job['submit_time'] for job in jobs)) + 1

    print(f'Running simulation for {timesteps} seconds')
    time.sleep(1)
+4 −12
Original line number Diff line number Diff line
@@ -2,6 +2,7 @@ import numpy as np
import pandas as pd

from ..config import load_config_variables
from ..job import job_dict
from ..utils import power_to_utilization, next_arrival, encrypt

load_config_variables([
@@ -135,18 +136,9 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
                scheduled_nodes.append(indices)

        if gpu_trace.size > 0 and (jid == job_id or jid == '*'):
            jobs.append([
                nodes_required,
                name,
                cpu_trace,
                gpu_trace,
                wall_time,
                end_state,
                scheduled_nodes,
                time_offset,
                job_id,
                0 # priority (not supported for Frontier at the moment)
            ])
            job_info = job_dict(nodes_required, name, cpu_trace, gpu_trace, wall_time, 
                                end_state, scheduled_nodes, time_offset, job_id)
            jobs.append(job_info)

    return jobs

+4 −13
Original line number Diff line number Diff line
@@ -2,6 +2,7 @@ import uuid
import pandas as pd

from ..config import load_config_variables
from ..job import job_dict
from ..utils import power_to_utilization, next_arrival

load_config_variables([
@@ -130,19 +131,9 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
        else: # Prescribed replay
            scheduled_nodes = (jobs_df.loc[i, 'nodes']).tolist()
            
        # if gpu_trace.size > 0 and (jid == job_id or jid == '*'):
        if (gpu_trace.size > 0):
            jobs.append([
                nodes_required,
                name,
                cpu_trace,
                gpu_trace,
                wall_time,
                end_state,
                scheduled_nodes,
                time_offset,
                job_id, 
                priority
            ])
            job_info = job_dict(nodes_required, name, cpu_trace, gpu_trace, wall_time,
                                end_state, scheduled_nodes, time_offset, job_id, priority)
            jobs.append(job_info)

    return jobs

raps/job.py

0 → 100644
+89 −0
Original line number Diff line number Diff line
from enum import Enum

def job_dict(nodes_required, name, cpu_trace, gpu_trace, wall_time, \
             end_state, scheduled_nodes, time_offset, job_id, priority=0):
    """ Return job info dictionary """
    return {
        'nodes_required': nodes_required,
        'name': name,
        'cpu_trace': cpu_trace,
        'gpu_trace': gpu_trace,
        'wall_time': wall_time,
        'end_state': end_state,
        'requested_nodes': scheduled_nodes,
        'submit_time': time_offset,
        'id': job_id,
        'priority': priority
    }


class JobState(Enum):
    """Enumeration for job states."""
    RUNNING = 'R'
    PENDING = 'PD'
    COMPLETED = 'C'
    CANCELLED = 'CA'
    FAILED = 'F'
    TIMEOUT = 'TO'


class Job:
    """Represents a job to be scheduled and executed in the distributed computing system.

    Each job consists of various attributes such as the number of nodes required for execution,
    CPU and GPU utilization, wall time, and other relevant parameters (see utils.job_dict). 
    The job can transition through different states during its lifecycle, including PENDING, 
    RUNNING, COMPLETED, CANCELLED, FAILED, or TIMEOUT.
    """
    _id_counter = 0

    def __init__(self, job_dict, current_time, state=JobState.PENDING):
        for key, value in job_dict.items(): setattr(self, key, value)
        if not self.id: self.id = Job._get_next_id() 
        # initializations
        self.start_time = None
        self.end_time = None
        self.running_time = 0
        self.power = 0
        self.scheduled_nodes = []
        self.power_history = [] 
        self._state = state

    def __repr__(self):
        """Return a string representation of the job."""
        return (f"Job(id={self.id}, name={self.name}, nodes_required={self.nodes_required}, "
                f"cpu_trace={self.cpu_trace}, gpu_trace={self.gpu_trace}, wall_time={self.wall_time}, "
                f"end_state={self.end_state}, requested_nodes={self.requested_nodes}, "
                f"submit_time={self.submit_time}, start_time={self.start_time}, "
                f"end_time={self.end_time}, running_time={self.running_time}, state={self._state}, "
                f"scheduled_nodes={self.scheduled_nodes}, power={self.power}, "
                f"power_history={self.power_history})")

    @property
    def state(self):
        """Get the current state of the job."""
        return self._state

    @state.setter
    def state(self, value):
        """Set the state of the job."""
        if isinstance(value, JobState):
            self._state = value
        elif isinstance(value, str) and value in JobState.__members__:
            self._state = JobState[value]
        else:
            raise ValueError(f"Invalid state: {value}")

    @classmethod
    def _get_next_id(cls):
        """Generate the next unique identifier for a job.

        This method is used internally to generate a unique identifier for each job
        based on the current value of the class's _id_counter attribute. Each time
        this method is called, it increments the counter by 1 and returns the new value.

        Returns:
        - int: The next unique identifier for a job.
        """
        cls._id_counter += 1
        return cls._id_counter
+5 −132
Original line number Diff line number Diff line
@@ -39,18 +39,16 @@ Constants:
This module can be used to simulate job scheduling algorithms, analyze system behavior, and
optimize resource utilization in distributed computing environments.
"""
from enum import Enum
from typing import Optional
import heapq
import dataclasses
import numpy as np

from scipy.stats import weibull_min
import pandas as pd

from .utils import summarize_ranges, expand_ranges

from .config import load_config_variables
from .job import Job, JobState
from .utils import summarize_ranges, expand_ranges

load_config_variables([
    'TRACE_QUANTA',
@@ -71,131 +69,6 @@ load_config_variables([
], globals())


class JobState(Enum):
    """Enumeration for job states."""
    RUNNING = 'R'
    PENDING = 'PD'
    COMPLETED = 'C'
    CANCELLED = 'CA'
    FAILED = 'F'
    TIMEOUT = 'TO'


class Job:
    """Represents a job to be scheduled and executed in the distributed computing system.

    Each job consists of various attributes such as the number of nodes required for execution,
    CPU and GPU utilization, wall time, and other relevant parameters. The job can transition
    through different states during its lifecycle, including PENDING, RUNNING, COMPLETED,
    CANCELLED, FAILED, or TIMEOUT.

    Attributes:
    - nodes_required (int): The number of nodes required for job execution.
    - name (str): A unique identifier for the job.
    - cpu_trace (list[float]): CPU utilization trace over time.
    - gpu_trace (list[float]): GPU utilization trace over time.
    - wall_time (int): The expected duration of the job's execution.
    - end_state (str): The final state of the job (e.g., "SUCCESS", "FAILURE").
    - requested_nodes (list[int]): The specific nodes requested by the job, if any.
    - submit_time (int): The time at which the job was submitted to the scheduler.
    - id (int): A unique identifier assigned to the job.
    - start_time (Optional[int]): The time at which the job started execution.
    - end_time (Optional[int]): The time at which the job completed execution.
    - running_time (int): The total time the job has been running.
    - _state (JobState): The current state of the job.

    Methods:
    - __lt__(self, other): Compares two jobs based on their wall time.
    """
    _id_counter = 0

    def __init__(self, vector, current_time, state=JobState.PENDING):
        """Initialize a Job instance.

        Args:
            vector: A list representing job parameters.
            current_time: The current simulation time.
            state: Initial state of the job.

        Attributes:
            nodes_required: Number of nodes required for the job.
            name: Name of the job.
            cpu_trace: CPU utilization trace.
            gpu_trace: GPU utilization trace.
            wall_time: Wall time of the job.
            end_state: End state of the job.
            requested_nodes: Requested nodes for the job.
            submit_time: Submission time of the job.
            id: Unique identifier of the job.
            start_time: Start time of the job.
            end_time: End time of the job.
            running_time: Running time of the job.
            _state: Current state of the job.
            scheduled_nodes: Nodes scheduled for the job.
            power: Power consumption of the job.
            power_history: History of power consumption during the job.
        """
        self.nodes_required = vector[0]
        self.name = vector[1]
        self.cpu_trace = vector[2]
        self.gpu_trace = vector[3]
        self.wall_time = vector[4]
        self.end_state = vector[5]
        self.requested_nodes = vector[6]
        self.submit_time = vector[7]
        if vector[8]:
            self.id = vector[8]
        else:
            self.id = Job._get_next_id()
        self.priority = vector[9]
        self.start_time = None
        self.end_time = None
        self.running_time = 0
        self._state = state
        self.scheduled_nodes = []
        self.power = 0
        self.power_history = []

    def __repr__(self):
        """Return a string representation of the job."""
        return (f"Job(id={self.id}, name={self.name}, nodes_required={self.nodes_required}, "
                f"cpu_trace={self.cpu_trace}, gpu_trace={self.gpu_trace}, wall_time={self.wall_time}, "
                f"end_state={self.end_state}, requested_nodes={self.requested_nodes}, "
                f"submit_time={self.submit_time}, start_time={self.start_time}, "
                f"end_time={self.end_time}, running_time={self.running_time}, state={self._state}, "
                f"scheduled_nodes={self.scheduled_nodes}, power={self.power}, "
                f"power_history={self.power_history})")

    @property
    def state(self):
        """Get the current state of the job."""
        return self._state

    @state.setter
    def state(self, value):
        """Set the state of the job."""
        if isinstance(value, JobState):
            self._state = value
        elif isinstance(value, str) and value in JobState.__members__:
            self._state = JobState[value]
        else:
            raise ValueError(f"Invalid state: {value}")

    @classmethod
    def _get_next_id(cls):
        """Generate the next unique identifier for a job.

        This method is used internally to generate a unique identifier for each job
        based on the current value of the class's _id_counter attribute. Each time
        this method is called, it increments the counter by 1 and returns the new value.

        Returns:
        - int: The next unique identifier for a job.
        """
        cls._id_counter += 1
        return cls._id_counter


@dataclasses.dataclass
class TickData:
    """ Represents the state output from the simulation each tick """
@@ -278,8 +151,8 @@ class Scheduler:
    
    def schedule(self, jobs):
        """Schedule jobs."""
        for job_vector in jobs:
            job = Job(job_vector, self.current_time)
        for job_info in jobs:
            job = Job(job_info, self.current_time)
            self.add_job(job)

        while self.queue:
@@ -506,7 +379,7 @@ class Scheduler:
                job = jobs.pop(0)
                self.schedule([job])
                if jobs:
                    time_to_next_job = job[7]  # Update time to next job based on the next job's scheduled time
                    time_to_next_job = job['submit_time']  # Update time to next job based on the next job's scheduled time
                else:
                    time_to_next_job = float('inf')  # No more jobs, set to infinity or some large number to avoid triggering again
            yield self.tick()
Loading