Commit cf3e0c94 authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Add resmgr.py with ResourceManager class

parent 16725002
Loading
Loading
Loading
Loading
+21 −12
Original line number Diff line number Diff line
@@ -7,6 +7,7 @@ from .job import Job, JobState
from .account import Accounts
from .network import network_utilization
from .utils import summarize_ranges, expand_ranges, write_dict_to_file
from .resmgr import ResourceManager
from .schedulers import load_scheduler


@@ -33,9 +34,12 @@ class Engine:
    def __init__(self, *, power_manager, flops_manager, cooling_model=None, config, **kwargs):
        self.config = config
        self.down_nodes = summarize_ranges(self.config['DOWN_NODES'])
        self.available_nodes = list(set(range(self.config['TOTAL_NODES'])) - set(self.config['DOWN_NODES']))
        self.num_free_nodes = len(self.available_nodes)
        self.num_active_nodes = self.config['TOTAL_NODES'] - self.num_free_nodes - len(self.config['DOWN_NODES'])
        self.resource_manager = ResourceManager(
            total_nodes=self.config['TOTAL_NODES'],
            down_nodes=self.config['DOWN_NODES']
        ) 

        # Initialize running and queue, etc.
        self.running = []
        self.queue = []
        self.accounts = Accounts()
@@ -54,7 +58,11 @@ class Engine:
        
        # Get scheduler type from command-line args or default
        scheduler_type = kwargs.get('scheduler', 'default')
        self.scheduler = load_scheduler(scheduler_type)(config=self.config, policy=kwargs.get('policy'))
        self.scheduler = load_scheduler(scheduler_type)(
            config=self.config,
            policy=kwargs.get('policy'),
            resource_manager=self.resource_manager
        )
        print(f"Using scheduler: {scheduler_type}")


@@ -72,8 +80,10 @@ class Engine:
        newly_downed_nodes = self.node_failure(self.config['MTBF'])

        # Update active/free nodes
        self.num_free_nodes = len(self.available_nodes)
        self.num_active_nodes = self.config['TOTAL_NODES'] - self.num_free_nodes - len(expand_ranges(self.down_nodes))
        self.num_free_nodes = len(self.resource_manager.available_nodes)
        self.num_active_nodes = self.config['TOTAL_NODES'] 
                              - len(self.resource_manager.available_nodes) 
                              - len(self.resource_manager.down_nodes)

        # Update running time for all running jobs
        for job in self.running:
@@ -103,12 +113,11 @@ class Engine:
            self.jobs_completed += 1
            job_stats = job.statistics()
            self.accounts.update_account_statistics(job_stats)
            # Free nodes and ensure there are no duplicates
            self.available_nodes.extend(job.scheduled_nodes)
            self.available_nodes = sorted(set(self.available_nodes))
            # Free the nodes via the resource manager.
            self.resource_manager.free_nodes_from_job(job)

        # Ask scheduler to schedule any jobs waiting in queue
        self.scheduler.schedule(self.queue, self.running, self.available_nodes, self.current_time)
        self.scheduler.schedule(self.queue, self.running, self.current_time)

        # Update the power array UI component
        rack_power, rect_losses = self.power_manager.compute_rack_power()
@@ -120,7 +129,7 @@ class Engine:
        self.sys_util_history.append((self.current_time, system_util))

        # Render the updated layout
        power_df = None #self.power_manager.get_power_df() if self.power_manager else pd.DataFrame()
        power_df = None
        cooling_inputs, cooling_outputs = None, None

        # Update power history every 15s
@@ -196,7 +205,7 @@ class Engine:

        for timestep in range(timesteps):

            self.scheduler.schedule(self.queue, self.running, self.available_nodes, self.current_time)
            self.scheduler.schedule(self.queue, self.running, self.current_time)

            # Stop the simulation if no more jobs are running or in the queue
            if not self.queue and not self.running and not self.replay:

raps/resmgr.py

0 → 100644
+50 −0
Original line number Diff line number Diff line
from .job import JobState

