from enum import Enum


def job_dict(nodes_required, name, cpu_trace, gpu_trace, wall_time,
             end_state, scheduled_nodes, time_offset, job_id, priority=0):
    """Build the canonical job-info dictionary consumed by ``Job.__init__``.

    Note the key renames relative to the parameters: ``scheduled_nodes``
    is stored under ``'requested_nodes'`` and ``time_offset`` under
    ``'submit_time'``.
    """
    return {
        'nodes_required': nodes_required,
        'name': name,
        'cpu_trace': cpu_trace,
        'gpu_trace': gpu_trace,
        'wall_time': wall_time,
        'end_state': end_state,
        'requested_nodes': scheduled_nodes,
        'submit_time': time_offset,
        'id': job_id,
        'priority': priority,
    }


class JobState(Enum):
    """Enumeration for job states."""
    RUNNING = 'R'
    PENDING = 'PD'
    COMPLETED = 'C'
    CANCELLED = 'CA'
    FAILED = 'F'
    TIMEOUT = 'TO'


class Job:
    """A job to be scheduled and executed in the distributed computing system.

    Attributes are populated directly from the dictionary produced by
    :func:`job_dict` (nodes required, CPU/GPU traces, wall time, etc.).
    A job moves through the lifecycle states defined by :class:`JobState`:
    PENDING, RUNNING, COMPLETED, CANCELLED, FAILED, or TIMEOUT.
    """

    # Class-wide counter backing auto-generated job ids.
    _id_counter = 0

    def __init__(self, job_dict, current_time, state=JobState.PENDING):
        # NOTE(review): current_time is accepted but never used here —
        # confirm whether callers rely on passing it.
        # Copy every entry of the job dictionary onto the instance.
        for attr in job_dict:
            setattr(self, attr, job_dict[attr])
        # A falsy id (None, 0, '') means "assign one for me".
        self.id = self.id or Job._get_next_id()

        # Runtime bookkeeping, filled in by the scheduler as the job runs.
        self.start_time = None
        self.end_time = None
        self.running_time = 0
        self.power = 0
        self.scheduled_nodes = []
        self.power_history = []
        self._state = state

    def __repr__(self):
        """Return a string representation of the job."""
        return (f"Job(id={self.id}, name={self.name}, nodes_required={self.nodes_required}, "
                f"cpu_trace={self.cpu_trace}, gpu_trace={self.gpu_trace}, wall_time={self.wall_time}, "
                f"end_state={self.end_state}, requested_nodes={self.requested_nodes}, "
                f"submit_time={self.submit_time}, start_time={self.start_time}, "
                f"end_time={self.end_time}, running_time={self.running_time}, state={self._state}, "
                f"scheduled_nodes={self.scheduled_nodes}, power={self.power}, "
                f"power_history={self.power_history})")

    @property
    def state(self):
        """The job's current :class:`JobState`."""
        return self._state

    @state.setter
    def state(self, value):
        """Set the state from a JobState member or its member name.

        Raises:
            ValueError: if *value* is neither a JobState nor a valid name.
        """
        if isinstance(value, str) and value in JobState.__members__:
            value = JobState[value]
        elif not isinstance(value, JobState):
            raise ValueError(f"Invalid state: {value}")
        self._state = value

    @classmethod
    def _get_next_id(cls):
        """Return the next unique job identifier.

        Increments the class-level ``_id_counter`` by one on every call
        and returns the new value.

        Returns:
            int: The next unique identifier for a job.
        """
        cls._id_counter += 1
        return cls._id_counter
