Commit 3a61ec39 authored by Brewer, Wes; committed by Maiterth, Matthias
Browse files

Add initial skeleton for integrating ScheduleFlow

parent 4c84f9bd
Loading
Loading
Loading
Loading

.gitmodules

0 → 100644
+3 −0
Original line number Diff line number Diff line
[submodule "third_party/ScheduleFlow"]
	path = third_party/ScheduleFlow
	url = https://github.com/whbrewer/ScheduleFlow
+16 −10
Original line number Diff line number Diff line
import argparse
import sys
from raps.schedulers.default import PolicyType
from raps.schedulers.default import PolicyType, BackfillType

parser = argparse.ArgumentParser(description='Resource Allocator & Power Simulator (RAPS)')

@@ -10,12 +9,11 @@ parser.add_argument('-x', '--partitions', nargs='+', default=None, help='List of
parser.add_argument('-c', '--cooling', action='store_true', help='Include FMU cooling model')

# Simulation runtime options
parser.add_argument('-ff', '--fastforward', type=str, default=None, help='Fast-forward by time amount (uses same units as -t)')
parser.add_argument('-t', '--time', type=str, default=None, help='Length of time to simulate, e.g., 123, 123s, 27m, 3h, 7d')
parser.add_argument('-d', '--debug', action='store_true', help='Enable debug mode and disable rich layout')
parser.add_argument('-n', '--numjobs', type=int, default=1000, help='Number of jobs to schedule')
parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output')
choices = ['layout1', 'layout2']
parser.add_argument('--layout', type=str, choices=choices, default=choices[0], help='Layout of UI')
parser.add_argument('--start', type=str, help='ISO8061 string for start of simulation')
parser.add_argument('--end', type=str, help='ISO8061 string for end of simulation')
parser.add_argument('--seed', action='store_true', help='Set random number seed for deterministic simulation')
@@ -23,6 +21,11 @@ parser.add_argument('-u', '--uncertainties', action='store_true',
                    help='Change from floating point units to floating point units with uncertainties.' + \
                         ' Very expensive w.r.t simulation time!')

# User Interface options
choices = ['layout1', 'layout2']
parser.add_argument('--layout', type=str, choices=choices, default=choices[0], help='Layout of UI')


# Output options
parser.add_argument('-o', '--output', action='store_true', help='Output power, cooling, and loss models for later analysis')
parser.add_argument('-p', '--plot', nargs='+', choices=['power', 'loss', 'pue', 'temp', 'util'],
@@ -33,7 +36,6 @@ parser.add_argument('--imtype', type=str, choices=choices, default=choices[0], h
# Telemetry data
parser.add_argument('-f', '--replay', nargs='+', type=str, help='Either: path/to/joblive path/to/jobprofile' + \
                                                                ' -or- filename.npz (overrides --workload option)')
parser.add_argument('-ff', '--fastforward', type=str, default=None, help='Fast-forward by time amount (uses same units as -t)')
parser.add_argument('-e', '--encrypt', action='store_true', help='Encrypt any sensitive data in telemetry')
parser.add_argument('--validate', action='store_true', help='Use node power instead of CPU/GPU utilizations')
parser.add_argument('--jid', type=str, default='*', help='Replay job id')
@@ -44,14 +46,18 @@ choices = ['random', 'benchmark', 'peak', 'idle']
parser.add_argument('-w', '--workload', type=str, choices=choices, default=choices[0], help='Type of synthetic workload')

# Scheduling options
choices = ['default', 'replay', 'nrel', 'anl', 'flux']
choices = ['default', 'scheduleflow', 'nrel', 'anl', 'flux']
parser.add_argument('--scheduler', type=str, choices=choices, default=choices[0], help='Name of scheduler')
choices = [None, 'firstfit', 'bestfit', 'greedy', 'easy', 'conservative']
choices = [policy.value for policy in PolicyType]
parser.add_argument('--policy', type=str, choices=choices, default=choices[0], help='Schedule policy to use')
choices = [policy.value for policy in BackfillType]
parser.add_argument('--backfill', type=str, choices=choices, default=None, help='Backfill Policy')
policies = [policy.value for policy in PolicyType]

# Redistribution of job arrival
choices = ['prescribed', 'poisson']
parser.add_argument('--arrival', default=choices[0], type=str, choices=choices, help=f'Modify arrival distribution ({choices[1]}) or use the original submit times ({choices[0]})')
parser.add_argument('--policy', type=str, choices=policies, default=None, help='Schedule policy to use')

# Account options
parser.add_argument('--accounts', action='store_true', help='Flag indicating if accounts should be tracked')
parser.add_argument('--accounts-json', type=str, help='Json of account stats generated in previous run. see raps/accounts.py')

+100 −0
Original line number Diff line number Diff line
from raps.job import Job, JobState
from raps.utils import summarize_ranges
# Import ScheduleFlow's modules. ScheduleFlow is not pip-installable, so it is
# vendored as a git submodule under third_party/ScheduleFlow.
from third_party.ScheduleFlow import ScheduleFlow  # adjust this import if needed

class Scheduler:
    """
    RAPS scheduler adapter that hands placement decisions to ScheduleFlow.

    The class is meant to offer the same methods as the default RAPS
    scheduler. Queued RAPS jobs are translated into plain dictionaries,
    passed to the wrapped ScheduleFlow scheduler, and the returned entries are
    applied back onto the matching RAPS job objects.

    NOTE: the ScheduleFlow calls used here (``Scheduler``, ``System``,
    ``compute_schedule``) are skeleton placeholders and still have to be
    checked against ScheduleFlow's real API.
    """

    def __init__(self, config, policy, resource_manager):
        """
        Store the RAPS collaborators and build the wrapped ScheduleFlow scheduler.

        config           -- mapping; ``config['TOTAL_NODES']`` is read to size
                            the ScheduleFlow system.
        policy           -- kept on the instance; not consulted anywhere else
                            in this class.
        resource_manager -- kept on the instance; not consulted anywhere else
                            in this class.
        """
        self.config = config
        self.policy = policy
        self.resource_manager = resource_manager
        # Placeholder construction: extra ScheduleFlow parameters can be
        # supplied here once the concrete scheduler type has been chosen.
        self.sf_scheduler = ScheduleFlow.Scheduler(ScheduleFlow.System(config['TOTAL_NODES']))

    def sort_jobs(self, queue, accounts=None):
        """
        Return a new list with the jobs of `queue` in ascending submit_time order.

        `queue` itself is left untouched and `accounts` is ignored. The sort is
        stable, so jobs with equal submit_time keep their relative order (FCFS).
        """
        by_arrival = list(queue)
        by_arrival.sort(key=lambda entry: entry.submit_time)
        return by_arrival

    def schedule(self, queue, running, current_time, accounts=None, sorted=False, debug=False):
        """
        Ask ScheduleFlow for placements and start every job it returns.

        Each job in `queue` is converted with `_convert_job`, the resulting list
        is given to `self.sf_scheduler.compute_schedule`, and for every returned
        entry whose 'id' matches a queued job that job is marked RUNNING, moved
        from `queue` to `running` (both lists are mutated in place), and stamped
        with `current_time` as its start time.

        `accounts` and `sorted` are accepted for interface compatibility but are
        not used by this implementation. Returns None.
        """
        sf_requests = [self._convert_job(pending) for pending in queue]

        # Placeholder call -- must be adapted to ScheduleFlow's actual API.
        placements = self.sf_scheduler.compute_schedule(sf_requests)

        # Each returned entry is assumed to be dict-like with an 'id' key and an
        # optional 'assigned_nodes' key.
        for placement in placements:
            job = self._find_job_by_id(queue, placement['id'])
            if job is None:
                # ScheduleFlow referred to a job that is not (or no longer) queued.
                continue

            job.scheduled_nodes = placement.get('assigned_nodes', [])
            # Start/end are derived locally; ScheduleFlow-provided times could
            # be used instead if they become available.
            job.start_time = current_time
            job.end_time = current_time + job.wall_time
            job.state = JobState.RUNNING

            running.append(job)
            queue.remove(job)

            if debug:
                print(f"t={current_time}: Scheduled job {job.id} on nodes {summarize_ranges(job.scheduled_nodes)}")
        # Backfill, if ScheduleFlow supports it, belongs in find_backfill_job().

    def _convert_job(self, job):
        """
        Build the dictionary representation of a RAPS job handed to ScheduleFlow.

        The result carries the job's id, nodes_required, wall_time and
        submit_time, in that key order.
        """
        # Extend this tuple with any further fields ScheduleFlow turns out to need.
        exported_fields = ('id', 'nodes_required', 'wall_time', 'submit_time')
        return {field: getattr(job, field) for field in exported_fields}

    def _find_job_by_id(self, queue, job_id):
        """
        Return the first job in `queue` whose id equals `job_id`, or None if absent.
        """
        return next((candidate for candidate in queue if candidate.id == job_id), None)

    def find_backfill_job(self, queue, num_free_nodes, current_time):
        """
        Backfill hook; not implemented yet.

        Always returns None, i.e. no job is ever proposed for backfilling. A
        real implementation could delegate to ScheduleFlow or apply custom logic.
        """
        return None

# Script entry point: when this module is executed directly, hand control to
# unittest's command-line runner.
if __name__ == "__main__":
    from unittest import main as run_unittest_cli
    run_unittest_cli()
Original line number Diff line number Diff line
Subproject commit 3fdfd3675e68f0c2a0e68c1d7ce7205940d28216