Commit 62ac0370 authored by Maiterth, Matthias's avatar Maiterth, Matthias
Browse files

Third try for scheduleflow.

Using the Events directly.
Simulation slows down drastically after more than 90 jobs are queued, and simulation time is 3+ hours.
parent 59768b6a
Loading
Loading
Loading
Loading
+8 −6
Original line number Diff line number Diff line
@@ -63,12 +63,6 @@ else:
args_dict['config'] = config
flops_manager = FLOPSManager(**args_dict)

sc = Engine(
    power_manager=power_manager,
    flops_manager=flops_manager,
    cooling_model=cooling_model,
    **args_dict,
)

timestep_start = 0
if args.fastforward:
@@ -145,6 +139,14 @@ else: # Synthetic jobs

    DIR_NAME = create_casename()

sc = Engine(
    power_manager=power_manager,
    flops_manager=flops_manager,
    cooling_model=cooling_model,
    jobs=jobs,
    **args_dict,
)

OPATH = OUTPUT_PATH / DIR_NAME
print("Output directory is: ", OPATH)
sc.opath = OPATH
+7 −6
Original line number Diff line number Diff line
@@ -34,7 +34,7 @@ class TickData:
class Engine:
    """Job scheduling simulation engine."""

    def __init__(self, *, power_manager, flops_manager, cooling_model=None, config, **kwargs):
    def __init__(self, *, power_manager, flops_manager, cooling_model=None, config, jobs=None, **kwargs):
        self.config = config
        self.down_nodes = summarize_ranges(self.config['DOWN_NODES'])
        self.resource_manager = ResourceManager(
@@ -66,11 +66,12 @@ class Engine:
            config=self.config,
            policy=kwargs.get('policy'),
            bfpolicy=kwargs.get('backfill'),
            resource_manager=self.resource_manager
            resource_manager=self.resource_manager,
            jobs=jobs
        )
        print(f"Using scheduler: {str(self.scheduler.__class__).split('.')[2]}"\
              f", with policy {self.scheduler.policy.value} "\
              f"and backfill {self.scheduler.bfpolicy.value}")
              f", with policy {self.scheduler.policy} "\
              f"and backfill {self.scheduler.bfpolicy}")


    def add_running_jobs_to_queue(self, jobs_to_submit: List):
@@ -315,11 +316,11 @@ class Engine:
        # Process jobs in batches for better performance of timestep loop
        all_jobs = jobs.copy()
        jobs = []
        # Batch Jobs into 6h windows based on submit_time
        batch_window = 60 * 60 * 6  # 6h

        for timestep in range(timestep_start,timestep_end):

            # Batch Jobs into 6h windows based on submit_time
            batch_window = 60 * 60 * 6  # 6h
            if (timestep % batch_window == 0) or (timestep == timestep_start):
                # Add jobs that are within the batching window and remove them from all jobs
                jobs += [job for job in all_jobs if job['submit_time'] <= timestep + batch_window]
+5 −5
Original line number Diff line number Diff line
@@ -10,9 +10,9 @@ Implementing such using something like:
    job = SimpleNamespace(**job_dict(...))
