"""ScheduleFlow-backed scheduler plugin for RAPS.

Bridges RAPS's scheduler interface to the ScheduleFlow simulator: RAPS jobs
are converted to ``SFJob`` wrappers, submitted to a ``ScheduleFlow.Scheduler``,
and the resulting schedule actions are mapped back onto the RAPS job objects.
"""

from raps.job import Job, JobState
from raps.utils import summarize_ranges

# ScheduleFlow isn't pip-installable; it is vendored as a submodule under
# third_party/. Adjust this import path if the vendoring location changes.
from third_party.ScheduleFlow import ScheduleFlow

from ..job import job_dict


class SFJob:
    """Adapter mapping a RAPS job dict onto the attributes ScheduleFlow expects."""

    def __init__(self, job_info):
        """Build an SFJob from a RAPS ``job_dict``-style dictionary."""
        self.job_id = job_info['id']
        self.nodes = job_info['nodes_required']
        self.walltime = job_info['wall_time']
        self.requested_walltimes = None
        self.submission_time = job_info['submit_time']
        self.name = job_info['name']
        self.priority = job_info['priority']
        # -1 disables ScheduleFlow's automatic walltime-growth resubmission.
        self.resubmit_factor = -1

    def __hash__(self):
        return hash(self.job_id)

    def __eq__(self, other):
        # BUG FIX: previously compared ``self.id``/``other.id``, but the
        # attribute is ``job_id`` — any equality check raised AttributeError.
        # Comparing on job_id keeps __eq__ consistent with __hash__.
        return isinstance(other, SFJob) and self.job_id == other.job_id

    def __repr__(self):
        return f"SFJob(id={self.job_id}, nodes={self.nodes}, wall_time={self.walltime})"


class Scheduler:
    """RAPS scheduler that delegates placement decisions to ScheduleFlow."""

    def __init__(self, config, policy, resource_manager):
        """Create the ScheduleFlow scheduler over a system of TOTAL_NODES nodes.

        :param config: RAPS config mapping; only ``TOTAL_NODES`` is read here.
        :param policy: scheduling policy name; stored but not yet used.
        :param resource_manager: RAPS resource manager handle.
        """
        self.config = config
        self.policy = policy
        self.resource_manager = resource_manager
        self.sf_scheduler = ScheduleFlow.Scheduler(
            ScheduleFlow.System(config['TOTAL_NODES']),
            priorityLevels=3,
        )

    def sort_jobs(self, queue, accounts=None):
        """Return the queue ordered by submission time (FCFS order)."""
        return sorted(queue, key=lambda job: job.submit_time)

    def schedule(self, queue, running, current_time, accounts=None, sorted=False, debug=False):
        """Submit queued jobs to ScheduleFlow and apply its schedule.

        Scheduled jobs are moved from ``queue`` to ``running`` in place, with
        ``scheduled_nodes``, ``start_time``, ``end_time`` and ``state`` set.
        """
        # Convert RAPS jobs to ScheduleFlow's job representation.
        sf_jobs = [self._convert_to_sf(job) for job in queue]

        # Submit each job, then ask ScheduleFlow for its schedule decisions.
        for sf_job in sf_jobs:
            self.sf_scheduler.submit_job(current_time, [sf_job])
        actions = self.sf_scheduler.trigger_schedule(current_time)

        # Each action is assumed to be a (start_time, job_info) pair.
        # NOTE(review): job_info is indexed like a dict (['id'], .get(...))
        # here, while we submitted SFJob objects above — confirm against
        # ScheduleFlow's trigger_schedule return format.
        for act in actions:
            start_time, sf_job = act
            job = self._find_job(queue, sf_job['id'])
            if job:
                job.scheduled_nodes = sf_job.get('assigned_nodes', [])
                job.start_time = start_time
                job.end_time = start_time + job.wall_time
                job.state = JobState.RUNNING
                running.append(job)
                queue.remove(job)
                if debug:
                    print(f"t={current_time}: Scheduled job {job.id} on nodes {summarize_ranges(job.scheduled_nodes)}")

    def _convert_to_sf(self, job):
        """Convert a RAPS Job object into an SFJob via the ``job_dict`` helper."""
        d = job_dict(
            job.nodes_required,
            job.name,
            job.account,
            job.cpu_trace,
            job.gpu_trace,
            job.ntx_trace,
            job.nrx_trace,
            job.wall_time,
            getattr(job, 'end_state', None),  # default when not yet set
            job.requested_nodes,
            job.submit_time,
            job.id,
            priority=job.priority,
            partition=getattr(job, 'partition', 0),
        )
        return SFJob(d)

    def _find_job(self, queue, job_id):
        """Return the RAPS job in ``queue`` with the given id, or None."""
        for job in queue:
            if job.id == job_id:
                return job
        return None