Commit 567ff14d authored by Maiterth, Matthias's avatar Maiterth, Matthias
Browse files

First try to implement the old replay scheduler. Both schedule versions raise errors.

With the given dataset both placement checks try to schedule on unavailable nodes.
This needs to be debugged before continuing and potentially merging.
parent 7ba6d4d8
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -44,7 +44,7 @@ choices = ['random', 'benchmark', 'peak', 'idle']
parser.add_argument('-w', '--workload', type=str, choices=choices, default=choices[0], help='Type of synthetic workload')

# Scheduling options
choices = ['default', 'nrel', 'anl', 'flux']
choices = ['default', 'replay', 'nrel', 'anl', 'flux']
parser.add_argument('--scheduler', type=str, choices=choices, default=choices[0], help='Name of scheduler')
policies = [policy.value for policy in PolicyType]
choices = ['prescribed', 'poisson']
+1 −3
Original line number Diff line number Diff line
@@ -220,10 +220,8 @@ class Engine:

            # Identify eligible jobs and add them to the queue.
            self.queue += self.eligible_jobs(jobs_to_submit)
            # Sort the queue according to the policy
            self.queue = self.scheduler.sort_jobs(self.queue, self.accounts)
            # Schedule jobs that are now in the queue.
            self.scheduler.schedule(self.queue, self.running, self.current_time, sorted=True)
            self.scheduler.schedule(self.queue, self.running, self.current_time, sorted=False)

            # Stop the simulation if no more jobs are running or in the queue.
            if autoshutdown and not self.queue and not self.running and not self.replay:
+70 −0
Original line number Diff line number Diff line
from ..policy import PolicyType


class Scheduler:
    """
    Replay scheduler: places jobs on their recorded nodes at their recorded
    start times.

    No scheduling decision is made beyond checking that a job's requested
    nodes are currently free; jobs whose nodes are busy stay in the queue
    and are retried on a later scheduling epoch.
    """

    def __init__(self, config, policy, resource_manager=None):
        """
        Args:
            config: simulation configuration object (stored, not inspected here).
            policy: value convertible to PolicyType; selects the sorting policy.
            resource_manager: tracks node availability and performs placement;
                required.

        Raises:
            ValueError: if no resource_manager is supplied.
        """
        self.config = config
        self.policy = PolicyType(policy)
        if resource_manager is None:
            raise ValueError("Scheduler requires a ResourceManager instance")
        self.resource_manager = resource_manager
        self.debug = False

    def sort_jobs(self, queue, accounts=None):
        """Return the queue sorted by recorded job start time (replay order)."""
        return sorted(queue, key=lambda job: job.start_time)

    def schedule(self, queue, running, current_time, accounts=None, sorted=False, debug=False):
        """
        Place every queued job whose start_time has been reached and whose
        requested nodes are all currently available.

        Jobs whose nodes are busy remain in the queue and are retried on the
        next call. Both `queue` and `running` are mutated in place.

        Args:
            queue: list of pending jobs (mutated in place).
            running: list of running jobs; placed jobs are appended.
            current_time: current simulation time.
            accounts: forwarded to sort_jobs (unused by the replay policy).
            sorted: set True if the caller already sorted the queue.
            debug: unused; kept for signature compatibility.
        """
        # Sort the queue in place unless the caller already did.
        if not sorted:
            queue[:] = self.sort_jobs(queue, accounts)

        # Only keep jobs whose recorded start time has been reached.
        # NOTE(review): this drops future-start jobs from the queue entirely;
        # presumably the Engine re-submits them each epoch -- confirm.
        queue[:] = [job for job in queue if job.start_time <= current_time]

        # Iterate over a copy since we remove items while iterating.
        for job in queue[:]:
            # BUG FIX: the availability check was computed but never used, so
            # jobs were assigned to busy nodes and placement failed downstream.
            available = set(self.resource_manager.available_nodes)
            if not set(job.requested_nodes).issubset(available):
                continue  # nodes busy -- leave queued, retry next epoch
            self.resource_manager.assign_nodes_to_job(job, current_time)
            running.append(job)
            queue.remove(job)

    def schedule_v2(self, queue, running, current_time, accounts=None, sorted=False, debug=False):
        """
        Variant of schedule() that rejects synthetic jobs (no recorded node
        list) explicitly; replay requires every job to carry requested_nodes.

        Raises:
            ValueError: if a job has no requested_nodes (synthetic workload).
        """
        # Sort the queue in place unless the caller already did.
        if not sorted:
            queue[:] = self.sort_jobs(queue, accounts)

        # Only keep jobs whose recorded start time has been reached.
        queue[:] = [job for job in queue if job.start_time <= current_time]

        for job in queue[:]:
            if not job.requested_nodes:
                # Synthetic workloads have no fixed placement; replay cannot
                # handle them.
                raise ValueError("No jobs requested?")

            # Cheap size check before the set comparison; continue instead of
            # break, as a later job needing fewer nodes may still be placed.
            if len(job.requested_nodes) > len(self.resource_manager.available_nodes):
                continue

            if set(job.requested_nodes).issubset(set(self.resource_manager.available_nodes)):
                self.resource_manager.assign_nodes_to_job(job, current_time)
                running.append(job)
                queue.remove(job)
            else:
                # BUG FIX: previously raised ValueError here, which made the
                # intended "try next time" path unreachable and aborted the
                # whole simulation. Leave the job queued and retry later.
                continue