"""

def job_dict(nodes_required, name, account, \
def job_dict(*,nodes_required, name, account, \
             cpu_trace, gpu_trace, ntx_trace, nrx_trace, \
             end_state, scheduled_nodes, job_id, priority=0, partition=0,
             end_state, scheduled_nodes=None, id, priority=0, partition=0,
             submit_time=0, time_limit=0, start_time=0, end_time=0,
             wall_time=0, trace_time=0, trace_start_time=0,trace_end_time=0, trace_missing_values=False):
    """ Return job info dictionary """
@@ -26,7 +26,7 @@ def job_dict(nodes_required, name, account, \
        'nrx_trace': nrx_trace,
        'end_state': end_state,
        'requested_nodes': scheduled_nodes,
        'id': job_id,
        'id': id,
        'priority': priority,
        'partition': partition,
        # Times:
@@ -87,7 +87,7 @@ class Job:
        for key, value in job_dict.items():
            setattr(self, key, value)
        # In any case: provide a job_id!
        if not self.id:
        if self.id is None:  # This is wrong
            self.id = Job._get_next_id()

        if self.scheduled_nodes and self.nodes_required == 0:
+1 −1
Original line number Diff line number Diff line
@@ -10,7 +10,7 @@ from ..policy import PolicyType, BackfillType
class Scheduler:
    """ Default job scheduler with various scheduling policies. """

    def __init__(self, config, policy, bfpolicy=None, resource_manager=None):
    def __init__(self, config, policy, bfpolicy=None, jobs=None, resource_manager=None):
        self.config = config
        if policy is None:  # policy is passed as policy=None, therefore default is not choosen
            policy = "replay"
+166 −68
Original line number Diff line number Diff line
from raps.job import Job, JobState
from raps.job import JobState
from raps.utils import summarize_ranges
from third_party.ScheduleFlow import ScheduleFlow
from third_party.ScheduleFlow import _intScheduleFlow
from third_party.ScheduleFlow._intScheduleFlow import EventType
from ..job import job_dict

class SFJob:
    """Adapter exposing a RAPS job dict under the attribute names that
    ScheduleFlow expects of an application."""

    def __init__(self, job_info):
        """Map RAPS job info (a dict) onto ScheduleFlow-style attributes."""
        self.job_id = job_info['id']
        self.nodes = job_info['nodes_required']
        self.walltime = job_info['wall_time']
        # NOTE(review): ScheduleFlow applications usually carry a list of
        # requested walltimes — confirm None is acceptable here.
        self.requested_walltimes = None
        self.submission_time = job_info['submit_time']
        self.name = job_info['name']
        self.priority = job_info['priority']
        self.resubmit_factor = -1  # never resubmit

    def __hash__(self):
        # Hash on the job id only, consistent with __eq__ below.
        return hash(self.job_id)

    def __eq__(self, other):
        # BUG FIX: previously compared self.id / other.id — attributes that do
        # not exist on SFJob (the field is job_id) — so any equality test
        # raised AttributeError.
        return isinstance(other, SFJob) and self.job_id == other.job_id

    def __repr__(self):
        return f"SFJob(id={self.job_id}, nodes={self.nodes}, wall_time={self.walltime})"

class Scheduler:
    """
@@ -33,14 +15,27 @@ class Scheduler:
    routines, then updates the RAPS job objects accordingly.
    """

    def __init__(self, config, policy, bfpolicy, resource_manager, jobs=None):
        """Set up the ScheduleFlow-backed scheduler.

        Args:
            config: RAPS configuration dict; ``TOTAL_NODES`` sizes the
                ScheduleFlow system.
            policy: scheduling policy identifier.
            bfpolicy: backfill policy identifier.
            resource_manager: RAPS resource manager used to place jobs on nodes.
            jobs: iterable of RAPS job dicts; their priorities seed the number
                of ScheduleFlow priority levels. Defaults to an empty list so
                a caller passing ``jobs=None`` does not crash.
        """
        jobs = [] if jobs is None else jobs
        # Sorted priorities let _convert_to_sf map a job's priority to a
        # ScheduleFlow priority level via its index in this list.
        self.sorted_priorities = sorted(x['priority'] for x in jobs)
        num_prios = len(self.sorted_priorities)
        self.queue = []  # jobs already handed to ScheduleFlow
        self.config = config
        self.policy = policy
        self.bfpolicy = bfpolicy
        self.resource_manager = resource_manager
        # NOTE(review): num_prios counts duplicates and may be 0 for empty
        # jobs — confirm ScheduleFlow accepts these priorityLevels values.
        self.sf_scheduler = ScheduleFlow.Scheduler(
            ScheduleFlow.System(config['TOTAL_NODES']),
            priorityLevels=num_prios,
        )
        # Drive ScheduleFlow's internal event machinery directly instead of
        # letting it run its own simulation loop.
        self._sf_runtime = _intScheduleFlow.Runtime([])
        self._sf_runtime.scheduler = self.sf_scheduler
        self.sf_submitted_list = []  # ScheduleFlow applications submitted so far


    def sort_jobs(self, queue, accounts=None):
        """
