Commit 576704a3 authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Clean up code... remove aging as its not ready for prime time

parent f97c81a3
Loading
Loading
Loading
Loading
+0 −1
Original line number Diff line number Diff line
{
    "SEED": 42,
    "MAX_PRIORITY": 500000,
    "JOB_ARRIVAL_TIME": 20,
    "MTBF": 11,
    "MAX_TIME": 88200,
+3 −6
Original line number Diff line number Diff line
@@ -24,7 +24,7 @@ parser.add_argument('-e', '--encrypt', action='store_true', help='Encrypt any se
parser.add_argument('-n', '--numjobs', type=int, default=1000, help='Number of jobs to schedule')
parser.add_argument('-t', '--time', type=str, default=None, help='Length of time to simulate, e.g., 123, 123s, 27m, 3h, 7d')
parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output')
parser.add_argument('-s', '--seed', action='store_true', help='Set random number seed for deterministic simulation')
parser.add_argument('--seed', action='store_true', help='Set random number seed for deterministic simulation')
parser.add_argument('-f', '--replay', nargs='+', type=str, help='Either: path/to/joblive path/to/jobprofile' + \
                                                                ' -or- filename.npz (overrides --workload option)')
parser.add_argument('--reschedule', action='store_true', help='Reschedule the telemetry workload')
@@ -39,8 +39,8 @@ parser.add_argument('-p', '--plot', nargs='+', choices=['power', 'loss', 'pue',
choices = ['png', 'svg', 'jpg', 'pdf', 'eps']
parser.add_argument('--imtype', type=str, choices=choices, default=choices[0], help='Plot image type')
parser.add_argument('--system', type=str, default='frontier', help='System config to use')
choices = ['fcfs', 'sjf', 'prq', 'prq+ag']
parser.add_argument('--schedule', type=str, choices=choices, default=choices[0], help='Type of schedule to use')
choices = ['fcfs', 'sjf', 'prq']
parser.add_argument('-s', '--schedule', type=str, choices=choices, default=choices[0], help='Type of schedule to use')
choices = ['random', 'benchmark', 'peak', 'idle']
parser.add_argument('-w', '--workload', type=str, choices=choices, default=choices[0], help='Type of synthetic workload')
choices = ['layout1', 'layout2']
@@ -132,9 +132,6 @@ if args.replay:
else:
    wl = Workload()
    jobs = getattr(wl, args.workload)(num_jobs=args.numjobs)
    # jobs = getattr(wl, "random")(num_jobs=args.numjobs)
    # jobs2 = getattr(wl, "peak")(num_jobs=args.numjobs)
    # jobs.insert(1, jobs2[0])

    if args.verbose:
        for job_vector in jobs:
+2 −1
Original line number Diff line number Diff line
@@ -145,7 +145,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
                end_state,
                scheduled_nodes,
                time_offset,
                job_id
                job_id,
                0 # priority (not supported for Frontier at the moment)
            ])

    return jobs
+8 −43
Original line number Diff line number Diff line
@@ -155,7 +155,6 @@ class Job:
        self.scheduled_nodes = []
        self.power = 0
        self.power_history = []
        self.wait_time = 0

    def __repr__(self):
        """Return a string representation of the job."""
@@ -182,11 +181,6 @@ class Job:
        else:
            raise ValueError(f"Invalid state: {value}")

    # def __lt__(self, other):
    #     """Implement less than comparison for jobs based on wall time."""
    #     #return self.end_time < other.end_time # First-come, First-served (FCFS)
    #     return self.wall_time < other.wall_time  # Shortest-job-first (SJF)

    @classmethod
    def _get_next_id(cls):
        """Generate the next unique identifier for a job.
@@ -262,8 +256,8 @@ class Scheduler:
        self.fmu_results = None
        self.debug = kwargs.get('debug')
        self.output = kwargs.get('output')
        self.schedule_method = kwargs.get('schedule')
        self.replay = kwargs.get('replay')
        self.policy = kwargs.get('schedule')
        self.sys_util_history = []

    def add_job(self, job):
@@ -271,45 +265,27 @@ class Scheduler:
        self._sort_queue()

    def _sort_queue(self):
        if self.schedule_method == 'fcfs':
            self.queue.sort(key=lambda job: (job.submit_time + job.id))
        elif self.schedule_method == 'sjf':
        if self.policy == 'fcfs':
            self.queue.sort(key=lambda job: job.submit_time)
        elif self.policy == 'sjf':
            self.queue.sort(key=lambda job: job.wall_time)
        elif self.schedule_method == 'prq':
        elif self.policy == 'prq':
            self.queue.sort(key=lambda job: -job.priority)
        elif self.schedule_method == 'prq+ag':
            self.queue.sort(key=lambda job: -(job.priority+job.wait_time))
    
    def increment_deferred_jobs(self, tasks):
        for task in tasks:
            if task.wait_time != 0:
                task.wait_time += 1
        return tasks
        else:
            raise ValueError(f"The scheduling policy {self.policy} is not supported.")
    
    def schedule(self, jobs):
        """Schedule jobs."""
        for job_vector in jobs:
            job = Job(job_vector, self.current_time)
            # heapq.heappush(self.queue, job)
            self.add_job(job)
        #if self.debug:
        #    print(f"\nt={self.current_time} queue={self.queue} heapq={heapq}")

        while self.queue:

            # job = heapq.heappop(self.queue)
            # print("type(self.queue) is" + str(type(self.queue)))
            # submit_times = [job.submit_time for job in self.queue]
            # print(submit_times)
            # print("self.queue: ", len(self.queue))
            # priorities = [job.priority for job in self.queue]
            # print("priorities: ", priorities)
            # priorities_aging = [(job.priority+job.wait_time) for job in self.queue]
            # print("priorities: ", priorities_aging)
            job = self.queue.pop(0)
            synthetic_bool = len(self.available_nodes) >= job.nodes_required
            telemetry_bool = job.requested_nodes and job.requested_nodes[0] in self.available_nodes
            self.increment_deferred_jobs(self.queue)

            if synthetic_bool or telemetry_bool:

                if job.requested_nodes:
@@ -336,13 +312,6 @@ class Scheduler:
                    print(f"t={self.current_time}: Scheduled job with wall time {job.wall_time} on nodes {scheduled_nodes}")

            else:
                # heapq.heappush(self.queue, job)
                # self.add_job(job)
                # print("Printing job", job)
                # print("Printing job type", type(job))
                # Increment wait_time for all jobs in the queue
                job.wait_time += 1
                # print("Job wait time is ", job.wait_time)
                self.queue.append(job)
                break

@@ -380,10 +349,6 @@ class Scheduler:
                            self.available_nodes.append(node)
                    self.available_nodes.sort()

                    # Keep the job in the queue for visibility
                    # heapq.heappush(self.queue, job)
                    

                    # Remove job from the list of running jobs
                    self.running.remove(job)

+3 −2
Original line number Diff line number Diff line
@@ -34,8 +34,7 @@ from .config import load_config_variables
load_config_variables([
    'TRACE_QUANTA', 'MAX_NODES_PER_JOB', 'JOB_NAMES', 'CPUS_PER_NODE',\
    'GPUS_PER_NODE', 'MAX_WALL_TIME', 'MIN_WALL_TIME', 'JOB_END_PROBS',\
    'AVAILABLE_NODES', 'MAX_PRIORITY'
], globals())
    'AVAILABLE_NODES' ], globals())

JOB_NAMES = ["LAMMPS", "GROMACS", "VASP", "Quantum ESPRESSO", "NAMD",\
             "OpenFOAM", "WRF", "AMBER", "CP2K", "nek5000", "CHARMM",\
@@ -44,6 +43,8 @@ JOB_NAMES = ["LAMMPS", "GROMACS", "VASP", "Quantum ESPRESSO", "NAMD",\
             "TensorFlow", "PyTorch", "BLAST", "Spark", "GAMESS",\
             "ORCA", "Simulink", "MOOSE", "ELK"]

MAX_PRIORITY = 500000

from .utils import truncated_normalvariate, determine_state, next_arrival