From 19f0189c274fa98792c45641224cc3b963253612 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Tue, 4 Feb 2025 23:18:03 -0500 Subject: [PATCH 01/15] Initial attempts to support pluggable schedulers specified by --scheduler --- args.py | 6 +- main.py | 7 +- multi-part-sim.py | 14 +- raps/engine.py | 272 +++++++++++++++++++++++++++ raps/policy.py | 62 ------- raps/scheduler.py | 460 ---------------------------------------------- raps/telemetry.py | 2 +- raps/ui.py | 14 +- 8 files changed, 295 insertions(+), 542 deletions(-) create mode 100644 raps/engine.py delete mode 100644 raps/policy.py delete mode 100644 raps/scheduler.py diff --git a/args.py b/args.py index 1ca2343..8cfcd77 100644 --- a/args.py +++ b/args.py @@ -1,5 +1,5 @@ import argparse -from raps.policy import PolicyType +from raps.schedulers.default import PolicyType parser = argparse.ArgumentParser(description='Resource Allocator & Power Simulator (RAPS)') parser.add_argument('-c', '--cooling', action='store_true', help='Include FMU cooling model') @@ -28,8 +28,10 @@ choices = ['png', 'svg', 'jpg', 'pdf', 'eps'] parser.add_argument('--imtype', type=str, choices=choices, default=choices[0], help='Plot image type') parser.add_argument('--scale', type=int, default=0, help='Scale telemetry to max nodes specified in order to run telemetry on a smaller smaller target system/partition, e.g., --scale 192') parser.add_argument('--system', type=str, default='frontier', help='System config to use') +choices = ['default', 'nrel', 'anl', 'flux'] +parser.add_argument('--scheduler', type=str, choices=choices, default=choices[0], help='Name of scheduler') choices = [policy.value for policy in PolicyType] -parser.add_argument('-s', '--schedule', type=str, choices=choices, default=choices[0], help='Schedule policy to use') +parser.add_argument('--policy', type=str, choices=choices, default=choices[0], help='Schedule policy to use') choices = ['random', 'benchmark', 'peak', 'idle'] parser.add_argument('-w', '--workload', type=str, choices=choices, default=choices[0], help='Type of synthetic workload') choices = ['layout1', 'layout2'] diff --git a/main.py b/main.py index 67c04f8..58e7147 100644 --- a/main.py +++ b/main.py @@ -25,7 +25,8 @@ from raps.flops import FLOPSManager from raps.plotting import Plotter from raps.power import PowerManager, compute_node_power, compute_node_power_validate from raps.power import compute_node_power_uncertainties, compute_node_power_validate_uncertainties -from raps.scheduler import Scheduler, Job +from raps.engine import Engine +from raps.job import Job from raps.telemetry import Telemetry from raps.workload import Workload from raps.weather import Weather @@ -61,13 +62,13 @@ else: args_dict['config'] = config flops_manager = FLOPSManager(**args_dict) -sc = Scheduler( +sc = Engine( power_manager=power_manager, flops_manager=flops_manager, cooling_model=cooling_model, **args_dict, ) -layout_manager = LayoutManager(args.layout, scheduler=sc, debug=args.debug, **config) +layout_manager = LayoutManager(args.layout, engine=sc, debug=args.debug, **config) if args.replay: diff --git a/multi-part-sim.py b/multi-part-sim.py index 3621837..342d463 100644 --- a/multi-part-sim.py +++ b/multi-part-sim.py @@ -8,9 +8,9 @@ import sys from args import args from raps.config import ConfigManager, CONFIG_PATH -from raps.policy import PolicyType +from raps.schedulers.default import PolicyType from raps.ui import LayoutManager -from raps.scheduler import Scheduler +from raps.engine import Engine from raps.flops import FLOPSManager from raps.power import PowerManager, compute_node_power from raps.telemetry import Telemetry @@ -74,8 +74,8 @@ layout_managers = {} for i, config in enumerate(configs): pm = PowerManager(compute_node_power, **configs[i]) fm = FLOPSManager(**args_dicts[i]) - sc = Scheduler(power_manager=pm, flops_manager=fm, cooling_model=None, **args_dicts[i]) - layout_managers[config['system_name']] = LayoutManager(args.layout, scheduler=sc, debug=args.debug, **config) + sc = Engine(power_manager=pm, flops_manager=fm, cooling_model=None, **args_dicts[i]) + layout_managers[config['system_name']] = LayoutManager(args.layout, engine=sc, debug=args.debug, **config) # Set simulation timesteps if args.time: @@ -96,9 +96,9 @@ for timestep in range(timesteps): if timestep % configs[0]['UI_UPDATE_FREQ'] == 0: # Assuming same frequency for all partitions sys_power = 0 for name, lm in layout_managers.items(): - sys_util = lm.scheduler.sys_util_history[-1] if lm.scheduler.sys_util_history else 0.0 - print(f"[DEBUG] {name} - Timestep {timestep} - Jobs running: {len(lm.scheduler.running)} - Utilization: {sys_util[1]:.2f}% - Power: {lm.scheduler.sys_power:.1f}kW") - sys_power += lm.scheduler.sys_power + sys_util = lm.engine.sys_util_history[-1] if lm.engine.sys_util_history else 0.0 + print(f"[DEBUG] {name} - Timestep {timestep} - Jobs running: {len(lm.engine.running)} - Utilization: {sys_util[1]:.2f}% - Power: {lm.engine.sys_power:.1f}kW") + sys_power += lm.engine.sys_power print(f"system power: {sys_power:.1f}kW") print("Simulation complete.") diff --git a/raps/engine.py b/raps/engine.py new file mode 100644 index 0000000..ba4014c --- /dev/null +++ b/raps/engine.py @@ -0,0 +1,272 @@ +from typing import Optional +import dataclasses +import numpy as np +import pandas as pd + +from .job import Job, JobState +from .account import Accounts +from .network import network_utilization +from .utils import summarize_ranges, expand_ranges, write_dict_to_file +from .schedulers import load_scheduler + + +@dataclasses.dataclass +class TickData: + """ Represents the state output from the simulation each tick """ + current_time: int + completed: list[Job] + running: list[Job] + queue: list[Job] + down_nodes: list[int] + power_df: Optional[pd.DataFrame] + p_flops: Optional[float] + g_flops_w: Optional[float] + system_util: float + fmu_inputs: Optional[dict] + fmu_outputs: Optional[dict] + num_active_nodes: int + num_free_nodes: int + + +class Engine: + """Job scheduling simulation engine.""" + def __init__(self, *, power_manager, flops_manager, cooling_model=None, config, **kwargs): + self.config = config + self.down_nodes = summarize_ranges(self.config['DOWN_NODES']) + self.available_nodes = list(set(range(self.config['TOTAL_NODES'])) - set(self.config['DOWN_NODES'])) + self.num_free_nodes = len(self.available_nodes) + self.num_active_nodes = self.config['TOTAL_NODES'] - self.num_free_nodes - len(self.config['DOWN_NODES']) + self.running = [] + self.queue = [] + self.accounts = Accounts() + if 'accounts_json' in kwargs and kwargs['accounts_json']: + self.accounts.initialize_accounts_from_json(kwargs.get('accounts_json')) + self.jobs_completed = 0 + self.current_time = 0 + self.cooling_model = cooling_model + self.sys_power = 0 + self.power_manager = power_manager + self.flops_manager = flops_manager + self.debug = kwargs.get('debug') + self.output = kwargs.get('output') + self.replay = kwargs.get('replay') + self.sys_util_history = [] + + # Get scheduler type from command-line args or default + scheduler_type = kwargs.get('scheduler', 'default') + self.scheduler = load_scheduler(scheduler_type)(config=self.config, policy=kwargs.get('policy')) + print(f"Using scheduler: {scheduler_type}") + + + def add_job(self, job): + self.queue.append(job) + self.queue = self.scheduler.sort_jobs(self.queue) + + + def tick(self): + """Simulate a timestep.""" + completed_jobs = [job for job in self.running if job.end_time is not None and job.end_time <= self.current_time] + completed_job_stats = [] + + # Simulate node failure + newly_downed_nodes = self.node_failure(self.config['MTBF']) + + # Update active/free nodes + self.num_free_nodes = len(self.available_nodes) + self.num_active_nodes = self.config['TOTAL_NODES'] - self.num_free_nodes - len(expand_ranges(self.down_nodes)) + + # Update running time for all running jobs + for job in self.running: + if job.end_time == self.current_time: + job.state = JobState.COMPLETED + + if job.state == JobState.RUNNING: + job.running_time = self.current_time - job.start_time + time_quanta_index = (self.current_time - job.start_time) // self.config['TRACE_QUANTA'] + cpu_util = self.get_utilization(job.cpu_trace, time_quanta_index) + gpu_util = self.get_utilization(job.gpu_trace, time_quanta_index) + net_util = 0 + + if len(job.ntx_trace) and len(job.nrx_trace): + net_tx = self.get_utilization(job.ntx_trace, time_quanta_index) + net_rx = self.get_utilization(job.nrx_trace, time_quanta_index) + net_util = network_utilization(net_tx, net_rx) + + self.flops_manager.update_flop_state(job.scheduled_nodes, cpu_util, gpu_util) + job.power = self.power_manager.update_power_state(job.scheduled_nodes, cpu_util, gpu_util, net_util) + + if job.running_time % self.config['TRACE_QUANTA'] == 0: + job.power_history.append(job.power) + + for job in completed_jobs: + self.running.remove(job) + self.jobs_completed += 1 + job_stats = job.statistics() + self.accounts.update_account_statistics(job_stats) + + # Ask scheduler to schedule any jobs waiting in queue + self.scheduler.schedule(self.queue, self.running, self.available_nodes, self.current_time) + + # Update the power array UI component + rack_power, rect_losses = self.power_manager.compute_rack_power() + sivoc_losses = self.power_manager.compute_sivoc_losses() + rack_loss = rect_losses + sivoc_losses + + # Update system utilization + system_util = self.num_active_nodes / self.config['AVAILABLE_NODES'] * 100 + self.sys_util_history.append((self.current_time, system_util)) + + # Render the updated layout + power_df = None #self.power_manager.get_power_df() if self.power_manager else pd.DataFrame() + cooling_inputs, cooling_outputs = None, None + + # Update power history every 15s + if self.current_time % self.config['POWER_UPDATE_FREQ'] == 0: + total_power_kw = sum(row[-1] for row in rack_power) + self.config['NUM_CDUS'] * self.config['POWER_CDU'] / 1000.0 + total_loss_kw = sum(row[-1] for row in rack_loss) + self.power_manager.history.append((self.current_time, total_power_kw)) + self.sys_power = total_power_kw + self.power_manager.loss_history.append((self.current_time, total_loss_kw)) + output_df = self.power_manager.get_power_df(rack_power, rack_loss) + pflops = self.flops_manager.get_system_performance() / 1E15 + gflop_per_watt = pflops * 1E6 / (total_power_kw * 1000) + else: + pflops, gflop_per_watt = None, None + + if self.current_time % self.config['POWER_UPDATE_FREQ'] == 0: + if self.cooling_model: + # Power for NUM_CDUS (25 for Frontier) + cdu_power = rack_power.T[-1] * 1000 + runtime_values = self.cooling_model.generate_runtime_values(cdu_power, self) + + # FMU inputs are N powers and the wetbulb temp + fmu_inputs = self.cooling_model.generate_fmu_inputs(runtime_values, + uncertainties=self.power_manager.uncertainties) + cooling_inputs, cooling_outputs = ( + self.cooling_model.step(self.current_time, fmu_inputs, self.config['POWER_UPDATE_FREQ']) + ) + + # Get a dataframe of the power data + power_df = self.power_manager.get_power_df(rack_power, rack_loss) + else: + # Get a dataframe of the power data + power_df = self.power_manager.get_power_df(rack_power, rack_loss) + + tick_data = TickData( + current_time=self.current_time, + completed=completed_jobs, + running=self.running, + queue=self.queue, + down_nodes=expand_ranges(self.down_nodes), + power_df=power_df, + p_flops=pflops, + g_flops_w=gflop_per_watt, + system_util=self.num_active_nodes / self.config['AVAILABLE_NODES'] * 100, + fmu_inputs=cooling_inputs, + fmu_outputs=cooling_outputs, + num_active_nodes=self.num_active_nodes, + num_free_nodes=self.num_free_nodes, + ) + + self.current_time += 1 + return tick_data + + + + def get_utilization(self, trace, time_quanta_index): + """Retrieve utilization value for a given trace at a specific time quanta index.""" + if isinstance(trace, (list, np.ndarray)): + return trace[time_quanta_index] + elif isinstance(trace, (int, float)): + return float(trace) + else: + raise TypeError(f"Invalid type for utilization: {type(trace)}.") + + + def run_simulation(self, jobs, timesteps): + """ Generator that yields after each simulation tick """ + last_submit_time = 0 + self.timesteps = timesteps + + for job_info in jobs: + job = Job(job_info, self.current_time) + self.add_job(job) + + for timestep in range(timesteps): + while self.current_time >= last_submit_time and jobs: + self.scheduler.schedule(self.queue, self.running, self.available_nodes, self.current_time) + + if jobs: + last_submit_time = job.submit_time + else: # No more jobs, set submit_time to infinity to avoid triggering again + last_submit_time = float('inf') + + yield self.tick() + + # Stop the simulation if no more jobs are running or in the queue + if not self.queue and not self.running and not self.replay: + print(f"[DEBUG] {self.config['system_name']} - Stopping simulation at time {self.current_time}") + break + if self.debug and timestep % self.config['UI_UPDATE_FREQ'] == 0: + print(".", end="", flush=True) + + + def get_stats(self): + """ Return output statistics """ + sum_values = lambda values: sum(x[1] for x in values) if values else 0 + min_value = lambda values: min(x[1] for x in values) if values else 0 + max_value = lambda values: max(x[1] for x in values) if values else 0 + num_samples = len(self.power_manager.history) if self.power_manager else 0 + + throughput = self.jobs_completed / self.timesteps * 3600 if self.timesteps else 0 # Jobs per hour + average_power_mw = sum_values(self.power_manager.history) / num_samples / 1000 if num_samples else 0 + average_loss_mw = sum_values(self.power_manager.loss_history) / num_samples / 1000 if num_samples else 0 + min_loss_mw = min_value(self.power_manager.loss_history) / 1000 if num_samples else 0 + max_loss_mw = max_value(self.power_manager.loss_history) / 1000 if num_samples else 0 + + loss_fraction = average_loss_mw / average_power_mw if average_power_mw else 0 + efficiency = 1 - loss_fraction if loss_fraction else 0 + total_energy_consumed = average_power_mw * self.timesteps / 3600 if self.timesteps else 0 # MW-hr + emissions = total_energy_consumed * 852.3 / 2204.6 / efficiency if efficiency else 0 + total_cost = total_energy_consumed * 1000 * self.config.get('POWER_COST', 0) # Total cost in dollars + + stats = { + 'num_samples': num_samples, + 'jobs completed': self.jobs_completed, + 'throughput': f'{throughput:.2f} jobs/hour', + 'jobs still running': [job.id for job in self.running], + 'jobs still in queue': [job.id for job in self.queue], + 'average power': f'{average_power_mw:.2f} MW', + 'min loss': f'{min_loss_mw:.2f} MW', + 'average loss': f'{average_loss_mw:.2f} MW', + 'max loss': f'{max_loss_mw:.2f} MW', + 'system power efficiency': f'{efficiency * 100:.2f}%', + 'total energy consumed': f'{total_energy_consumed:.2f} MW-hr', + 'carbon emissions': f'{emissions:.2f} metric tons CO2', + 'total cost': f'${total_cost:.2f}' + } + + return stats + + + def node_failure(self, mtbf): + """Simulate node failure using Weibull distribution.""" + from scipy.stats import weibull_min + shape_parameter = 1.5 + scale_parameter = mtbf * 3600 # Convert to seconds + + down_nodes = expand_ranges(self.down_nodes) + all_nodes = np.setdiff1d(np.arange(self.config['TOTAL_NODES']), np.array(down_nodes, dtype=int)) + + random_values = weibull_min.rvs(shape_parameter, scale=scale_parameter, size=all_nodes.size) + failure_threshold = 0.1 + failed_nodes_mask = random_values < failure_threshold + newly_downed_nodes = all_nodes[failed_nodes_mask] + + for node_index in newly_downed_nodes: + if node_index in self.available_nodes: + self.available_nodes.remove(node_index) + self.down_nodes.append(str(node_index)) + self.power_manager.set_idle(node_index) + + return newly_downed_nodes.tolist() diff --git a/raps/policy.py b/raps/policy.py deleted file mode 100644 index c7900db..0000000 --- a/raps/policy.py +++ /dev/null @@ -1,62 +0,0 @@ -from enum import Enum - -class PolicyType(Enum): - FCFS = 'fcfs' - BACKFILL = 'backfill' - PRIORITY = 'priority' - SJF = 'sjf' - #DEADLINE = 'deadline' # not yet supported - - -class Policy: - - def __init__(self, strategy): - self.strategy = PolicyType(strategy) - - def sort_jobs(self, jobs): - if self.strategy == PolicyType.FCFS or self.strategy == PolicyType.BACKFILL: - return sorted(jobs, key=lambda job: job.submit_time) - elif self.strategy == PolicyType.SJF: - return sorted(jobs, key=lambda job: job.wall_time) - elif self.strategy == PolicyType.PRIORITY: - return sorted(jobs, key=lambda job: job.priority, reverse=True) - else: - raise ValueError(f"Unknown policy type: {self.policy_type}") - - def find_backfill_job(self, queue, num_free_nodes, current_time): - """ This implementation is based on pseudocode from Leonenkov and Zhumatiy. - "Introducing new backfill-based scheduler for slurm resource manager." - Procedia computer science 66 (2015): 661-669. """ - - first_job = queue[0] - - for job in queue: job.end_time = current_time + job.wall_time - - # Sort jobs according to their termination time (end_time) - sorted_queue = sorted(queue, key=lambda job: job.end_time) - - # Compute shadow time - loop over the list and collect nodes until the - # number of available nodes is sufficient for the first job in the queue - sum_nodes = 0 - shadow_time = None - for job in sorted_queue: - sum_nodes += job.nodes_required - if sum_nodes >= first_job.nodes_required: - shadow_time = current_time + job.wall_time - num_extra_nodes = sum_nodes - job.nodes_required - break - - # Find backfill job - backfill_job = None - for job in queue: - # condition1 checks that the job ends before first_job starts - condition1 = job.nodes_required <= num_free_nodes \ - and current_time + job.wall_time < shadow_time - # condition2 checks that the job does not interfere with first_job - condition2 = job.nodes_required <= min(num_free_nodes, num_extra_nodes) - - if condition1 or condition2: - backfill_job = job - break - - return backfill_job diff --git a/raps/scheduler.py b/raps/scheduler.py deleted file mode 100644 index 721d6e8..0000000 --- a/raps/scheduler.py +++ /dev/null @@ -1,460 +0,0 @@ -"""A module for job scheduling and simulation in a distributed computing environment. - -This module provides classes and functions for managing job scheduling and simulating the behavior -of a distributed computing system. It includes functionalities for scheduling jobs based on various -policies, simulating the passage of time, handling node failures, and generating statistics about -the simulation. - -Classes: -- JobState: An enumeration representing the states of a job. -- Job: A class representing a job to be scheduled and executed. -- TickData: A dataclass representing the state output from the simulation each tick. -- Scheduler: A class for job scheduling and simulation management. - -Functions: -- summarize_ranges: A utility function to summarize ranges of values. -- expand_ranges: A utility function to expand ranges of values into individual elements. - -Dependencies: -- numpy: For numerical computations and array manipulations. -- dataclasses: For creating data classes with less boilerplate code. -- pandas: For data manipulation and DataFrame generation. -- enum: For creating enumerations. -- scipy.stats: For statistical distributions and random number generation. -- typing: For type hints and annotations. - -Config parameters used: -- TRACE_QUANTA: The quantum of time for tracing job CPU and GPU utilization. -- MTBF: Mean Time Between Failures, used for node failure simulation. -- POWER_COST: Cost of power consumption per unit, used for calculating total cost. -- UI_UPDATE_FREQ: Frequency of updating the user interface. -- MAX_TIME: Maximum simulation time. -- POWER_UPDATE_FREQ: Frequency of updating power-related metrics. -- POWER_DF_HEADER: Header for the power related components of DataFrame. -- POWER_CDU: Power consumption of CDU. -- TOTAL_NODES: Total number of nodes in the system. -- COOLING_EFFICIENCY: Cooling efficiency factor. - -This module can be used to simulate job scheduling algorithms, analyze system behavior, and -optimize resource utilization in distributed computing environments. -""" -from typing import Optional -import dataclasses -import numpy as np - -from scipy.stats import weibull_min -import pandas as pd - -from .job import Job, JobState -from .account import Accounts -from .network import network_utilization -from .policy import Policy, PolicyType -from .utils import summarize_ranges, expand_ranges, write_dict_to_file - - -@dataclasses.dataclass -class TickData: - """ Represents the state output from the simulation each tick """ - current_time: int - completed: list[Job] - running: list[Job] - queue: list[Job] - down_nodes: list[int] - power_df: Optional[pd.DataFrame] - p_flops: Optional[float] - g_flops_w: Optional[float] - system_util: float - fmu_inputs: Optional[dict] - fmu_outputs: Optional[dict] - num_active_nodes: int - num_free_nodes: int - - -def get_utilization(trace, time_quanta_index): - if isinstance(trace, (list, np.ndarray)): - return trace[time_quanta_index] - elif isinstance(trace, (int, float)): - return float(trace) - else: - raise TypeError(f"Invalid type for utilization: {type(trace)}.") - - -class Scheduler: - """Job scheduler and simulation manager.""" - def __init__(self, *, power_manager, flops_manager, cooling_model=None, config, **kwargs): - self.config = config - self.down_nodes = summarize_ranges(self.config['DOWN_NODES']) - self.available_nodes = list(set(range(self.config['TOTAL_NODES'])) - set(self.config['DOWN_NODES'])) - self.num_free_nodes = len(self.available_nodes) - self.num_active_nodes = self.config['TOTAL_NODES'] - self.num_free_nodes - len(self.config['DOWN_NODES']) - self.running = [] - self.queue = [] - self.accounts = Accounts() - if 'accounts_json' in kwargs and kwargs['accounts_json']: - self.accounts.initialize_accounts_from_json(kwargs.get('accounts_json')) - self.jobs_completed = 0 - self.current_time = 0 - self.cooling_model = cooling_model - self.sys_power = 0 - self.power_manager = power_manager - self.flops_manager = flops_manager - self.debug = kwargs.get('debug') - self.output = kwargs.get('output') - self.replay = kwargs.get('replay') - self.policy = Policy(strategy=kwargs.get('schedule')) - self.sys_util_history = [] - self.history = [] - - - def add_job(self, job): - self.queue.append(job) - self.queue = self.policy.sort_jobs(self.queue) - - - def assign_nodes_to_job(self, job): - """Helper function to assign nodes to a job and update available nodes.""" - if len(self.available_nodes) < job.nodes_required: - # If there are not enough nodes, return or raise an error (handle as needed) - raise ValueError(f"Not enough available nodes to schedule job {job.id}") - - if job.requested_nodes: # replay case - # If the job has requested specific nodes, assign them - job.scheduled_nodes = job.requested_nodes - mask = ~np.isin(self.available_nodes, job.scheduled_nodes) - self.available_nodes = np.array(self.available_nodes) - self.available_nodes = self.available_nodes[mask] - self.available_nodes = self.available_nodes.tolist() - else: # synthetic or reschedule case - # Assign the nodes from available pool - job.scheduled_nodes = self.available_nodes[:job.nodes_required] - self.available_nodes = self.available_nodes[job.nodes_required:] - - # Set job start and end times - job.start_time = self.current_time - job.end_time = self.current_time + job.wall_time - - # Mark the job as running - job.state = JobState.RUNNING - self.running.append(job) - - - def schedule(self, jobs): - """Schedule jobs""" - for job_info in jobs: - job = Job(job_info, self.current_time) - self.add_job(job) - - while self.queue: - - # Try scheduling the first job in the queue - job = self.queue.pop(0) - synthetic_bool = len(self.available_nodes) >= job.nodes_required - telemetry_bool = job.requested_nodes and set(job.requested_nodes).issubset(set(self.available_nodes)) - - if synthetic_bool or telemetry_bool: - - # Schedule job - self.assign_nodes_to_job(job) - self.history.append(dict(id=job.id, time=self.current_time, nodes=job.nodes_required, wall_time=job.wall_time)) - - if self.debug: - scheduled_nodes = summarize_ranges(job.scheduled_nodes) - print(f"t={self.current_time}: Scheduled job with wall time", - f"{job.wall_time} on nodes {scheduled_nodes}") - - else: - # If the job cannot be scheduled, either try backfilling or requeue it - if self.queue and self.policy.strategy == PolicyType.BACKFILL: - self.queue.insert(0, job) - backfill_job = self.policy.find_backfill_job(self.queue, len(self.available_nodes), self.current_time) - if backfill_job: - self.assign_nodes_to_job(backfill_job) - self.queue.remove(backfill_job) - if self.debug: - scheduled_nodes = summarize_ranges(backfill_job.scheduled_nodes) - print(f"t={self.current_time}: Backfilling job {backfill_job.id} with wall time", - f"{backfill_job.wall_time} on nodes {scheduled_nodes}") - else: - self.queue.append(job) # Note, this should be fixed. It shouldn't go to the end of the queue. - break - - def tick(self): - """Simulate a timestep.""" - completed_jobs = [job for job in self.running if job.end_time - is not None and job.end_time <= self.current_time] - completed_job_stats = [] - # Simulate node failure - newly_downed_nodes = self.node_failure(self.config['MTBF']) - - # Update active/free nodes - self.num_free_nodes = len(self.available_nodes) - self.num_active_nodes = self.config['TOTAL_NODES'] - self.num_free_nodes \ - - len(expand_ranges(self.down_nodes)) - - # Update running time for all running jobs - for job in self.running: - - if job.end_time == self.current_time: - job.state = JobState.COMPLETED - - if job.state == JobState.RUNNING: - # Deal with node that fails during the course of a running job - #if any(node in job.scheduled_nodes for node in newly_downed_nodes): - if False: # currently disabled b/c not working correctly - - # Update job state to FAILED - job.state = JobState.FAILED - - # Release all nodes except the downed node - for node in job.scheduled_nodes: - if node not in newly_downed_nodes: - self.available_nodes.append(node) - self.available_nodes.sort() - - # Remove job from the list of running jobs - self.running.remove(job) - - job.running_time = self.current_time - job.start_time - - time_quanta_index = (self.current_time - job.start_time) \ - // self.config['TRACE_QUANTA'] - - cpu_util = get_utilization(job.cpu_trace, time_quanta_index) - gpu_util = get_utilization(job.gpu_trace, time_quanta_index) - - if len(job.ntx_trace) and len(job.nrx_trace): - net_tx = get_utilization(job.ntx_trace, time_quanta_index) - net_rx = get_utilization(job.nrx_trace, time_quanta_index) - net_util = network_utilization(net_tx, net_rx) - else: - net_util = 0 - - self.flops_manager.update_flop_state(job.scheduled_nodes, cpu_util, gpu_util) - job.power = self.power_manager.update_power_state(job.scheduled_nodes, - cpu_util, gpu_util, net_util) - - if job.running_time % self.config['TRACE_QUANTA'] == 0: - job.power_history.append(job.power) - - for job in completed_jobs: - # Release the nodes used by this job - self.available_nodes.extend(job.scheduled_nodes) - self.available_nodes.sort() - - if self.debug: - print( - f"\nt={self.current_time}: " - f"Releasing {len(job.scheduled_nodes)} nodes from completed job; " - f"{len(self.available_nodes)} nodes available after release." - ) - - # Set nodes back to idle power - node_indices = np.array(job.scheduled_nodes) - if self.debug: - print("setting idle nodes:", node_indices) - self.power_manager.set_idle(node_indices) - self.flops_manager.update_flop_state(job.scheduled_nodes, \ - cpu_util=0, gpu_util=0) - - # Remove job from list of running jobs - self.running.remove(job) - scheduled_nodes = summarize_ranges(job.scheduled_nodes) - - if self.debug: - print(f"Released {scheduled_nodes}") - self.jobs_completed += 1 - job_stats = job.statistics() - if self.debug: - print(job_stats) - completed_job_stats.append(job_stats) - self.accounts.update_account_statistics(job_stats) - if self.output: - # output power trace - with open(self.opath / f'job-power-{job.id}.txt', 'w') as file: - print(*job.power_history, sep=', ', file=file) - write_dict_to_file(vars(job_stats),self.opath / f'job-stats-{job.account}.json') - # Ask scheduler to schedule any jobs waiting in queue - self.schedule([]) - - # Update the power array UI component - rack_power, rect_losses = self.power_manager.compute_rack_power() - sivoc_losses = self.power_manager.compute_sivoc_losses() - rack_loss = rect_losses + sivoc_losses - - # Update system utilization - system_util = self.num_active_nodes / self.config['AVAILABLE_NODES'] * 100 - self.sys_util_history.append((self.current_time, system_util)) - - # Render the updated layout - power_df = None - cooling_inputs, cooling_outputs = None, None - - # Update power history every 15s - if self.current_time % self.config['POWER_UPDATE_FREQ'] == 0: - total_power_kw = sum(row[-1] for row in rack_power) + self.config['NUM_CDUS'] * self.config['POWER_CDU'] / 1000.0 - total_loss_kw = sum(row[-1] for row in rack_loss) - self.power_manager.history.append((self.current_time, total_power_kw)) - self.sys_power = total_power_kw - self.power_manager.loss_history.append((self.current_time, total_loss_kw)) - output_df = self.power_manager.get_power_df(rack_power, rack_loss) - pflops = self.flops_manager.get_system_performance() / 1E15 - gflop_per_watt = pflops * 1E6 / (total_power_kw * 1000) - else: - pflops, gflop_per_watt = None, None - - if self.current_time % self.config['POWER_UPDATE_FREQ'] == 0: - if self.cooling_model: - # Power for NUM_CDUS (25 for Frontier) - cdu_power = rack_power.T[-1] * 1000 - runtime_values = self.cooling_model.generate_runtime_values(cdu_power, self) - - # FMU inputs are N powers and the wetbulb temp - fmu_inputs = self.cooling_model.generate_fmu_inputs(runtime_values, - uncertainties=self.power_manager.uncertainties) - cooling_inputs, cooling_outputs = ( - self.cooling_model.step(self.current_time, fmu_inputs, self.config['POWER_UPDATE_FREQ']) - ) - - # Get a dataframe of the power data - power_df = self.power_manager.get_power_df(rack_power, rack_loss) - else: - # Get a dataframe of the power data - power_df = self.power_manager.get_power_df(rack_power, rack_loss) - - tick_data = TickData( - current_time = self.current_time, - completed = completed_jobs, - running = self.running, - queue = self.queue, - down_nodes = expand_ranges(self.down_nodes), - power_df = power_df, - p_flops = pflops, - g_flops_w = gflop_per_watt, - system_util = system_util, - fmu_inputs = cooling_inputs, - fmu_outputs = cooling_outputs, - num_active_nodes = self.num_active_nodes, - num_free_nodes = self.num_free_nodes, - ) - - self.current_time += 1 - return tick_data - - def get_gauge_limits(self): - """For setting max values in dashboard gauges""" - peak_flops = self.flops_manager.get_rpeak() - peak_power = self.power_manager.get_peak_power() - gflops_per_watt_max = peak_flops / 1E9 / peak_power - - if self.debug: - print(f"System Rpeak: {peak_flops/1E15:.2f} PFLOPS") - print(f"Peak power: {peak_power/1E3:.0f} kW") - print(f"Max energy efficiency: {gflops_per_watt_max:.1f} GFLOPS/W") - - limits = {'peak_flops': peak_flops, 'peak_power': peak_power, \ - 'g_flops_w_peak': gflops_per_watt_max} - return limits - - def run_simulation(self, jobs, timesteps): - """ Generator that yields after each simulation tick """ - last_submit_time = 0 - self.timesteps = timesteps - if self.debug: - limits = self.get_gauge_limits() - - for timestep in range(timesteps): - # Print the current timestep for this partition - if timestep % self.config['UI_UPDATE_FREQ'] == 0: - sys_util = self.sys_util_history[-1][1] if self.sys_util_history else 0 - - while self.current_time >= last_submit_time and jobs: - job = jobs.pop(0) - self.schedule([job]) - if jobs: - last_submit_time = job['submit_time'] - else: # No more jobs, set to infinity to avoid triggering again - last_submit_time = float('inf') - yield self.tick() - - # Stop the simulation if no more jobs running or are in the queue - if not self.queue and not self.running and not self.replay: - print(f"[DEBUG] {self.config['system_name']} - Stopping simulation at time {self.current_time}") - break - if self.debug and timestep % self.config['UI_UPDATE_FREQ'] == 0: - print(".", end="", flush=True) - - def get_history(self): - return self.history - - def get_stats(self): - """ Return output statistics """ - sum_values = lambda values : sum(x[1] for x in values) - min_value = lambda values : min(x[1] for x in values) - max_value = lambda values : max(x[1] for x in values) - num_samples = len(self.power_manager.history) - throughput = self.jobs_completed / self.timesteps * 3600 # jobs/hour - average_power_mw = sum_values(self.power_manager.history) / num_samples / 1000 - average_loss_mw = sum_values(self.power_manager.loss_history) / num_samples / 1000 - min_loss_mw = min_value(self.power_manager.loss_history) / 1000 - max_loss_mw = max_value(self.power_manager.loss_history) / 1000 - self.power_manager.loss_history_percentage = \ - [(x[0], x[1] / y[1]) for x, y in zip(self.power_manager.loss_history, \ - self.power_manager.history)] - min_loss_pct = min_value(self.power_manager.loss_history_percentage) - max_loss_pct = max_value(self.power_manager.loss_history_percentage) - - loss_fraction = average_loss_mw / average_power_mw - efficiency = 1 - loss_fraction - # compute total power consumed by multiplying average power times length of simulation - total_energy_consumed = average_power_mw * self.timesteps / 3600 # MW-hr - # From https://www.epa.gov/energy/greenhouse-gases-equivalencies-\ - # calculator-calculations-and-references - emissions = total_energy_consumed * 852.3 / 2204.6 / efficiency - total_cost = total_energy_consumed * 1000 * self.config['POWER_COST'] # total cost in dollars - - stats = { - 'num_samples': num_samples, - 'jobs completed': self.jobs_completed, - 'throughput': f'{throughput:.2f} jobs/hour', - 'jobs still running': [job.id for job in self.running], - 'jobs still in queue': [job.id for job in self.queue], - 'average power': f'{average_power_mw:.2f} MW', - 'min loss': f'{min_loss_mw:.2f} MW ({min_loss_pct*100:.2f}%)', - 'average loss': f'{average_loss_mw:.2f} MW ({loss_fraction*100:.2f}%)', - 'max loss': f'{max_loss_mw:.2f} MW ({max_loss_pct*100:.2f}%)', - 'system power efficiency': f'{efficiency*100:.2f}', - 'total energy consumed': f'{total_energy_consumed:.2f} MW-hr', - 'carbon emissions': f'{emissions:.2f} metric tons CO2', - 'total cost': f'${total_cost:.2f}' - } - - return stats - - def node_failure(self, mtbf): - """Simulate node failure.""" - shape_parameter = 1.5 - scale_parameter = mtbf * 3600 # to seconds - - # Create a NumPy array of node indices, excluding down nodes - down_nodes = expand_ranges(self.down_nodes) - all_nodes = np.setdiff1d(np.arange(self.config['TOTAL_NODES']), - np.array(down_nodes, dtype=int)) - - # Sample the Weibull distribution for all nodes at once - random_values = weibull_min.rvs(shape_parameter, - scale=scale_parameter, - size=all_nodes.size) - - # Identify nodes that have failed - failure_threshold = 0.1 - failed_nodes_mask = random_values < failure_threshold - newly_downed_nodes = all_nodes[failed_nodes_mask] - - # Update available and down nodes - for node_index in newly_downed_nodes: - if node_index in self.available_nodes: - self.available_nodes.remove(node_index) - self.down_nodes.append(str(node_index)) - self.power_manager.set_idle(node_index) - - return newly_downed_nodes.tolist() diff --git a/raps/telemetry.py b/raps/telemetry.py index 9c006a2..9aa72bc 100644 --- a/raps/telemetry.py +++ b/raps/telemetry.py @@ -28,7 +28,7 @@ from datetime import datetime from tqdm import tqdm from .config import ConfigManager -from .scheduler import Job +from .job import Job from .plotting import plot_submit_times, plot_nodes_histogram from .utils import next_arrival diff --git a/raps/ui.py b/raps/ui.py index dd7725f..fc83e6b 100644 --- a/raps/ui.py +++ b/raps/ui.py @@ -7,12 +7,12 @@ from rich.panel import Panel from rich.table import Table from .utils import summarize_ranges, convert_seconds from .constants import ELLIPSES -from .scheduler import TickData, Scheduler +from .engine import TickData, Engine class LayoutManager: - def __init__(self, layout_type, scheduler: Scheduler, debug, **config): - self.scheduler = scheduler + def __init__(self, layout_type, engine: Engine, debug, **config): + self.engine = engine self.config = config self.console = Console() self.layout = Layout() @@ -375,9 +375,9 @@ class LayoutManager: self.layout["lower"].update(Panel(Align(total_table, align="center"), title="Power and Performance")) def update(self, data: TickData): - uncertainties = self.scheduler.power_manager.uncertainties + uncertainties = self.engine.power_manager.uncertainties - if self.scheduler.cooling_model: + if self.engine.cooling_model: self.update_powertemp_array( data.power_df, data.fmu_outputs, data.p_flops, data.g_flops_w, data.system_util, uncertainties = uncertainties, @@ -401,11 +401,11 @@ class LayoutManager: def run(self, jobs, timesteps): """ Runs the UI, blocking until the simulation is complete """ - for data in self.scheduler.run_simulation(jobs, timesteps): + for data in self.engine.run_simulation(jobs, timesteps): if data.current_time % self.config['UI_UPDATE_FREQ'] == 0: self.update(data) self.render() def run_stepwise(self, jobs, timesteps): """ Prepares the UI and returns a generator for the simulation """ - return self.scheduler.run_simulation(jobs, timesteps) + return self.engine.run_simulation(jobs, timesteps) -- GitLab From a8d186eb298b6dec137673d983a848ee62ab4e3f Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Tue, 4 Feb 2025 23:27:03 -0500 Subject: [PATCH 02/15] Add schedulers directory. Change DefaultScheduler class to just Scheduler --- raps/schedulers/__init__.py | 6 ++ raps/schedulers/default.py | 149 ++++++++++++++++++++++++++++++++++++ 2 files changed, 155 insertions(+) create mode 100644 raps/schedulers/__init__.py create mode 100644 raps/schedulers/default.py diff --git a/raps/schedulers/__init__.py b/raps/schedulers/__init__.py new file mode 100644 index 0000000..ca3431e --- /dev/null +++ b/raps/schedulers/__init__.py @@ -0,0 +1,6 @@ +from importlib import import_module + +def load_scheduler(scheduler_type="default"): + """Dynamically loads a scheduler by type.""" + module = import_module(f".{scheduler_type}", package="raps.schedulers") + return getattr(module, f"Scheduler") diff --git a/raps/schedulers/default.py b/raps/schedulers/default.py new file mode 100644 index 0000000..a661be8 --- /dev/null +++ b/raps/schedulers/default.py @@ -0,0 +1,149 @@ +from enum import Enum +from ..job import Job, JobState +from ..utils import summarize_ranges + + +class PolicyType(Enum): + """Supported scheduling policies.""" + FCFS = 'fcfs' + BACKFILL = 'backfill' + PRIORITY = 'priority' + SJF = 'sjf' + + +class Scheduler: + """ Default job scheduler with various scheduling policies. """ + + + def __init__(self, config, policy): + self.config = config + self.policy = PolicyType(policy) + + + def sort_jobs(self, queue): + """Sort jobs based on the selected scheduling policy.""" + if self.policy == PolicyType.FCFS or self.policy == PolicyType.BACKFILL: + return sorted(queue, key=lambda job: job.submit_time) + elif self.policy == PolicyType.SJF: + return sorted(queue, key=lambda job: job.wall_time) + elif self.policy == PolicyType.PRIORITY: + return sorted(queue, key=lambda job: job.priority, reverse=True) + else: + raise ValueError(f"Unknown policy type: {self.policy}") + + + def assign_nodes_to_job(self, job, available_nodes, current_time): + """Assigns nodes to a job and updates available nodes.""" + if len(available_nodes) < job.nodes_required: + raise ValueError(f"Not enough available nodes to schedule job {job.id}") + + if job.requested_nodes: # Telemetry replay case + job.scheduled_nodes = job.requested_nodes + available_nodes[:] = [n for n in available_nodes if n not in job.scheduled_nodes] + else: # Synthetic or reschedule case + job.scheduled_nodes = available_nodes[:job.nodes_required] + available_nodes[:] = available_nodes[job.nodes_required:] + + # Set job start and end times + job.start_time = current_time + job.end_time = current_time + job.wall_time + job.state = JobState.RUNNING # Job is now running + + + def schedule(self, queue, running, available_nodes, current_time): + """Schedules jobs from the queue to available nodes.""" + while queue: + job = queue.pop(0) + if len(available_nodes) >= job.nodes_required: + job.scheduled_nodes = available_nodes[:job.nodes_required] + available_nodes[:] = available_nodes[job.nodes_required:] + job.start_time = current_time + job.end_time = current_time + job.wall_time + job.state = JobState.RUNNING + running.append(job) + #print(f"t={current_time}: Scheduled job {job.id} on nodes {summarize_ranges(job.scheduled_nodes)}") + else: + queue.insert(0, job) # Keep the job at the front if it can't be scheduled + break + + + def schedule2(self, queue, running, available_nodes, current_time, debug=False): + """Schedules jobs from the queue to available nodes.""" + queue = self.sort_jobs(queue) # Ensure queue is sorted before scheduling + + while queue: + + # Try scheduling the first job in the queue + job = queue.pop(0) + synthetic_bool = len(available_nodes) >= job.nodes_required + telemetry_bool = job.requested_nodes and set(job.requested_nodes).issubset(set(available_nodes)) + + if synthetic_bool or telemetry_bool: + + # Schedule job + self.assign_nodes_to_job(job, available_nodes, current_time) + running.append(job) + #self.history.append(dict(id=job.id, time=current_time, nodes=job.nodes_required, wall_time=job.wall_time)) + + if debug: + scheduled_nodes = summarize_ranges(job.scheduled_nodes) + print(f"t={current_time}: Scheduled job with wall time", + f"{job.wall_time} on nodes {scheduled_nodes}") + + else: + # If the job cannot be scheduled, either try backfilling or requeue it + if queue and self.policy == PolicyType.BACKFILL: + queue.insert(0, job) + backfill_job = self.find_backfill_job(queue, len(available_nodes), current_time) + if backfill_job: + self.assign_nodes_to_job(backfill_job) + self.queue.remove(backfill_job) + if self.debug: + scheduled_nodes = summarize_ranges(backfill_job.scheduled_nodes) + print(f"t={self.current_time}: Backfilling job {backfill_job.id} with wall time", + f"{backfill_job.wall_time} on nodes {scheduled_nodes}") + else: + queue.append(job) # Note, this should be fixed. It shouldn't go to the end of the queue. + break + + + + def find_backfill_job(self, queue, num_free_nodes, current_time): + """Finds a backfill job based on available nodes and estimated completion times. + + Based on pseudocode from Leonenkov and Zhumatiy, 'Introducing new backfill-based + scheduler for slurm resource manager.' Procedia computer science 66 (2015): 661-669. + """ + + if not queue: + return None + + first_job = queue[0] + + for job in queue: + job.end_time = current_time + job.wall_time # Estimate end time + + # Sort jobs according to their termination time (end_time) + sorted_queue = sorted(queue, key=lambda job: job.end_time) + + # Compute shadow time by accumulating nodes + sum_nodes = 0 + shadow_time = None + num_extra_nodes = 0 + + for job in sorted_queue: + sum_nodes += job.nodes_required + if sum_nodes >= first_job.nodes_required: + shadow_time = current_time + job.wall_time + num_extra_nodes = sum_nodes - job.nodes_required + break + + # Find backfill job + for job in queue: + condition1 = job.nodes_required <= num_free_nodes and current_time + job.wall_time < shadow_time + condition2 = job.nodes_required <= min(num_free_nodes, num_extra_nodes) + + if condition1 or condition2: + return job + + return None -- GitLab From bd90a5582237e11ce38a5180b159d4d258a4fbc5 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Wed, 5 Feb 2025 11:59:59 -0500 Subject: [PATCH 03/15] Fix issue with `python main.py -w benchmark` hanging --- raps/engine.py | 9 ++++++--- raps/schedulers/default.py | 15 ++++++++------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index ba4014c..43ce74f 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -194,12 +194,15 @@ class Engine: for timestep in range(timesteps): while self.current_time >= last_submit_time and jobs: - self.scheduler.schedule(self.queue, self.running, self.available_nodes, self.current_time) + + job = jobs.pop(0) + job = Job(job_info, self.current_time) + self.scheduler.schedule([job], self.running, self.available_nodes, self.current_time) if jobs: last_submit_time = job.submit_time - else: # No more jobs, set submit_time to infinity to avoid triggering again - last_submit_time = float('inf') + else: + last_submit_time = float('inf') # Avoid infinite loop yield self.tick() diff --git a/raps/schedulers/default.py b/raps/schedulers/default.py index a661be8..3e92699 100644 --- a/raps/schedulers/default.py +++ b/raps/schedulers/default.py @@ -50,10 +50,12 @@ class Scheduler: job.state = JobState.RUNNING # Job is now running - def schedule(self, queue, running, available_nodes, current_time): - """Schedules jobs from the queue to available nodes.""" - while queue: - job = queue.pop(0) + def schedule(self, job_list, running, available_nodes, current_time): + """Schedules jobs from the given job_list directly, modifying available_nodes.""" + + while job_list: + job = job_list.pop(0) + if len(available_nodes) >= job.nodes_required: job.scheduled_nodes = available_nodes[:job.nodes_required] available_nodes[:] = available_nodes[job.nodes_required:] @@ -61,10 +63,9 @@ class Scheduler: job.end_time = current_time + job.wall_time job.state = JobState.RUNNING running.append(job) - #print(f"t={current_time}: Scheduled job {job.id} on nodes {summarize_ranges(job.scheduled_nodes)}") else: - queue.insert(0, job) # Keep the job at the front if it can't be scheduled - break + job_list.insert(0, job) # Put job back at the front if it can't be scheduled + break # Stop scheduling if no nodes are available def schedule2(self, queue, running, available_nodes, current_time, debug=False): -- GitLab From 185d3c8e9101fa229d77699797f01ff53da24d5f Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Wed, 5 Feb 2025 14:33:14 -0500 Subject: [PATCH 04/15] Fix scheduling bug --- raps/engine.py | 20 +++++++++++--------- raps/schedulers/default.py | 35 ++++++++++++++++++++--------------- 2 files changed, 31 insertions(+), 24 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index 43ce74f..1d976ed 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -103,6 +103,9 @@ class Engine: self.jobs_completed += 1 job_stats = job.statistics() self.accounts.update_account_statistics(job_stats) + # Free nodes and ensure there are no duplicates + self.available_nodes.extend(job.scheduled_nodes) + self.available_nodes = sorted(set(self.available_nodes)) # Ask scheduler to schedule any jobs waiting in queue self.scheduler.schedule(self.queue, self.running, self.available_nodes, self.current_time) @@ -172,7 +175,6 @@ class Engine: return tick_data - def get_utilization(self, trace, time_quanta_index): """Retrieve utilization value for a given trace at a specific time quanta index.""" if isinstance(trace, (list, np.ndarray)): @@ -193,16 +195,16 @@ class Engine: self.add_job(job) for timestep in range(timesteps): - while self.current_time >= last_submit_time and jobs: + #while self.current_time >= last_submit_time and jobs: - job = jobs.pop(0) - job = Job(job_info, self.current_time) - self.scheduler.schedule([job], self.running, self.available_nodes, self.current_time) + #job = self.queue.pop(0) + #self.scheduler.schedule([job], self.running, self.available_nodes, self.current_time) + self.scheduler.schedule(self.queue, self.running, self.available_nodes, self.current_time) - if jobs: - last_submit_time = job.submit_time - else: - last_submit_time = float('inf') # Avoid infinite loop + #if jobs: + # last_submit_time = job.submit_time + #else: + # last_submit_time = float('inf') # Avoid infinite loop yield self.tick() diff --git a/raps/schedulers/default.py b/raps/schedulers/default.py index 3e92699..2e5506b 100644 --- a/raps/schedulers/default.py +++ b/raps/schedulers/default.py @@ -50,27 +50,32 @@ class Scheduler: job.state = JobState.RUNNING # Job is now running - def schedule(self, job_list, running, available_nodes, current_time): - """Schedules jobs from the given job_list directly, modifying available_nodes.""" + def schedule(self, queue, running, available_nodes, current_time, debug=False): + # Sort the queue in place. + queue[:] = self.sort_jobs(queue) - while job_list: - job = job_list.pop(0) - - if len(available_nodes) >= job.nodes_required: - job.scheduled_nodes = available_nodes[:job.nodes_required] - available_nodes[:] = available_nodes[job.nodes_required:] - job.start_time = current_time - job.end_time = current_time + job.wall_time - job.state = JobState.RUNNING + # Iterate over a copy of the queue since we might remove items + for job in queue[:]: + synthetic_bool = len(available_nodes) >= job.nodes_required + telemetry_bool = job.requested_nodes and set(job.requested_nodes).issubset(set(available_nodes)) + + if synthetic_bool or telemetry_bool: + self.assign_nodes_to_job(job, available_nodes, current_time) running.append(job) + queue.remove(job) # Remove the job from the queue + if debug: + scheduled_nodes = summarize_ranges(job.scheduled_nodes) + print(f"t={current_time}: Scheduled job with wall time {job.wall_time} on nodes {scheduled_nodes}") else: - job_list.insert(0, job) # Put job back at the front if it can't be scheduled - break # Stop scheduling if no nodes are available + # Optionally, if you have a BACKFILL policy, you can attempt that here. + # Otherwise, just leave the job in the queue. + continue + def schedule2(self, queue, running, available_nodes, current_time, debug=False): """Schedules jobs from the queue to available nodes.""" - queue = self.sort_jobs(queue) # Ensure queue is sorted before scheduling + queue[:] = self.sort_jobs(queue) # Ensure queue is sorted before scheduling while queue: @@ -97,7 +102,7 @@ class Scheduler: queue.insert(0, job) backfill_job = self.find_backfill_job(queue, len(available_nodes), current_time) if backfill_job: - self.assign_nodes_to_job(backfill_job) + self.assign_nodes_to_job(backfill_job, available_nodes, current_time) self.queue.remove(backfill_job) if self.debug: scheduled_nodes = summarize_ranges(backfill_job.scheduled_nodes) -- GitLab From 1672500277479ecfd2690ed5a5f7382849de9e5f Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Wed, 5 Feb 2025 14:44:09 -0500 Subject: [PATCH 05/15] More cleanup from previous fixes --- raps/engine.py | 11 +------- raps/schedulers/default.py | 52 +++++++------------------------------- 2 files changed, 10 insertions(+), 53 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index 1d976ed..bbc9d47 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -195,19 +195,9 @@ class Engine: self.add_job(job) for timestep in range(timesteps): - #while self.current_time >= last_submit_time and jobs: - #job = self.queue.pop(0) - #self.scheduler.schedule([job], self.running, self.available_nodes, self.current_time) self.scheduler.schedule(self.queue, self.running, self.available_nodes, self.current_time) - #if jobs: - # last_submit_time = job.submit_time - #else: - # last_submit_time = float('inf') # Avoid infinite loop - - yield self.tick() - # Stop the simulation if no more jobs are running or in the queue if not self.queue and not self.running and not self.replay: print(f"[DEBUG] {self.config['system_name']} - Stopping simulation at time {self.current_time}") @@ -215,6 +205,7 @@ class Engine: if self.debug and timestep % self.config['UI_UPDATE_FREQ'] == 0: print(".", end="", flush=True) + yield self.tick() def get_stats(self): """ Return output statistics """ diff --git a/raps/schedulers/default.py b/raps/schedulers/default.py index 2e5506b..46dadd9 100644 --- a/raps/schedulers/default.py +++ b/raps/schedulers/default.py @@ -53,7 +53,7 @@ class Scheduler: def schedule(self, queue, running, available_nodes, current_time, debug=False): # Sort the queue in place. queue[:] = self.sort_jobs(queue) - + # Iterate over a copy of the queue since we might remove items for job in queue[:]: synthetic_bool = len(available_nodes) >= job.nodes_required @@ -65,53 +65,19 @@ class Scheduler: queue.remove(job) # Remove the job from the queue if debug: scheduled_nodes = summarize_ranges(job.scheduled_nodes) - print(f"t={current_time}: Scheduled job with wall time {job.wall_time} on nodes {scheduled_nodes}") + print(f"t={current_time}: Scheduled job {job.id} with wall time {job.wall_time} on nodes {scheduled_nodes}") else: - # Optionally, if you have a BACKFILL policy, you can attempt that here. - # Otherwise, just leave the job in the queue. - continue - - - - def schedule2(self, queue, running, available_nodes, current_time, debug=False): - """Schedules jobs from the queue to available nodes.""" - queue[:] = self.sort_jobs(queue) # Ensure queue is sorted before scheduling - - while queue: - - # Try scheduling the first job in the queue - job = queue.pop(0) - synthetic_bool = len(available_nodes) >= job.nodes_required - telemetry_bool = job.requested_nodes and set(job.requested_nodes).issubset(set(available_nodes)) - - if synthetic_bool or telemetry_bool: - - # Schedule job - self.assign_nodes_to_job(job, available_nodes, current_time) - running.append(job) - #self.history.append(dict(id=job.id, time=current_time, nodes=job.nodes_required, wall_time=job.wall_time)) - - if debug: - scheduled_nodes = summarize_ranges(job.scheduled_nodes) - print(f"t={current_time}: Scheduled job with wall time", - f"{job.wall_time} on nodes {scheduled_nodes}") - - else: - # If the job cannot be scheduled, either try backfilling or requeue it - if queue and self.policy == PolicyType.BACKFILL: - queue.insert(0, job) + # Optionally, if you have a BACKFILL policy, attempt backfilling here. + if self.policy == PolicyType.BACKFILL: + # Try to find a backfill candidate from the entire queue. backfill_job = self.find_backfill_job(queue, len(available_nodes), current_time) if backfill_job: self.assign_nodes_to_job(backfill_job, available_nodes, current_time) - self.queue.remove(backfill_job) - if self.debug: + running.append(backfill_job) + queue.remove(backfill_job) + if debug: scheduled_nodes = summarize_ranges(backfill_job.scheduled_nodes) - print(f"t={self.current_time}: Backfilling job {backfill_job.id} with wall time", - f"{backfill_job.wall_time} on nodes {scheduled_nodes}") - else: - queue.append(job) # Note, this should be fixed. It shouldn't go to the end of the queue. - break - + print(f"t={current_time}: Backfilling job {backfill_job.id} with wall time {backfill_job.wall_time} on nodes {scheduled_nodes}") def find_backfill_job(self, queue, num_free_nodes, current_time): -- GitLab From cf3e0c941f1c8dd88f015d4ed261dd1c4430eb50 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Wed, 5 Feb 2025 15:06:08 -0500 Subject: [PATCH 06/15] Add resmgr.py with ResourceManager class --- raps/engine.py | 33 ++++++++++++++++--------- raps/resmgr.py | 50 ++++++++++++++++++++++++++++++++++++++ raps/schedulers/default.py | 38 +++++++++-------------------- 3 files changed, 82 insertions(+), 39 deletions(-) create mode 100644 raps/resmgr.py diff --git a/raps/engine.py b/raps/engine.py index bbc9d47..7fbdb63 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -7,6 +7,7 @@ from .job import Job, JobState from .account import Accounts from .network import network_utilization from .utils import summarize_ranges, expand_ranges, write_dict_to_file +from .resmgr import ResourceManager from .schedulers import load_scheduler @@ -33,9 +34,12 @@ class Engine: def __init__(self, *, power_manager, flops_manager, cooling_model=None, config, **kwargs): self.config = config self.down_nodes = summarize_ranges(self.config['DOWN_NODES']) - self.available_nodes = list(set(range(self.config['TOTAL_NODES'])) - set(self.config['DOWN_NODES'])) - self.num_free_nodes = len(self.available_nodes) - self.num_active_nodes = self.config['TOTAL_NODES'] - self.num_free_nodes - len(self.config['DOWN_NODES']) + self.resource_manager = ResourceManager( + total_nodes=self.config['TOTAL_NODES'], + down_nodes=self.config['DOWN_NODES'] + ) + + # Initialize running and queue, etc. self.running = [] self.queue = [] self.accounts = Accounts() @@ -54,7 +58,11 @@ class Engine: # Get scheduler type from command-line args or default scheduler_type = kwargs.get('scheduler', 'default') - self.scheduler = load_scheduler(scheduler_type)(config=self.config, policy=kwargs.get('policy')) + self.scheduler = load_scheduler(scheduler_type)( + config=self.config, + policy=kwargs.get('policy'), + resource_manager=self.resource_manager + ) print(f"Using scheduler: {scheduler_type}") @@ -72,8 +80,10 @@ class Engine: newly_downed_nodes = self.node_failure(self.config['MTBF']) # Update active/free nodes - self.num_free_nodes = len(self.available_nodes) - self.num_active_nodes = self.config['TOTAL_NODES'] - self.num_free_nodes - len(expand_ranges(self.down_nodes)) + self.num_free_nodes = len(self.resource_manager.available_nodes) + self.num_active_nodes = self.config['TOTAL_NODES'] + - len(self.resource_manager.available_nodes) + - len(self.resource_manager.down_nodes) # Update running time for all running jobs for job in self.running: @@ -103,12 +113,11 @@ class Engine: self.jobs_completed += 1 job_stats = job.statistics() self.accounts.update_account_statistics(job_stats) - # Free nodes and ensure there are no duplicates - self.available_nodes.extend(job.scheduled_nodes) - self.available_nodes = sorted(set(self.available_nodes)) + # Free the nodes via the resource manager. + self.resource_manager.free_nodes_from_job(job) # Ask scheduler to schedule any jobs waiting in queue - self.scheduler.schedule(self.queue, self.running, self.available_nodes, self.current_time) + self.scheduler.schedule(self.queue, self.running, self.current_time) # Update the power array UI component rack_power, rect_losses = self.power_manager.compute_rack_power() @@ -120,7 +129,7 @@ class Engine: self.sys_util_history.append((self.current_time, system_util)) # Render the updated layout - power_df = None #self.power_manager.get_power_df() if self.power_manager else pd.DataFrame() + power_df = None cooling_inputs, cooling_outputs = None, None # Update power history every 15s @@ -196,7 +205,7 @@ class Engine: for timestep in range(timesteps): - self.scheduler.schedule(self.queue, self.running, self.available_nodes, self.current_time) + self.scheduler.schedule(self.queue, self.running, self.current_time) # Stop the simulation if no more jobs are running or in the queue if not self.queue and not self.running and not self.replay: diff --git a/raps/resmgr.py b/raps/resmgr.py new file mode 100644 index 0000000..d4fee8a --- /dev/null +++ b/raps/resmgr.py @@ -0,0 +1,50 @@ +from .job import JobState + +class ResourceManager: + def __init__(self, total_nodes, down_nodes): + self.total_nodes = total_nodes + # Maintain a set for down nodes (e.g., nodes that are offline) + self.down_nodes = set(down_nodes) + # Available nodes are those that are not down + self.available_nodes = sorted(set(range(total_nodes)) - self.down_nodes) + # You can track system utilization history here + self.sys_util_history = [] # list of (time, utilization) tuples + + def assign_nodes_to_job(self, job, current_time): + """Assigns nodes to a job and updates the available nodes.""" + if len(self.available_nodes) < job.nodes_required: + raise ValueError(f"Not enough available nodes to schedule job {job.id}") + + if job.requested_nodes: # Telemetry replay case + job.scheduled_nodes = job.requested_nodes + self.available_nodes = [n for n in self.available_nodes if n not in job.scheduled_nodes] + else: # Synthetic or reschedule case + job.scheduled_nodes = self.available_nodes[:job.nodes_required] + self.available_nodes = self.available_nodes[job.nodes_required:] + + # Set job start and end times + job.start_time = current_time + job.end_time = current_time + job.wall_time + job.state = JobState.RUNNING # Mark job as running + + def free_nodes_from_job(self, job): + """Frees the nodes that were allocated to a completed job.""" + if hasattr(job, "scheduled_nodes"): + self.available_nodes.extend(job.scheduled_nodes) + # Remove duplicates and sort the list for consistency + self.available_nodes = sorted(set(self.available_nodes)) + else: + # If job has no scheduled nodes, there is nothing to free. + pass + + def update_system_utilization(self, current_time, num_active_nodes): + """ + Computes and records the system utilization. + For example, utilization could be defined as the ratio of active nodes to the total non-down nodes. + """ + # Number of nodes that are not down: + total_operational = self.total_nodes - len(self.down_nodes) + # Compute utilization as a percentage: + utilization = (num_active_nodes / total_operational) * 100 if total_operational else 0 + self.sys_util_history.append((current_time, utilization)) + return utilization diff --git a/raps/schedulers/default.py b/raps/schedulers/default.py index 46dadd9..6b113de 100644 --- a/raps/schedulers/default.py +++ b/raps/schedulers/default.py @@ -15,9 +15,13 @@ class Scheduler: """ Default job scheduler with various scheduling policies. """ - def __init__(self, config, policy): + def __init__(self, config, policy, resource_manager=None): self.config = config self.policy = PolicyType(policy) + if resource_manager is None: + raise ValueError("Scheduler requires a ResourceManager instance") + self.resource_manager = resource_manager + self.debug = False def sort_jobs(self, queue): @@ -32,42 +36,22 @@ class Scheduler: raise ValueError(f"Unknown policy type: {self.policy}") - def assign_nodes_to_job(self, job, available_nodes, current_time): - """Assigns nodes to a job and updates available nodes.""" - if len(available_nodes) < job.nodes_required: - raise ValueError(f"Not enough available nodes to schedule job {job.id}") - - if job.requested_nodes: # Telemetry replay case - job.scheduled_nodes = job.requested_nodes - available_nodes[:] = [n for n in available_nodes if n not in job.scheduled_nodes] - else: # Synthetic or reschedule case - job.scheduled_nodes = available_nodes[:job.nodes_required] - available_nodes[:] = available_nodes[job.nodes_required:] - - # Set job start and end times - job.start_time = current_time - job.end_time = current_time + job.wall_time - job.state = JobState.RUNNING # Job is now running - - - def schedule(self, queue, running, available_nodes, current_time, debug=False): + def schedule(self, queue, running, current_time, debug=False): # Sort the queue in place. queue[:] = self.sort_jobs(queue) # Iterate over a copy of the queue since we might remove items for job in queue[:]: - synthetic_bool = len(available_nodes) >= job.nodes_required - telemetry_bool = job.requested_nodes and set(job.requested_nodes).issubset(set(available_nodes)) - - if synthetic_bool or telemetry_bool: - self.assign_nodes_to_job(job, available_nodes, current_time) + # Check if the resource manager has enough nodes. + if len(self.resource_manager.available_nodes) >= job.nodes_required: + # Use ResourceManager to assign nodes. + self.resource_manager.assign_nodes_to_job(job, current_time) running.append(job) - queue.remove(job) # Remove the job from the queue + queue.remove(job) if debug: scheduled_nodes = summarize_ranges(job.scheduled_nodes) print(f"t={current_time}: Scheduled job {job.id} with wall time {job.wall_time} on nodes {scheduled_nodes}") else: - # Optionally, if you have a BACKFILL policy, attempt backfilling here. if self.policy == PolicyType.BACKFILL: # Try to find a backfill candidate from the entire queue. backfill_job = self.find_backfill_job(queue, len(available_nodes), current_time) -- GitLab From 5c7c377ed6c3ccaf004aec0c8e8fc9c0872c342a Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Wed, 5 Feb 2025 15:24:20 -0500 Subject: [PATCH 07/15] Fix to previous commit --- raps/engine.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index 7fbdb63..73385c1 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -81,8 +81,8 @@ class Engine: # Update active/free nodes self.num_free_nodes = len(self.resource_manager.available_nodes) - self.num_active_nodes = self.config['TOTAL_NODES'] - - len(self.resource_manager.available_nodes) + self.num_active_nodes = self.config['TOTAL_NODES'] \ + - len(self.resource_manager.available_nodes) \ - len(self.resource_manager.down_nodes) # Update running time for all running jobs -- GitLab From 84f5cfd31dddf3e5506bbd4d57373ba00eeb5c87 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Wed, 5 Feb 2025 17:34:45 -0500 Subject: [PATCH 08/15] Bug fix related to creating ResourceManager --- raps/engine.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index 73385c1..a96f9ef 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -269,8 +269,8 @@ class Engine: newly_downed_nodes = all_nodes[failed_nodes_mask] for node_index in newly_downed_nodes: - if node_index in self.available_nodes: - self.available_nodes.remove(node_index) + if node_index in self.resource_manager.available_nodes: + self.resource_manager.available_nodes.remove(node_index) self.down_nodes.append(str(node_index)) self.power_manager.set_idle(node_index) -- GitLab From d93b054f5b63c246a32035aafedd293ac1b2a138 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Wed, 5 Feb 2025 21:31:04 -0500 Subject: [PATCH 09/15] Fix a couple more bugs which was causing significant slowdown b/c of incorrect scheduling --- raps/engine.py | 24 +++++++++++++++--------- raps/schedulers/default.py | 15 ++++++++++++--- 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index a96f9ef..d8250cc 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -194,28 +194,34 @@ class Engine: raise TypeError(f"Invalid type for utilization: {type(trace)}.") - def run_simulation(self, jobs, timesteps): - """ Generator that yields after each simulation tick """ - last_submit_time = 0 + def run_simulation(self, jobs, timesteps, autoshutdown=False): + """Generator that yields after each simulation tick.""" self.timesteps = timesteps - for job_info in jobs: - job = Job(job_info, self.current_time) - self.add_job(job) + # Sort pending jobs by submit_time. + jobs_to_submit = sorted(jobs, key=lambda j: j['submit_time']) for timestep in range(timesteps): + # Submit jobs whose submit_time is <= current_time + while jobs_to_submit and jobs_to_submit[0]['submit_time'] <= self.current_time: + job_info = jobs_to_submit.pop(0) + job = Job(job_info, self.current_time) + self.add_job(job) + # Schedule jobs that are now in the queue. self.scheduler.schedule(self.queue, self.running, self.current_time) - # Stop the simulation if no more jobs are running or in the queue - if not self.queue and not self.running and not self.replay: + # Stop the simulation if no more jobs are running or in the queue. + if autoshutdown and not self.queue and not self.running and not self.replay: print(f"[DEBUG] {self.config['system_name']} - Stopping simulation at time {self.current_time}") break + if self.debug and timestep % self.config['UI_UPDATE_FREQ'] == 0: - print(".", end="", flush=True) + print(".", end="", flush=True) yield self.tick() + def get_stats(self): """ Return output statistics """ sum_values = lambda values: sum(x[1] for x in values) if values else 0 diff --git a/raps/schedulers/default.py b/raps/schedulers/default.py index 6b113de..7ab664b 100644 --- a/raps/schedulers/default.py +++ b/raps/schedulers/default.py @@ -42,9 +42,18 @@ class Scheduler: # Iterate over a copy of the queue since we might remove items for job in queue[:]: - # Check if the resource manager has enough nodes. - if len(self.resource_manager.available_nodes) >= job.nodes_required: - # Use ResourceManager to assign nodes. + + # For synthetic jobs the number of requested nodes is given. + # Make sure the available nodes count meets job.nodes_required. + synthetic_bool = len(self.resource_manager.available_nodes) >= job.nodes_required + + # For telemetry replay jobs a list of requested nodes is provided. + # Make sure the requested nodes are available. + telemetry_bool = False + if job.requested_nodes: + telemetry_bool = set(job.requested_nodes).issubset(set(self.resource_manager.available_nodes)) + + if synthetic_bool or telemetry_bool: self.resource_manager.assign_nodes_to_job(job, current_time) running.append(job) queue.remove(job) -- GitLab From 94c84694d0a95178a81913789850dcb23eb54b33 Mon Sep 17 00:00:00 2001 From: kevinmenear Date: Thu, 6 Feb 2025 01:20:35 -0500 Subject: [PATCH 10/15] Vectorize update flop state and update power state --- raps/engine.py | 25 ++++++++++++++++++++----- raps/flops.py | 34 ++++++++++++++++++++++++++-------- raps/power.py | 23 +++++++++++++++++++---- 3 files changed, 65 insertions(+), 17 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index d8250cc..78617b7 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -86,6 +86,10 @@ class Engine: - len(self.resource_manager.down_nodes) # Update running time for all running jobs + scheduled_nodes = [] + cpu_utils = [] + gpu_utils = [] + net_utils = [] for job in self.running: if job.end_time == self.current_time: job.state = JobState.COMPLETED @@ -101,12 +105,23 @@ class Engine: net_tx = self.get_utilization(job.ntx_trace, time_quanta_index) net_rx = self.get_utilization(job.nrx_trace, time_quanta_index) net_util = network_utilization(net_tx, net_rx) + net_utils.append(net_util) + else: + net_utils.append(0) - self.flops_manager.update_flop_state(job.scheduled_nodes, cpu_util, gpu_util) - job.power = self.power_manager.update_power_state(job.scheduled_nodes, cpu_util, gpu_util, net_util) - - if job.running_time % self.config['TRACE_QUANTA'] == 0: - job.power_history.append(job.power) + scheduled_nodes.append(job.scheduled_nodes) + cpu_utils.append(cpu_util) + gpu_utils.append(gpu_util) + + if len(scheduled_nodes) > 0: + self.flops_manager.update_flop_state(scheduled_nodes, cpu_utils, gpu_utils) + jobs_power = self.power_manager.update_power_state(scheduled_nodes, cpu_utils, gpu_utils, net_utils) + job_index = 0 + for job in self.running: + if job.state == JobState.RUNNING: + if job.running_time % self.config['TRACE_QUANTA'] == 0: + job.power_history.append(jobs_power[job_index] * len(job.scheduled_nodes)) + job_index += len(job.scheduled_nodes) for job in completed_jobs: self.running.remove(job) diff --git a/raps/flops.py b/raps/flops.py index 64511ad..eebd0fa 100644 --- a/raps/flops.py +++ b/raps/flops.py @@ -9,17 +9,35 @@ class FLOPSManager(): self.flop_state = np.zeros(self.config['SC_SHAPE']) def update_flop_state(self, scheduled_nodes, cpu_util, gpu_util): - node_indices = linear_to_3d_index(scheduled_nodes, self.config['SC_SHAPE']) - if self.validate: # cpu_util is in fact node_Watts in this case - self.flop_state[node_indices] = \ - (self.config['CPU_FP_RATIO']*self.config['CPU_PEAK_FLOPS'] + self.config['GPU_FP_RATIO'] * self.config['GPU_PEAK_FLOPS']) * (cpu_util / (self.config['POWER_CPU_MAX']*self.config['CPUS_PER_NODE'] + self.config['POWER_GPU_MAX']*self.config['GPUS_PER_NODE']+ self.config['POWER_NIC']*self.config['NICS_PER_NODE']+self.config['POWER_NVME'])) - else: - self.flop_state[node_indices] = \ - self.config['CPU_FP_RATIO'] * cpu_util * self.config['CPU_PEAK_FLOPS'] + \ - self.config['GPU_FP_RATIO'] * gpu_util * self.config['GPU_PEAK_FLOPS'] + cpu_util = np.asarray(cpu_util) + gpu_util = np.asarray(gpu_util) + job_lengths = np.array([len(job) for job in scheduled_nodes]) + flattened_nodes = np.concatenate(scheduled_nodes, axis=0) + + cpu_util_flat = np.repeat(cpu_util, job_lengths) + gpu_util_flat = np.repeat(gpu_util, job_lengths) + node_indices = linear_to_3d_index(flattened_nodes, self.config['SC_SHAPE']) + if self.validate: # cpu_util is in fact node_Watts in this case + total_peak = ( + self.config['CPU_FP_RATIO'] * self.config['CPU_PEAK_FLOPS'] + + self.config['GPU_FP_RATIO'] * self.config['GPU_PEAK_FLOPS'] + ) + denominator = ( + self.config['POWER_CPU_MAX'] * self.config['CPUS_PER_NODE'] + + self.config['POWER_GPU_MAX'] * self.config['GPUS_PER_NODE'] + + self.config['POWER_NIC'] * self.config['NICS_PER_NODE'] + + self.config['POWER_NVME'] + ) + self.flop_state[node_indices] = total_peak * (cpu_util_flat / denominator) + else: + self.flop_state[node_indices] = ( + self.config['CPU_FP_RATIO'] * cpu_util_flat * self.config['CPU_PEAK_FLOPS'] + + self.config['GPU_FP_RATIO'] * gpu_util_flat * self.config['GPU_PEAK_FLOPS'] + ) + def get_rpeak(self): node_peak_flops = self.config['CPUS_PER_NODE'] * self.config['CPU_PEAK_FLOPS'] \ + self.config['GPUS_PER_NODE'] * self.config['GPU_PEAK_FLOPS'] diff --git a/raps/power.py b/raps/power.py index 29e31a0..e1a5d0b 100644 --- a/raps/power.py +++ b/raps/power.py @@ -58,7 +58,10 @@ def compute_node_power(cpu_util, gpu_util, net_util, config): power_nic = config['POWER_NIC_IDLE'] + \ (config['POWER_NIC_MAX'] - config['POWER_NIC_IDLE']) * net_util except: - power_nic = config['POWER_NIC'] + if isinstance(net_util, np.ndarray): + power_nic = config['POWER_NIC'] * np.ones(net_util.shape) + else: + power_nic = config['POWER_NIC'] power_total = power_cpu + power_gpu + config['POWER_MEM'] + \ config['NICS_PER_NODE'] * power_nic + config['POWER_NVME'] @@ -260,11 +263,23 @@ class PowerManager: float Total power consumption of the scheduled nodes. """ - node_indices = linear_to_3d_index(scheduled_nodes, self.sc_shape) - power_value, sivoc_loss = self.power_func(cpu_util, gpu_util, net_util, self.config) + cpu_util = np.asarray(cpu_util) + gpu_util = np.asarray(gpu_util) + net_util = np.asarray(net_util) + job_lengths = np.array([len(job) for job in scheduled_nodes]) + flattened_nodes = np.concatenate(scheduled_nodes, axis=0) + + cpu_util_flat = np.repeat(cpu_util, job_lengths) + gpu_util_flat = np.repeat(gpu_util, job_lengths) + net_util_flat = np.repeat(net_util, job_lengths) + + node_indices = linear_to_3d_index(flattened_nodes, self.config['SC_SHAPE']) + + power_value, sivoc_loss = self.power_func(cpu_util_flat, gpu_util_flat, net_util_flat, self.config) self.power_state[node_indices] = power_value self.sivoc_loss[node_indices] = sivoc_loss - return power_value * len(scheduled_nodes) + return power_value + def calculate_rectifiers_needed(self, power_state_summed): """ -- GitLab From d1aa5d3e067c70ce27c89687930b621e594ebe29 Mon Sep 17 00:00:00 2001 From: kevinmenear Date: Thu, 6 Feb 2025 09:23:53 -0500 Subject: [PATCH 11/15] Return single power value per job from update_power_state --- raps/engine.py | 14 ++++++++------ raps/power.py | 2 +- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index 78617b7..77643c1 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -116,12 +116,14 @@ class Engine: if len(scheduled_nodes) > 0: self.flops_manager.update_flop_state(scheduled_nodes, cpu_utils, gpu_utils) jobs_power = self.power_manager.update_power_state(scheduled_nodes, cpu_utils, gpu_utils, net_utils) - job_index = 0 - for job in self.running: - if job.state == JobState.RUNNING: - if job.running_time % self.config['TRACE_QUANTA'] == 0: - job.power_history.append(jobs_power[job_index] * len(job.scheduled_nodes)) - job_index += len(job.scheduled_nodes) + + _running_jobs = [job for job in self.running if job.state == JobState.RUNNING] + if len(jobs_power) != len(_running_jobs): + raise ValueError(f"Jobs power list of length ({len(jobs_power)}) should have ({len(running_jobs)}) items.") + for i, job in enumerate(_running_jobs): + if job.running_time % self.config['TRACE_QUANTA'] == 0: + job.power_history.append(jobs_power[i] * len(job.scheduled_nodes)) + del _running_jobs for job in completed_jobs: self.running.remove(job) diff --git a/raps/power.py b/raps/power.py index e1a5d0b..832a1ab 100644 --- a/raps/power.py +++ b/raps/power.py @@ -278,7 +278,7 @@ class PowerManager: power_value, sivoc_loss = self.power_func(cpu_util_flat, gpu_util_flat, net_util_flat, self.config) self.power_state[node_indices] = power_value self.sivoc_loss[node_indices] = sivoc_loss - return power_value + return power_value[np.cumsum(job_lengths) - 1] def calculate_rectifiers_needed(self, power_state_summed): -- GitLab From c9b49c74030c64da97ed94f783e06dc781afb6d6 Mon Sep 17 00:00:00 2001 From: kevinmenear Date: Thu, 6 Feb 2025 09:32:45 -0500 Subject: [PATCH 12/15] Fix bug in jobs_power length ValueError exception message --- raps/engine.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index 77643c1..9972854 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -116,10 +116,10 @@ class Engine: if len(scheduled_nodes) > 0: self.flops_manager.update_flop_state(scheduled_nodes, cpu_utils, gpu_utils) jobs_power = self.power_manager.update_power_state(scheduled_nodes, cpu_utils, gpu_utils, net_utils) - + _running_jobs = [job for job in self.running if job.state == JobState.RUNNING] if len(jobs_power) != len(_running_jobs): - raise ValueError(f"Jobs power list of length ({len(jobs_power)}) should have ({len(running_jobs)}) items.") + raise ValueError(f"Jobs power list of length ({len(jobs_power)}) should have ({len(_running_jobs)}) items.") for i, job in enumerate(_running_jobs): if job.running_time % self.config['TRACE_QUANTA'] == 0: job.power_history.append(jobs_power[i] * len(job.scheduled_nodes)) -- GitLab From 84f32bea60d4e5d9934366d96728f995562026bd Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Thu, 6 Feb 2025 14:38:44 -0500 Subject: [PATCH 13/15] Move node_failure() to resmgr.py --- raps/engine.py | 25 +------------------------ raps/resmgr.py | 34 ++++++++++++++++++++++++++++++++++ tests/smoke.py | 1 - 3 files changed, 35 insertions(+), 25 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index 9972854..0201aec 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -77,7 +77,7 @@ class Engine: completed_job_stats = [] # Simulate node failure - newly_downed_nodes = self.node_failure(self.config['MTBF']) + newly_downed_nodes = self.resource_manager.node_failure(self.config['MTBF']) # Update active/free nodes self.num_free_nodes = len(self.resource_manager.available_nodes) @@ -275,26 +275,3 @@ class Engine: } return stats - - - def node_failure(self, mtbf): - """Simulate node failure using Weibull distribution.""" - from scipy.stats import weibull_min - shape_parameter = 1.5 - scale_parameter = mtbf * 3600 # Convert to seconds - - down_nodes = expand_ranges(self.down_nodes) - all_nodes = np.setdiff1d(np.arange(self.config['TOTAL_NODES']), np.array(down_nodes, dtype=int)) - - random_values = weibull_min.rvs(shape_parameter, scale=scale_parameter, size=all_nodes.size) - failure_threshold = 0.1 - failed_nodes_mask = random_values < failure_threshold - newly_downed_nodes = all_nodes[failed_nodes_mask] - - for node_index in newly_downed_nodes: - if node_index in self.resource_manager.available_nodes: - self.resource_manager.available_nodes.remove(node_index) - self.down_nodes.append(str(node_index)) - self.power_manager.set_idle(node_index) - - return newly_downed_nodes.tolist() diff --git a/raps/resmgr.py b/raps/resmgr.py index d4fee8a..508cc6f 100644 --- a/raps/resmgr.py +++ b/raps/resmgr.py @@ -1,4 +1,8 @@ +import numpy as np from .job import JobState +from .utils import expand_ranges +from scipy.stats import weibull_min + class ResourceManager: def __init__(self, total_nodes, down_nodes): @@ -48,3 +52,33 @@ class ResourceManager: utilization = (num_active_nodes / total_operational) * 100 if total_operational else 0 self.sys_util_history.append((current_time, utilization)) return utilization + + def node_failure(self, mtbf): + """Simulate node failure using Weibull distribution.""" + shape_parameter = 1.5 + scale_parameter = mtbf * 3600 # Convert to seconds + + # Create a NumPy array of node indices, excluding down nodes + #print(self.down_nodes) + #down_nodes = expand_ranges(self.down_nodes) + #all_nodes = np.setdiff1d(np.arange(self.config['TOTAL_NODES']), np.array(self.down_nodes, dtype=int)) + all_nodes = np.array(sorted(set(range(self.total_nodes)) - set(self.down_nodes))) + + # Sample the Weibull distribution for all nodes at once + random_values = weibull_min.rvs(shape_parameter, scale=scale_parameter, size=all_nodes.size) + failure_threshold = 0.1 + failed_nodes = [node for node, r in zip(all_nodes, random_values) if r < failure_threshold] + + # Identify nodes that have failed + failure_threshold = 0.1 + failed_nodes_mask = random_values < failure_threshold + newly_downed_nodes = all_nodes[failed_nodes_mask] + + # Update available and down nodes + for node_index in newly_downed_nodes: + if node_index in self.available_nodes: + self.available_nodes.remove(node_index) + self.down_nodes.append(str(node_index)) + self.power_manager.set_idle(node_index) + + return newly_downed_nodes.tolist() diff --git a/tests/smoke.py b/tests/smoke.py index 623908b..9174b3c 100644 --- a/tests/smoke.py +++ b/tests/smoke.py @@ -11,7 +11,6 @@ DEFAULT_TIME = "1h" # Define systems and their corresponding filenames SYSTEMS = { "frontier": "frontier/slurm/joblive/date=2024-01-18 frontier/jobprofile/date=2024-01-18", - "fugaku": "fugaku/21_04.parquet", "marconi100": "marconi100/job_table.parquet", "lassen": "lassen/Lassen-Supercomputer-Job-Dataset", "adastraMI250": "adastra/AdastaJobsMI250_15days.parquet" -- GitLab From d75b912d993224ca5818505bf392993e0c922ab9 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Thu, 6 Feb 2025 15:22:10 -0500 Subject: [PATCH 14/15] Fix bug in UI reporting of down_nodes --- raps/engine.py | 2 +- raps/resmgr.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index 0201aec..df62af2 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -186,7 +186,7 @@ class Engine: completed=completed_jobs, running=self.running, queue=self.queue, - down_nodes=expand_ranges(self.down_nodes), + down_nodes=expand_ranges(self.down_nodes[1:]), power_df=power_df, p_flops=pflops, g_flops_w=gflop_per_watt, diff --git a/raps/resmgr.py b/raps/resmgr.py index 508cc6f..196e636 100644 --- a/raps/resmgr.py +++ b/raps/resmgr.py @@ -78,7 +78,7 @@ class ResourceManager: for node_index in newly_downed_nodes: if node_index in self.available_nodes: self.available_nodes.remove(node_index) - self.down_nodes.append(str(node_index)) + self.down_nodes.add(str(node_index)) self.power_manager.set_idle(node_index) return newly_downed_nodes.tolist() -- GitLab From 086ce17bc915f5d43681a3a89be8e19ed847d264 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Thu, 6 Feb 2025 15:46:59 -0500 Subject: [PATCH 15/15] Move get_utilization() from engine.py to utils.py and cleanup code. --- raps/engine.py | 25 +++++++------------------ raps/power.py | 4 +++- raps/resmgr.py | 7 ------- raps/telemetry.py | 2 -- raps/ui.py | 1 - raps/utils.py | 10 ++++++++++ raps/weather.py | 3 +-- raps/workload.py | 1 - 8 files changed, 21 insertions(+), 32 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index df62af2..ccb27fa 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -1,12 +1,11 @@ from typing import Optional import dataclasses -import numpy as np import pandas as pd from .job import Job, JobState from .account import Accounts from .network import network_utilization -from .utils import summarize_ranges, expand_ranges, write_dict_to_file +from .utils import summarize_ranges, expand_ranges, get_utilization from .resmgr import ResourceManager from .schedulers import load_scheduler @@ -74,10 +73,11 @@ class Engine: def tick(self): """Simulate a timestep.""" completed_jobs = [job for job in self.running if job.end_time is not None and job.end_time <= self.current_time] - completed_job_stats = [] # Simulate node failure newly_downed_nodes = self.resource_manager.node_failure(self.config['MTBF']) + for node in newly_downed_nodes: + self.power_manager.set_idle(node) # Update active/free nodes self.num_free_nodes = len(self.resource_manager.available_nodes) @@ -97,13 +97,13 @@ class Engine: if job.state == JobState.RUNNING: job.running_time = self.current_time - job.start_time time_quanta_index = (self.current_time - job.start_time) // self.config['TRACE_QUANTA'] - cpu_util = self.get_utilization(job.cpu_trace, time_quanta_index) - gpu_util = self.get_utilization(job.gpu_trace, time_quanta_index) + cpu_util = get_utilization(job.cpu_trace, time_quanta_index) + gpu_util = get_utilization(job.gpu_trace, time_quanta_index) net_util = 0 if len(job.ntx_trace) and len(job.nrx_trace): - net_tx = self.get_utilization(job.ntx_trace, time_quanta_index) - net_rx = self.get_utilization(job.nrx_trace, time_quanta_index) + net_tx = get_utilization(job.ntx_trace, time_quanta_index) + net_rx = get_utilization(job.nrx_trace, time_quanta_index) net_util = network_utilization(net_tx, net_rx) net_utils.append(net_util) else: @@ -156,7 +156,6 @@ class Engine: self.power_manager.history.append((self.current_time, total_power_kw)) self.sys_power = total_power_kw self.power_manager.loss_history.append((self.current_time, total_loss_kw)) - output_df = self.power_manager.get_power_df(rack_power, rack_loss) pflops = self.flops_manager.get_system_performance() / 1E15 gflop_per_watt = pflops * 1E6 / (total_power_kw * 1000) else: @@ -201,16 +200,6 @@ class Engine: return tick_data - def get_utilization(self, trace, time_quanta_index): - """Retrieve utilization value for a given trace at a specific time quanta index.""" - if isinstance(trace, (list, np.ndarray)): - return trace[time_quanta_index] - elif isinstance(trace, (int, float)): - return float(trace) - else: - raise TypeError(f"Invalid type for utilization: {type(trace)}.") - - def run_simulation(self, jobs, timesteps, autoshutdown=False): """Generator that yields after each simulation tick.""" self.timesteps = timesteps diff --git a/raps/power.py b/raps/power.py index 832a1ab..e61010f 100644 --- a/raps/power.py +++ b/raps/power.py @@ -334,7 +334,9 @@ class PowerManager: num_rectifiers = num_rectifiers_array[i, j, k] power_per_rectifier = chassis_power[i, j, k] / num_rectifiers rectifier_power[i, j, k, :num_rectifiers] = power_per_rectifier - power_with_losses[i, j, k, :num_rectifiers] = rectifier_loss(power_per_rectifier) + power_with_losses[i, j, k, :num_rectifiers] = compute_loss(power_per_rectifier, \ + self.config['RECTIFIER_LOSS_CONSTANT'], \ + self.config['RECTIFIER_EFFICIENCY']) rectifier_power = np.nan_to_num(rectifier_power) power_with_losses = np.nan_to_num(power_with_losses) diff --git a/raps/resmgr.py b/raps/resmgr.py index 196e636..8abce81 100644 --- a/raps/resmgr.py +++ b/raps/resmgr.py @@ -1,6 +1,5 @@ import numpy as np from .job import JobState -from .utils import expand_ranges from scipy.stats import weibull_min @@ -59,15 +58,10 @@ class ResourceManager: scale_parameter = mtbf * 3600 # Convert to seconds # Create a NumPy array of node indices, excluding down nodes - #print(self.down_nodes) - #down_nodes = expand_ranges(self.down_nodes) - #all_nodes = np.setdiff1d(np.arange(self.config['TOTAL_NODES']), np.array(self.down_nodes, dtype=int)) all_nodes = np.array(sorted(set(range(self.total_nodes)) - set(self.down_nodes))) # Sample the Weibull distribution for all nodes at once random_values = weibull_min.rvs(shape_parameter, scale=scale_parameter, size=all_nodes.size) - failure_threshold = 0.1 - failed_nodes = [node for node, r in zip(all_nodes, random_values) if r < failure_threshold] # Identify nodes that have failed failure_threshold = 0.1 @@ -79,6 +73,5 @@ class ResourceManager: if node_index in self.available_nodes: self.available_nodes.remove(node_index) self.down_nodes.add(str(node_index)) - self.power_manager.set_idle(node_index) return newly_downed_nodes.tolist() diff --git a/raps/telemetry.py b/raps/telemetry.py index 9aa72bc..833ca39 100644 --- a/raps/telemetry.py +++ b/raps/telemetry.py @@ -23,8 +23,6 @@ if __name__ == "__main__": import importlib import numpy as np -import re -from datetime import datetime from tqdm import tqdm from .config import ConfigManager diff --git a/raps/ui.py b/raps/ui.py index fc83e6b..8bee172 100644 --- a/raps/ui.py +++ b/raps/ui.py @@ -1,4 +1,3 @@ -import numpy as np import pandas as pd from rich.align import Align from rich.console import Console diff --git a/raps/utils.py b/raps/utils.py index 6c9d4d7..5ead3d1 100644 --- a/raps/utils.py +++ b/raps/utils.py @@ -360,3 +360,13 @@ def toJSON(obj): default=lambda o:o.__dict__, sort_keys=True, indent=4) + + +def get_utilization(trace, time_quanta_index): + """Retrieve utilization value for a given trace at a specific time quanta index.""" + if isinstance(trace, (list, np.ndarray)): + return trace[time_quanta_index] + elif isinstance(trace, (int, float)): + return float(trace) + else: + raise TypeError(f"Invalid type for utilization: {type(trace)}.") diff --git a/raps/weather.py b/raps/weather.py index 3917279..b31f88e 100644 --- a/raps/weather.py +++ b/raps/weather.py @@ -1,7 +1,6 @@ import requests import urllib3 -import json -from datetime import datetime, timedelta +from datetime import datetime # Disable SSL warnings when verify=False is used (temporary debugging purpose) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) diff --git a/raps/workload.py b/raps/workload.py index 14d91cc..b368246 100644 --- a/raps/workload.py +++ b/raps/workload.py @@ -25,7 +25,6 @@ JOB_END_PROBS : list """ -import math import random import numpy as np -- GitLab