""" from enum import Enum from typing import Optional import heapq import dataclasses import numpy as np from scipy.stats import weibull_min import pandas as pd from .utils import summarize_ranges, expand_ranges from .config import load_config_variables from .job import Job, JobState from .utils import summarize_ranges, expand_ranges load_config_variables([ 'TRACE_QUANTA', Loading @@ -71,78 +69,6 @@ load_config_variables([ ], globals()) class JobState(Enum): """Enumeration for job states.""" RUNNING = 'R' PENDING = 'PD' COMPLETED = 'C' CANCELLED = 'CA' FAILED = 'F' TIMEOUT = 'TO' class Job: """Represents a job to be scheduled and executed in the distributed computing system. Each job consists of various attributes such as the number of nodes required for execution, CPU and GPU utilization, wall time, and other relevant parameters (see utils.job_dict). The job can transition through different states during its lifecycle, including PENDING, RUNNING, COMPLETED, CANCELLED, FAILED, or TIMEOUT. 
""" _id_counter = 0 def __init__(self, job_dict, current_time, state=JobState.PENDING): for key, value in job_dict.items(): setattr(self, key, value) if not self.id: self.id = Job._get_next_id() # initializations self.start_time = None self.end_time = None self.running_time = 0 self.power = 0 self.scheduled_nodes = [] self.power_history = [] self._state = state def __repr__(self): """Return a string representation of the job.""" return (f"Job(id={self.id}, name={self.name}, nodes_required={self.nodes_required}, " f"cpu_trace={self.cpu_trace}, gpu_trace={self.gpu_trace}, wall_time={self.wall_time}, " f"end_state={self.end_state}, requested_nodes={self.requested_nodes}, " f"submit_time={self.submit_time}, start_time={self.start_time}, " f"end_time={self.end_time}, running_time={self.running_time}, state={self._state}, " f"scheduled_nodes={self.scheduled_nodes}, power={self.power}, " f"power_history={self.power_history})") @property def state(self): """Get the current state of the job.""" return self._state @state.setter def state(self, value): """Set the state of the job.""" if isinstance(value, JobState): self._state = value elif isinstance(value, str) and value in JobState.__members__: self._state = JobState[value] else: raise ValueError(f"Invalid state: {value}") @classmethod def _get_next_id(cls): """Generate the next unique identifier for a job. This method is used internally to generate a unique identifier for each job based on the current value of the class's _id_counter attribute. Each time this method is called, it increments the counter by 1 and returns the new value. Returns: - int: The next unique identifier for a job. 
""" cls._id_counter += 1 return cls._id_counter @dataclasses.dataclass class TickData: """ Represents the state output from the simulation each tick """ Loading raps/utils.py +0 −16 Original line number Diff line number Diff line Loading @@ -356,19 +356,3 @@ def write_dict_to_file(dictionary, file_path): file.write("}\n") else: file.write(f"{key}: {value}\n") def job_dict(nodes_required, name, cpu_trace, gpu_trace, wall_time, \ end_state, scheduled_nodes, time_offset, job_id, priority=0): """ Return job info dictionary """ return { 'nodes_required': nodes_required, 'name': name, 'cpu_trace': cpu_trace, 'gpu_trace': gpu_trace, 'wall_time': wall_time, 'end_state': end_state, 'requested_nodes': scheduled_nodes, 'submit_time': time_offset, 'id': job_id, 'priority': priority } Loading
raps/dataloaders/frontier.py +2 −1 Original line number Diff line number Diff line Loading @@ -2,7 +2,8 @@ import numpy as np import pandas as pd from ..config import load_config_variables from ..utils import power_to_utilization, next_arrival, encrypt, job_dict from ..job import job_dict from ..utils import power_to_utilization, next_arrival, encrypt load_config_variables([ 'CPUS_PER_NODE', Loading
raps/dataloaders/marconi100.py +2 −1 Original line number Diff line number Diff line Loading @@ -2,7 +2,8 @@ import uuid import pandas as pd from ..config import load_config_variables from ..utils import power_to_utilization, next_arrival, job_dict from ..job import job_dict from ..utils import power_to_utilization, next_arrival load_config_variables([ 'CPUS_PER_NODE', Loading
raps/job.py 0 → 100644 +89 −0 Original line number Diff line number Diff line from enum import Enum def job_dict(nodes_required, name, cpu_trace, gpu_trace, wall_time, \ end_state, scheduled_nodes, time_offset, job_id, priority=0): """ Return job info dictionary """ return { 'nodes_required': nodes_required, 'name': name, 'cpu_trace': cpu_trace, 'gpu_trace': gpu_trace, 'wall_time': wall_time, 'end_state': end_state, 'requested_nodes': scheduled_nodes, 'submit_time': time_offset, 'id': job_id, 'priority': priority } class JobState(Enum): """Enumeration for job states.""" RUNNING = 'R' PENDING = 'PD' COMPLETED = 'C' CANCELLED = 'CA' FAILED = 'F' TIMEOUT = 'TO' class Job: """Represents a job to be scheduled and executed in the distributed computing system. Each job consists of various attributes such as the number of nodes required for execution, CPU and GPU utilization, wall time, and other relevant parameters (see utils.job_dict). The job can transition through different states during its lifecycle, including PENDING, RUNNING, COMPLETED, CANCELLED, FAILED, or TIMEOUT. 
""" _id_counter = 0 def __init__(self, job_dict, current_time, state=JobState.PENDING): for key, value in job_dict.items(): setattr(self, key, value) if not self.id: self.id = Job._get_next_id() # initializations self.start_time = None self.end_time = None self.running_time = 0 self.power = 0 self.scheduled_nodes = [] self.power_history = [] self._state = state def __repr__(self): """Return a string representation of the job.""" return (f"Job(id={self.id}, name={self.name}, nodes_required={self.nodes_required}, " f"cpu_trace={self.cpu_trace}, gpu_trace={self.gpu_trace}, wall_time={self.wall_time}, " f"end_state={self.end_state}, requested_nodes={self.requested_nodes}, " f"submit_time={self.submit_time}, start_time={self.start_time}, " f"end_time={self.end_time}, running_time={self.running_time}, state={self._state}, " f"scheduled_nodes={self.scheduled_nodes}, power={self.power}, " f"power_history={self.power_history})") @property def state(self): """Get the current state of the job.""" return self._state @state.setter def state(self, value): """Set the state of the job.""" if isinstance(value, JobState): self._state = value elif isinstance(value, str) and value in JobState.__members__: self._state = JobState[value] else: raise ValueError(f"Invalid state: {value}") @classmethod def _get_next_id(cls): """Generate the next unique identifier for a job. This method is used internally to generate a unique identifier for each job based on the current value of the class's _id_counter attribute. Each time this method is called, it increments the counter by 1 and returns the new value. Returns: - int: The next unique identifier for a job. """ cls._id_counter += 1 return cls._id_counter
raps/scheduler.py +2 −76 Original line number Diff line number Diff line Loading @@ -39,18 +39,16 @@ Constants: This module can be used to simulate job scheduling algorithms, analyze system behavior, and optimize resource utilization in distributed computing environments. """ from enum import Enum from typing import Optional import heapq import dataclasses import numpy as np from scipy.stats import weibull_min import pandas as pd from .utils import summarize_ranges, expand_ranges from .config import load_config_variables from .job import Job, JobState from .utils import summarize_ranges, expand_ranges load_config_variables([ 'TRACE_QUANTA', Loading @@ -71,78 +69,6 @@ load_config_variables([ ], globals()) class JobState(Enum): """Enumeration for job states.""" RUNNING = 'R' PENDING = 'PD' COMPLETED = 'C' CANCELLED = 'CA' FAILED = 'F' TIMEOUT = 'TO' class Job: """Represents a job to be scheduled and executed in the distributed computing system. Each job consists of various attributes such as the number of nodes required for execution, CPU and GPU utilization, wall time, and other relevant parameters (see utils.job_dict). The job can transition through different states during its lifecycle, including PENDING, RUNNING, COMPLETED, CANCELLED, FAILED, or TIMEOUT. 
""" _id_counter = 0 def __init__(self, job_dict, current_time, state=JobState.PENDING): for key, value in job_dict.items(): setattr(self, key, value) if not self.id: self.id = Job._get_next_id() # initializations self.start_time = None self.end_time = None self.running_time = 0 self.power = 0 self.scheduled_nodes = [] self.power_history = [] self._state = state def __repr__(self): """Return a string representation of the job.""" return (f"Job(id={self.id}, name={self.name}, nodes_required={self.nodes_required}, " f"cpu_trace={self.cpu_trace}, gpu_trace={self.gpu_trace}, wall_time={self.wall_time}, " f"end_state={self.end_state}, requested_nodes={self.requested_nodes}, " f"submit_time={self.submit_time}, start_time={self.start_time}, " f"end_time={self.end_time}, running_time={self.running_time}, state={self._state}, " f"scheduled_nodes={self.scheduled_nodes}, power={self.power}, " f"power_history={self.power_history})") @property def state(self): """Get the current state of the job.""" return self._state @state.setter def state(self, value): """Set the state of the job.""" if isinstance(value, JobState): self._state = value elif isinstance(value, str) and value in JobState.__members__: self._state = JobState[value] else: raise ValueError(f"Invalid state: {value}") @classmethod def _get_next_id(cls): """Generate the next unique identifier for a job. This method is used internally to generate a unique identifier for each job based on the current value of the class's _id_counter attribute. Each time this method is called, it increments the counter by 1 and returns the new value. Returns: - int: The next unique identifier for a job. """ cls._id_counter += 1 return cls._id_counter @dataclasses.dataclass class TickData: """ Represents the state output from the simulation each tick """ Loading
raps/utils.py +0 −16 Original line number Diff line number Diff line Loading @@ -356,19 +356,3 @@ def write_dict_to_file(dictionary, file_path): file.write("}\n") else: file.write(f"{key}: {value}\n") def job_dict(nodes_required, name, cpu_trace, gpu_trace, wall_time, \ end_state, scheduled_nodes, time_offset, job_id, priority=0): """ Return job info dictionary """ return { 'nodes_required': nodes_required, 'name': name, 'cpu_trace': cpu_trace, 'gpu_trace': gpu_trace, 'wall_time': wall_time, 'end_state': end_state, 'requested_nodes': scheduled_nodes, 'submit_time': time_offset, 'id': job_id, 'priority': priority }