class ResourceManager:
    """Tracks node allocation state (free vs. down) for the simulated cluster.

    Owns the pool of allocatable nodes and records system-utilization
    samples over time. Node ids are integers in ``range(total_nodes)``.
    """

    def __init__(self, total_nodes, down_nodes):
        """Initialize the pool.

        Args:
            total_nodes: Total number of nodes in the system.
            down_nodes: Iterable of node ids that are offline and must
                never be allocated.
        """
        self.total_nodes = total_nodes
        # Set of node ids that are offline; disjoint from available_nodes.
        self.down_nodes = set(down_nodes)
        # Sorted list of node ids currently free (not down, not allocated).
        self.available_nodes = sorted(set(range(total_nodes)) - self.down_nodes)
        # List of (time, utilization-percent) samples, appended by
        # update_system_utilization.
        self.sys_util_history = []

    def assign_nodes_to_job(self, job, current_time):
        """Allocate nodes to *job*, set its start/end times, and mark it running.

        Telemetry-replay jobs (``job.requested_nodes`` truthy) are pinned to
        their exact requested nodes; synthetic jobs take the lowest-numbered
        free nodes.

        Raises:
            ValueError: if fewer than ``job.nodes_required`` nodes are free,
                or if any node a replay job requested is not free.
        """
        if len(self.available_nodes) < job.nodes_required:
            raise ValueError(f"Not enough available nodes to schedule job {job.id}")

        if job.requested_nodes:  # Telemetry replay case
            requested = set(job.requested_nodes)
            # Bug fix: the old Scheduler verified the requested nodes were a
            # subset of the free pool before assigning; without this check a
            # replayed job could silently double-allocate busy or down nodes.
            if not requested.issubset(self.available_nodes):
                raise ValueError(
                    f"Requested nodes for job {job.id} are not all available"
                )
            job.scheduled_nodes = job.requested_nodes
            # Membership test against the set, not the list: O(n) total.
            self.available_nodes = [n for n in self.available_nodes if n not in requested]
        else:  # Synthetic or reschedule case
            job.scheduled_nodes = self.available_nodes[:job.nodes_required]
            self.available_nodes = self.available_nodes[job.nodes_required:]

        # Set job start and end times
        job.start_time = current_time
        job.end_time = current_time + job.wall_time
        job.state = JobState.RUNNING  # Mark job as running

    def free_nodes_from_job(self, job):
        """Return a completed job's nodes to the free pool.

        Idempotent (duplicates are discarded) and safe for jobs that were
        never assigned nodes. Nodes that have since gone down are NOT
        returned to the pool, preserving the available/down disjointness
        invariant established in ``__init__``.
        """
        scheduled = getattr(job, "scheduled_nodes", None)
        if scheduled:
            # Union + de-dup, minus any node marked down, kept sorted so
            # synthetic allocation order stays deterministic.
            self.available_nodes = sorted(
                (set(self.available_nodes) | set(scheduled)) - self.down_nodes
            )

    def update_system_utilization(self, current_time, num_active_nodes):
        """Record and return system utilization as a percentage.

        Utilization is the ratio of active nodes to operational (non-down)
        nodes; 0 when every node is down. The sample is appended to
        ``sys_util_history`` as ``(current_time, utilization)``.
        """
        total_operational = self.total_nodes - len(self.down_nodes)
        utilization = (num_active_nodes / total_operational) * 100 if total_operational else 0
        self.sys_util_history.append((current_time, utilization))
        return utilization
+11 −27
Original line number Diff line number Diff line
@@ -15,9 +15,13 @@ class Scheduler:
    """ Default job scheduler with various scheduling policies. """
    

    def __init__(self, config, policy):
    def __init__(self, config, policy, resource_manager=None):
        self.config = config
        self.policy = PolicyType(policy)
        if resource_manager is None:
            raise ValueError("Scheduler requires a ResourceManager instance")
        self.resource_manager = resource_manager
        self.debug = False


    def sort_jobs(self, queue):
@@ -32,42 +36,22 @@ class Scheduler:
            raise ValueError(f"Unknown policy type: {self.policy}")


    def assign_nodes_to_job(self, job, available_nodes, current_time):
        """Assigns nodes to a job and updates available nodes."""
        if len(available_nodes) < job.nodes_required:
            raise ValueError(f"Not enough available nodes to schedule job {job.id}")

        if job.requested_nodes:  # Telemetry replay case
            job.scheduled_nodes = job.requested_nodes
            available_nodes[:] = [n for n in available_nodes if n not in job.scheduled_nodes]
        else:  # Synthetic or reschedule case
            job.scheduled_nodes = available_nodes[:job.nodes_required]
            available_nodes[:] = available_nodes[job.nodes_required:]

        # Set job start and end times
        job.start_time = current_time
        job.end_time = current_time + job.wall_time
        job.state = JobState.RUNNING  # Job is now running


    def schedule(self, queue, running, available_nodes, current_time, debug=False):
    def schedule(self, queue, running, current_time, debug=False):
        # Sort the queue in place.
        queue[:] = self.sort_jobs(queue)

        # Iterate over a copy of the queue since we might remove items
        for job in queue[:]:
            synthetic_bool = len(available_nodes) >= job.nodes_required
            telemetry_bool = job.requested_nodes and set(job.requested_nodes).issubset(set(available_nodes))

            if synthetic_bool or telemetry_bool:
                self.assign_nodes_to_job(job, available_nodes, current_time)
            # Check if the resource manager has enough nodes.
            if len(self.resource_manager.available_nodes) >= job.nodes_required:
                # Use ResourceManager to assign nodes.
                self.resource_manager.assign_nodes_to_job(job, current_time)
                running.append(job)
                queue.remove(job)  # Remove the job from the queue
                queue.remove(job)
                if debug:
                    scheduled_nodes = summarize_ranges(job.scheduled_nodes)
                    print(f"t={current_time}: Scheduled job {job.id} with wall time {job.wall_time} on nodes {scheduled_nodes}")
            else:
                # Optionally, if you have a BACKFILL policy, attempt backfilling here.
                if self.policy == PolicyType.BACKFILL:
                    # Try to find a backfill candidate from the entire queue.
                    backfill_job = self.find_backfill_job(queue, len(available_nodes), current_time)