@@ -50,59 +45,145 @@ class Scheduler:
        """
        return sorted(queue, key=lambda job: job.submit_time)

    def start_job_event(self):
        """Placeholder hook for job-start events (not yet implemented).

        BUG FIX: defined without ``self``, so any instance call
        (``self.start_job_event()``) raised TypeError.
        """
        pass

    def end_job_event(self):
        """Placeholder hook for job-end events (not yet implemented).

        BUG FIX: defined without ``self``, so any instance call
        (``self.end_job_event()``) raised TypeError.
        """
        pass

    def schedule(self, queue, running, current_time, accounts=None, sorted=False, debug=False):
        """Advance the ScheduleFlow-backed schedule to *current_time*.

        Newly seen entries of *queue* are converted and submitted to
        ScheduleFlow; pending ScheduleFlow events due exactly now are drained,
        and jobs whose start event fires are moved from *queue* to *running*
        with nodes assigned via the resource manager.

        NOTE(review): this block is a diff view superimposing the removed
        implementation and the current "SECOND TRY" one — confirm the live
        code against the repository before editing. The ``sorted`` parameter
        shadows the ``sorted`` builtin; ``accounts`` is unused here.
        """
        # NOTE(review): the next six lines are the superseded first-cut path
        # (they resubmit the whole queue on every call).
        # Convert RAPS jobs to ScheduleFlow format
        sf_jobs = [self._convert_to_sf(job) for job in queue]

        # Submit each job to the ScheduleFlow scheduler
        for sf_job in sf_jobs:
            self.sf_scheduler.submit_job(current_time, [sf_job])
        #self._sf_runtim
        pass
        #### SECOND TRY
        # Only hand ScheduleFlow the jobs we have not submitted before.
        new_queue_items = list(filter(lambda x: x not in self.queue, queue))
        if new_queue_items:
            self.queue += new_queue_items
        #    # Convert RAPS jobs to ScheduleFlow format
            new_sf_jobs = [self._convert_to_sf(job) for job in new_queue_items]
            self.sf_submitted_list += new_sf_jobs  # This one only holds sf_jobs no timestamps
            # Submit each job to the ScheduleFlow scheduler # This trigger schedule!
            if new_sf_jobs:
                # Drive ScheduleFlow's private Runtime machinery directly so
                # RAPS controls simulated time, not ScheduleFlow's own loop.
                ret = self.sf_scheduler.submit_job(current_time, new_sf_jobs)
                self._sf_runtime._Runtime__handle_scheduler_actions(ret)
                self._sf_runtime._Runtime__trigger_schedule_event()

        # Drain every ScheduleFlow event that fires exactly at current_time.
        if not self._sf_runtime._Runtime__events.empty():
            top = self._sf_runtime._Runtime__events.top()
            if top[0] == current_time:
                start_jobs = []
                end_jobs = []
                for event in self._sf_runtime._Runtime__events.pop_list():
                    if event[1] == EventType.Submit:
                        raise ValueError(f"Didnt we already Submit above? {event}")
                    if event[1] == EventType.JobStart:
                        start_jobs.append(event[2])
                    if event[1] == EventType.JobEnd:
                        end_jobs.append(event[2])
                if len(end_jobs) > 0:
                    self._sf_runtime._Runtime__job_end_event(end_jobs)
                    # End of jobs is handled by RAPS via prepare_timestep
                    pass
                if len(start_jobs) > 0:
                    self._sf_runtime._Runtime__job_start_event(start_jobs)
                    for sf_app in start_jobs:
                        # NOTE(review): start_jobs (not self.sf_submitted_list)
                        # is passed as the lookup list — confirm intended.
                        job = _match_sf_app_and_job(sf_app,queue,start_jobs)
                        queue.remove(job)
                        self.resource_manager.assign_nodes_to_job(job, current_time)
                        running.append(job)


        # NOTE(review): superseded path resumes here; 'actions' feeds the old
        # processing loop further down.
        # Trigger the schedule calculation
        actions = self.sf_scheduler.trigger_schedule(current_time)
            # Keep track of:  All jobs have been submitted empty the queue!


        #    remove_list = []
        #    job_list = []
        #    for x in self.sf_start_list:
        #        sf_job_start_time,sf_app = x
        #        if sf_job_start_time <= current_time:
        #            job_list.append(sf_app)
        #            remove_list.append(x)
        #            job = _match_sf_app_and_job(sf_app,queue,self.sf_submitted_list)
        #            if current_time != sf_job_start_time:
        #                print("current_time != sf_job_start_time")
        #                print(f"{current_time} != {sf_job_start_time}")
        #            queue.remove(job)
        #            self.sf_submitted_list.remove(sf_app)

        #            self.resource_manager.assign_nodes_to_job(job, current_time)
        #            running.append(job)
        #    if job_list:
        #        self.sf_end_list += self.sf_scheduler.start_job(current_time,job_list)
        #    for x in remove_list:
        #        self.sf_start_list.remove(x)

        #### First TRY
        #if self.sf_end_list:
        #    remove_list = []
        #    job_list = []
        #    for x in self.sf_end_list:
        #        if x[0] <= current_time:
        #            job_list.append(x[1])
        #            remove_list.append(x)
        #    if job_list:
        #        self.sf_action_list += self.sf_scheduler.stop_job(current_time,job_list)
        #    for x in remove_list:
        #        self.sf_end_list.remove(x)

        # submit_jobs triggered the schedule calculation, sf_jobs returned the placed jobs.
        # We need to reflect this on the RAPS side.

        # March the sf_scheduler forward based on the jobs
        #end_jobs = self.sf_scheduler.start_job(current_time,sf_schedule[1])
        #self.sf_scheduler.end_job(current_time,end_jobs)

        # Add to running

        # NOTE(review): this loop belongs to the superseded path fed by
        # trigger_schedule() above.
        # Process the actions (each action is assumed to be (start_time, job_info))
        for act in actions:
            start_time, sf_job = act
            # Find the corresponding RAPS job using its ID
            job = self._find_job(queue, sf_job['id'])
            if job:
                job.scheduled_nodes = sf_job.get('assigned_nodes', [])
                job.start_time = start_time
                job.end_time = start_time + job.wall_time
                job.state = JobState.RUNNING
                running.append(job)
                queue.remove(job)
                if debug:
                    print(f"t={current_time}: Scheduled job {job.id} on nodes {summarize_ranges(job.scheduled_nodes)}")
        #for act in actions:
        #    start_time, sf_job = act
        #    # Find the corresponding RAPS job using its ID
        #    job = self._find_job(queue, sf_job['job_id'])
        #    if job:
        #        job.scheduled_nodes = sf_job.get('assigned_nodes', [])
        #        job.start_time = start_time
        #        job.end_time = start_time + job.wall_time
        #        job.state = JobState.RUNNING
        #        running.append(job)
        #        queue.remove(job)
        #        if debug:
        #            print(f"t={current_time}: Scheduled job {job.id} on nodes {summarize_ranges(job.scheduled_nodes)}")



    def _find_sf_in_queue(self,queue,sf_app):
        # Remember we added four digits and an underscore in _convert_to_sf:
        match = [x for x in queue if x.id == sf_app.name]
        if len(match != 1):
            raise ValueError(sf_app)
        return match[0]

    def _convert_to_sf(self, job):
        """Build a ScheduleFlow.Application mirroring a RAPS job.

        The job's priority is mapped to a ScheduleFlow priority level via its
        index in the sorted priority list built in __init__.

        NOTE(review): the diff superimposed the removed job_dict/SFJob-based
        implementation (which called the now keyword-only job_dict
        positionally and would crash) ahead of this, the added one; only the
        added implementation is kept.
        """
        sf_prio = self.sorted_priorities.index(job.priority)
        nodes = job.nodes_required
        # Clamp negative submission times to zero for ScheduleFlow.
        submission_time = max(job.submit_time, 0)
        walltime = job.wall_time
        requested_walltimes = [job.wall_time]
        resubmit_factor = -1  # never resubmit
        # Use the RAPS job ID as the application name so events coming back
        # from ScheduleFlow can be matched to their RAPS job when unpacking.
        name = job.id
        return ScheduleFlow.Application(
            nodes, submission_time, walltime, requested_walltimes,
            sf_prio, resubmit_factor, name,
        )

    def _find_job(self, queue, job_id):
        """
        Find the RAPS job in the queue that matches the given job_id.
        """
        for job in queue:
            if job.id == job_id:
            if job.job_id == job_id:
                return job
        return None

@@ -114,6 +195,23 @@ class Scheduler:
        # This is left as an exercise. You might use ScheduleFlow’s API to determine if a job can backfill.
        return None


def _match_sf_app_and_job(sf_app,queue,sf_queue):
    match = [x for x in sf_queue if x.name == sf_app.name]
    if len(match) != 1:
        print("Multiple Matches")
        raise ValueError(sf_app)
    else:
        match = match[0]
    job = [x for x in queue if x.id == match.name]
    if len(job) != 1:
        print("Multiple submitted Jobs ")
        raise ValueError(job)
    else:
        job = job[0]
    return job


if __name__ == '__main__':
    import unittest
    unittest.main()
Loading