diff --git a/args.py b/args.py
index 1ca2343ea4b140fff007b073f38c6f87a9bdab82..8cfcd77f4d218958651bc76c52af5195fd99bc01 100644
--- a/args.py
+++ b/args.py
@@ -1,5 +1,5 @@
 import argparse
-from raps.policy import PolicyType
+from raps.schedulers.default import PolicyType
 
 parser = argparse.ArgumentParser(description='Resource Allocator & Power Simulator (RAPS)')
 parser.add_argument('-c', '--cooling', action='store_true', help='Include FMU cooling model')
@@ -28,8 +28,10 @@ choices = ['png', 'svg', 'jpg', 'pdf', 'eps']
 parser.add_argument('--imtype', type=str, choices=choices, default=choices[0], help='Plot image type')
 parser.add_argument('--scale', type=int, default=0, help='Scale telemetry to max nodes specified in order to run telemetry on a smaller target system/partition, e.g., --scale 192')
 parser.add_argument('--system', type=str, default='frontier', help='System config to use')
+choices = ['default', 'nrel', 'anl', 'flux']
+parser.add_argument('--scheduler', type=str, choices=choices, default=choices[0], help='Name of scheduler')
 choices = [policy.value for policy in PolicyType]
-parser.add_argument('-s', '--schedule', type=str, choices=choices, default=choices[0], help='Schedule policy to use')
+parser.add_argument('--policy', type=str, choices=choices, default=choices[0], help='Schedule policy to use')
 choices = ['random', 'benchmark', 'peak', 'idle']
 parser.add_argument('-w', '--workload', type=str, choices=choices, default=choices[0], help='Type of synthetic workload')
 choices = ['layout1', 'layout2']
diff --git a/main.py b/main.py
index 67c04f8648caf094daf9459776072dd04810cc10..58e714703e07eb677c09c6d658bcee66562066ce 100644
--- a/main.py
+++ b/main.py
@@ -25,7 +25,8 @@ from raps.flops import FLOPSManager
 from raps.plotting import Plotter
 from raps.power import PowerManager, compute_node_power, compute_node_power_validate
 from raps.power import compute_node_power_uncertainties, compute_node_power_validate_uncertainties
-from raps.scheduler import Scheduler, Job
+from raps.engine import Engine
+from raps.job import Job
 from raps.telemetry import Telemetry
 from raps.workload import Workload
 from raps.weather import Weather
@@ -61,13 +62,13 @@ else:
     args_dict['config'] = config
 
 flops_manager = FLOPSManager(**args_dict)
-sc = Scheduler(
+sc = Engine(
     power_manager=power_manager,
     flops_manager=flops_manager,
     cooling_model=cooling_model,
     **args_dict,
 )
-layout_manager = LayoutManager(args.layout, scheduler=sc, debug=args.debug, **config)
+layout_manager = LayoutManager(args.layout, engine=sc, debug=args.debug, **config)
 
 if args.replay:
diff --git a/multi-part-sim.py b/multi-part-sim.py
index 36218377c22fb99505375c025ee7e026f2458cd3..342d463012a742a8744686a4f14a9412007aee6d 100644
--- a/multi-part-sim.py
+++ b/multi-part-sim.py
@@ -8,9 +8,9 @@ import sys
 
 from args import args
 from raps.config import ConfigManager, CONFIG_PATH
-from raps.policy import PolicyType
+from raps.schedulers.default import PolicyType
 from raps.ui import LayoutManager
-from raps.scheduler import Scheduler
+from raps.engine import Engine
 from raps.flops import FLOPSManager
 from raps.power import PowerManager, compute_node_power
 from raps.telemetry import Telemetry
@@ -74,8 +74,8 @@ layout_managers = {}
 for i, config in enumerate(configs):
     pm = PowerManager(compute_node_power, **configs[i])
     fm = FLOPSManager(**args_dicts[i])
-    sc = Scheduler(power_manager=pm, flops_manager=fm, cooling_model=None, **args_dicts[i])
-    layout_managers[config['system_name']] = LayoutManager(args.layout, scheduler=sc,
debug=args.debug, **config)
+    sc = Engine(power_manager=pm, flops_manager=fm, cooling_model=None, **args_dicts[i])
+    layout_managers[config['system_name']] = LayoutManager(args.layout, engine=sc, debug=args.debug, **config)
 
 # Set simulation timesteps
 if args.time:
@@ -96,9 +96,9 @@ for timestep in range(timesteps):
     if timestep % configs[0]['UI_UPDATE_FREQ'] == 0:  # Assuming same frequency for all partitions
         sys_power = 0
         for name, lm in layout_managers.items():
-            sys_util = lm.scheduler.sys_util_history[-1] if lm.scheduler.sys_util_history else 0.0
-            print(f"[DEBUG] {name} - Timestep {timestep} - Jobs running: {len(lm.scheduler.running)} - Utilization: {sys_util[1]:.2f}% - Power: {lm.scheduler.sys_power:.1f}kW")
-            sys_power += lm.scheduler.sys_power
+            sys_util = lm.engine.sys_util_history[-1][1] if lm.engine.sys_util_history else 0.0
+            print(f"[DEBUG] {name} - Timestep {timestep} - Jobs running: {len(lm.engine.running)} - Utilization: {sys_util:.2f}% - Power: {lm.engine.sys_power:.1f}kW")
+            sys_power += lm.engine.sys_power
         print(f"system power: {sys_power:.1f}kW")
 
 print("Simulation complete.")
diff --git a/raps/engine.py b/raps/engine.py
new file mode 100644
index 0000000000000000000000000000000000000000..ccb27fab23ee136121db16a6c170ed3be05dd087
--- /dev/null
+++ b/raps/engine.py
@@ -0,0 +1,266 @@
+from typing import Optional
+import dataclasses
+import pandas as pd
+
+from .job import Job, JobState
+from .account import Accounts
+from .network import network_utilization
+from .utils import summarize_ranges, expand_ranges, get_utilization
+from .resmgr import ResourceManager
+from .schedulers import load_scheduler
+
+
+@dataclasses.dataclass
+class TickData:
+    """ Represents the state output from the simulation each tick """
+    current_time: int
+    completed: list[Job]
+    running: list[Job]
+    queue: list[Job]
+    down_nodes: list[int]
+    power_df: Optional[pd.DataFrame]
+    p_flops: Optional[float]
+    g_flops_w: Optional[float]
+    system_util: float
+    fmu_inputs: Optional[dict]
+    fmu_outputs: Optional[dict]
+    num_active_nodes: int
+    num_free_nodes: int
+
+
+class Engine:
+    """Job scheduling simulation engine."""
+    def __init__(self, *, power_manager, flops_manager, cooling_model=None, config, **kwargs):
+        self.config = config
+        self.down_nodes = summarize_ranges(self.config['DOWN_NODES'])
+        self.resource_manager = ResourceManager(
+            total_nodes=self.config['TOTAL_NODES'],
+            down_nodes=self.config['DOWN_NODES']
+        )
+
+        # Initialize running and queue, etc.
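+        # (running = jobs currently executing; queue = jobs waiting to be
+        # scheduled. Per-tick scheduling decisions are delegated to the
+        # pluggable scheduler instance selected at the end of __init__.)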
+ self.running = [] + self.queue = [] + self.accounts = Accounts() + if 'accounts_json' in kwargs and kwargs['accounts_json']: + self.accounts.initialize_accounts_from_json(kwargs.get('accounts_json')) + self.jobs_completed = 0 + self.current_time = 0 + self.cooling_model = cooling_model + self.sys_power = 0 + self.power_manager = power_manager + self.flops_manager = flops_manager + self.debug = kwargs.get('debug') + self.output = kwargs.get('output') + self.replay = kwargs.get('replay') + self.sys_util_history = [] + + # Get scheduler type from command-line args or default + scheduler_type = kwargs.get('scheduler', 'default') + self.scheduler = load_scheduler(scheduler_type)( + config=self.config, + policy=kwargs.get('policy'), + resource_manager=self.resource_manager + ) + print(f"Using scheduler: {scheduler_type}") + + + def add_job(self, job): + self.queue.append(job) + self.queue = self.scheduler.sort_jobs(self.queue) + + + def tick(self): + """Simulate a timestep.""" + completed_jobs = [job for job in self.running if job.end_time is not None and job.end_time <= self.current_time] + + # Simulate node failure + newly_downed_nodes = self.resource_manager.node_failure(self.config['MTBF']) + for node in newly_downed_nodes: + self.power_manager.set_idle(node) + + # Update active/free nodes + self.num_free_nodes = len(self.resource_manager.available_nodes) + self.num_active_nodes = self.config['TOTAL_NODES'] \ + - len(self.resource_manager.available_nodes) \ + - len(self.resource_manager.down_nodes) + + # Update running time for all running jobs + scheduled_nodes = [] + cpu_utils = [] + gpu_utils = [] + net_utils = [] + for job in self.running: + if job.end_time == self.current_time: + job.state = JobState.COMPLETED + + if job.state == JobState.RUNNING: + job.running_time = self.current_time - job.start_time + time_quanta_index = (self.current_time - job.start_time) // self.config['TRACE_QUANTA'] + cpu_util = get_utilization(job.cpu_trace, time_quanta_index) + gpu_util = get_utilization(job.gpu_trace, time_quanta_index) + net_util = 0 + + if len(job.ntx_trace) and len(job.nrx_trace): + net_tx = get_utilization(job.ntx_trace, time_quanta_index) + net_rx = get_utilization(job.nrx_trace, time_quanta_index) + net_util = network_utilization(net_tx, net_rx) + net_utils.append(net_util) + else: + net_utils.append(0) + + scheduled_nodes.append(job.scheduled_nodes) + cpu_utils.append(cpu_util) + gpu_utils.append(gpu_util) + + if len(scheduled_nodes) > 0: + self.flops_manager.update_flop_state(scheduled_nodes, cpu_utils, gpu_utils) + jobs_power = self.power_manager.update_power_state(scheduled_nodes, cpu_utils, gpu_utils, net_utils) + + _running_jobs = [job for job in self.running if job.state == JobState.RUNNING] + if len(jobs_power) != len(_running_jobs): + raise ValueError(f"Jobs power list of length ({len(jobs_power)}) should have ({len(_running_jobs)}) items.") + for i, job in enumerate(_running_jobs): + if job.running_time % self.config['TRACE_QUANTA'] == 0: + job.power_history.append(jobs_power[i] * len(job.scheduled_nodes)) + del _running_jobs + + for job in completed_jobs: + self.running.remove(job) + self.jobs_completed += 1 + job_stats = job.statistics() + self.accounts.update_account_statistics(job_stats) + # Free the nodes via the resource manager. 
+            self.resource_manager.free_nodes_from_job(job)
+            # Return the freed nodes to idle power/FLOPS state, as the removed
+            # Scheduler.tick did; otherwise they keep the job's last power
+            # value until reassigned. (Assumes set_idle accepts a list of node
+            # indices, as in the old code path.)
+            self.power_manager.set_idle(job.scheduled_nodes)
+            self.flops_manager.update_flop_state([job.scheduled_nodes], [0.0], [0.0])
+
+        # Ask scheduler to schedule any jobs waiting in queue
+        self.scheduler.schedule(self.queue, self.running, self.current_time)
+
+        # Update the power array UI component
+        rack_power, rect_losses = self.power_manager.compute_rack_power()
+        sivoc_losses = self.power_manager.compute_sivoc_losses()
+        rack_loss = rect_losses + sivoc_losses
+
+        # Update system utilization
+        system_util = self.num_active_nodes / self.config['AVAILABLE_NODES'] * 100
+        self.sys_util_history.append((self.current_time, system_util))
+
+        # Render the updated layout
+        power_df = None
+        cooling_inputs, cooling_outputs = None, None
+
+        # Update power history every 15s
+        if self.current_time % self.config['POWER_UPDATE_FREQ'] == 0:
+            total_power_kw = sum(row[-1] for row in rack_power) + self.config['NUM_CDUS'] * self.config['POWER_CDU'] / 1000.0
+            total_loss_kw = sum(row[-1] for row in rack_loss)
+            self.power_manager.history.append((self.current_time, total_power_kw))
+            self.sys_power = total_power_kw
+            self.power_manager.loss_history.append((self.current_time, total_loss_kw))
+            pflops = self.flops_manager.get_system_performance() / 1E15
+            gflop_per_watt = pflops * 1E6 / (total_power_kw * 1000)
+        else:
+            pflops, gflop_per_watt = None, None
+
+        if self.current_time % self.config['POWER_UPDATE_FREQ'] == 0:
+            if self.cooling_model:
+                # Power for NUM_CDUS (25 for Frontier)
+                cdu_power = rack_power.T[-1] * 1000
+                runtime_values = self.cooling_model.generate_runtime_values(cdu_power, self)
+
+                # FMU inputs are N powers and the wetbulb temp
+                fmu_inputs = self.cooling_model.generate_fmu_inputs(runtime_values,
+                                                                    uncertainties=self.power_manager.uncertainties)
+                cooling_inputs, cooling_outputs = (
+                    self.cooling_model.step(self.current_time, fmu_inputs, self.config['POWER_UPDATE_FREQ'])
+                )
+
+            # Get a dataframe of the power data (identical with or without the
+            # cooling model, so the duplicated else-branch was folded in here)
+            power_df = self.power_manager.get_power_df(rack_power, rack_loss)
+
+        tick_data = TickData(
+            current_time=self.current_time,
+            completed=completed_jobs,
+            running=self.running,
+            queue=self.queue,
+            down_nodes=expand_ranges(self.down_nodes[1:]),
+            power_df=power_df,
+            p_flops=pflops,
+            g_flops_w=gflop_per_watt,
+            system_util=system_util,
+            fmu_inputs=cooling_inputs,
+            fmu_outputs=cooling_outputs,
+            num_active_nodes=self.num_active_nodes,
+            num_free_nodes=self.num_free_nodes,
+        )
+
+        self.current_time += 1
+        return tick_data
+
+
+    def run_simulation(self, jobs, timesteps, autoshutdown=False):
+        """Generator that yields after each simulation tick."""
+        self.timesteps = timesteps
+
+        # Sort pending jobs by submit_time.
+        jobs_to_submit = sorted(jobs, key=lambda j: j['submit_time'])
+
+        for timestep in range(timesteps):
+            # Submit jobs whose submit_time is <= current_time
+            while jobs_to_submit and jobs_to_submit[0]['submit_time'] <= self.current_time:
+                job_info = jobs_to_submit.pop(0)
+                job = Job(job_info, self.current_time)
+                self.add_job(job)
+
+            # Schedule jobs that are now in the queue.
+            self.scheduler.schedule(self.queue, self.running, self.current_time)
+
+            # Stop the simulation if no more jobs are running or in the queue.
+ if autoshutdown and not self.queue and not self.running and not self.replay: + print(f"[DEBUG] {self.config['system_name']} - Stopping simulation at time {self.current_time}") + break + + if self.debug and timestep % self.config['UI_UPDATE_FREQ'] == 0: + print(".", end="", flush=True) + + yield self.tick() + + + def get_stats(self): + """ Return output statistics """ + sum_values = lambda values: sum(x[1] for x in values) if values else 0 + min_value = lambda values: min(x[1] for x in values) if values else 0 + max_value = lambda values: max(x[1] for x in values) if values else 0 + num_samples = len(self.power_manager.history) if self.power_manager else 0 + + throughput = self.jobs_completed / self.timesteps * 3600 if self.timesteps else 0 # Jobs per hour + average_power_mw = sum_values(self.power_manager.history) / num_samples / 1000 if num_samples else 0 + average_loss_mw = sum_values(self.power_manager.loss_history) / num_samples / 1000 if num_samples else 0 + min_loss_mw = min_value(self.power_manager.loss_history) / 1000 if num_samples else 0 + max_loss_mw = max_value(self.power_manager.loss_history) / 1000 if num_samples else 0 + + loss_fraction = average_loss_mw / average_power_mw if average_power_mw else 0 + efficiency = 1 - loss_fraction if loss_fraction else 0 + total_energy_consumed = average_power_mw * self.timesteps / 3600 if self.timesteps else 0 # MW-hr + emissions = total_energy_consumed * 852.3 / 2204.6 / efficiency if efficiency else 0 + total_cost = total_energy_consumed * 1000 * self.config.get('POWER_COST', 0) # Total cost in dollars + + stats = { + 'num_samples': num_samples, + 'jobs completed': self.jobs_completed, + 'throughput': f'{throughput:.2f} jobs/hour', + 'jobs still running': [job.id for job in self.running], + 'jobs still in queue': [job.id for job in self.queue], + 'average power': f'{average_power_mw:.2f} MW', + 'min loss': f'{min_loss_mw:.2f} MW', + 'average loss': f'{average_loss_mw:.2f} MW', + 'max loss': f'{max_loss_mw:.2f} MW', + 'system power efficiency': f'{efficiency * 100:.2f}%', + 'total energy consumed': f'{total_energy_consumed:.2f} MW-hr', + 'carbon emissions': f'{emissions:.2f} metric tons CO2', + 'total cost': f'${total_cost:.2f}' + } + + return stats diff --git a/raps/flops.py b/raps/flops.py index 64511ad1a7e77f38b97874f3c9de6629a392f6bb..eebd0fa117c0243f509d70e7a87f940af17ad7f5 100644 --- a/raps/flops.py +++ b/raps/flops.py @@ -9,17 +9,35 @@ class FLOPSManager(): self.flop_state = np.zeros(self.config['SC_SHAPE']) def update_flop_state(self, scheduled_nodes, cpu_util, gpu_util): - node_indices = linear_to_3d_index(scheduled_nodes, self.config['SC_SHAPE']) - if self.validate: # cpu_util is in fact node_Watts in this case - self.flop_state[node_indices] = \ - (self.config['CPU_FP_RATIO']*self.config['CPU_PEAK_FLOPS'] + self.config['GPU_FP_RATIO'] * self.config['GPU_PEAK_FLOPS']) * (cpu_util / (self.config['POWER_CPU_MAX']*self.config['CPUS_PER_NODE'] + self.config['POWER_GPU_MAX']*self.config['GPUS_PER_NODE']+ self.config['POWER_NIC']*self.config['NICS_PER_NODE']+self.config['POWER_NVME'])) - else: - self.flop_state[node_indices] = \ - self.config['CPU_FP_RATIO'] * cpu_util * self.config['CPU_PEAK_FLOPS'] + \ - self.config['GPU_FP_RATIO'] * gpu_util * self.config['GPU_PEAK_FLOPS'] + cpu_util = np.asarray(cpu_util) + gpu_util = np.asarray(gpu_util) + job_lengths = np.array([len(job) for job in scheduled_nodes]) + flattened_nodes = np.concatenate(scheduled_nodes, axis=0) + + cpu_util_flat = np.repeat(cpu_util, job_lengths) + gpu_util_flat 
= np.repeat(gpu_util, job_lengths) + node_indices = linear_to_3d_index(flattened_nodes, self.config['SC_SHAPE']) + if self.validate: # cpu_util is in fact node_Watts in this case + total_peak = ( + self.config['CPU_FP_RATIO'] * self.config['CPU_PEAK_FLOPS'] + + self.config['GPU_FP_RATIO'] * self.config['GPU_PEAK_FLOPS'] + ) + denominator = ( + self.config['POWER_CPU_MAX'] * self.config['CPUS_PER_NODE'] + + self.config['POWER_GPU_MAX'] * self.config['GPUS_PER_NODE'] + + self.config['POWER_NIC'] * self.config['NICS_PER_NODE'] + + self.config['POWER_NVME'] + ) + self.flop_state[node_indices] = total_peak * (cpu_util_flat / denominator) + else: + self.flop_state[node_indices] = ( + self.config['CPU_FP_RATIO'] * cpu_util_flat * self.config['CPU_PEAK_FLOPS'] + + self.config['GPU_FP_RATIO'] * gpu_util_flat * self.config['GPU_PEAK_FLOPS'] + ) + def get_rpeak(self): node_peak_flops = self.config['CPUS_PER_NODE'] * self.config['CPU_PEAK_FLOPS'] \ + self.config['GPUS_PER_NODE'] * self.config['GPU_PEAK_FLOPS'] diff --git a/raps/policy.py b/raps/policy.py deleted file mode 100644 index c7900db7545ad777f37c56cec25c15d75aa3aecd..0000000000000000000000000000000000000000 --- a/raps/policy.py +++ /dev/null @@ -1,62 +0,0 @@ -from enum import Enum - -class PolicyType(Enum): - FCFS = 'fcfs' - BACKFILL = 'backfill' - PRIORITY = 'priority' - SJF = 'sjf' - #DEADLINE = 'deadline' # not yet supported - - -class Policy: - - def __init__(self, strategy): - self.strategy = PolicyType(strategy) - - def sort_jobs(self, jobs): - if self.strategy == PolicyType.FCFS or self.strategy == PolicyType.BACKFILL: - return sorted(jobs, key=lambda job: job.submit_time) - elif self.strategy == PolicyType.SJF: - return sorted(jobs, key=lambda job: job.wall_time) - elif self.strategy == PolicyType.PRIORITY: - return sorted(jobs, key=lambda job: job.priority, reverse=True) - else: - raise ValueError(f"Unknown policy type: {self.policy_type}") - - def find_backfill_job(self, queue, num_free_nodes, current_time): - """ This implementation is based on pseudocode from Leonenkov and Zhumatiy. - "Introducing new backfill-based scheduler for slurm resource manager." - Procedia computer science 66 (2015): 661-669. 
""" - - first_job = queue[0] - - for job in queue: job.end_time = current_time + job.wall_time - - # Sort jobs according to their termination time (end_time) - sorted_queue = sorted(queue, key=lambda job: job.end_time) - - # Compute shadow time - loop over the list and collect nodes until the - # number of available nodes is sufficient for the first job in the queue - sum_nodes = 0 - shadow_time = None - for job in sorted_queue: - sum_nodes += job.nodes_required - if sum_nodes >= first_job.nodes_required: - shadow_time = current_time + job.wall_time - num_extra_nodes = sum_nodes - job.nodes_required - break - - # Find backfill job - backfill_job = None - for job in queue: - # condition1 checks that the job ends before first_job starts - condition1 = job.nodes_required <= num_free_nodes \ - and current_time + job.wall_time < shadow_time - # condition2 checks that the job does not interfere with first_job - condition2 = job.nodes_required <= min(num_free_nodes, num_extra_nodes) - - if condition1 or condition2: - backfill_job = job - break - - return backfill_job diff --git a/raps/power.py b/raps/power.py index 29e31a09abe9c659eb4796ca4d9d95e81fea0479..e61010f86d1a82fc5f39436c27384e02f1d2452f 100644 --- a/raps/power.py +++ b/raps/power.py @@ -58,7 +58,10 @@ def compute_node_power(cpu_util, gpu_util, net_util, config): power_nic = config['POWER_NIC_IDLE'] + \ (config['POWER_NIC_MAX'] - config['POWER_NIC_IDLE']) * net_util except: - power_nic = config['POWER_NIC'] + if isinstance(net_util, np.ndarray): + power_nic = config['POWER_NIC'] * np.ones(net_util.shape) + else: + power_nic = config['POWER_NIC'] power_total = power_cpu + power_gpu + config['POWER_MEM'] + \ config['NICS_PER_NODE'] * power_nic + config['POWER_NVME'] @@ -260,11 +263,23 @@ class PowerManager: float Total power consumption of the scheduled nodes. 
""" - node_indices = linear_to_3d_index(scheduled_nodes, self.sc_shape) - power_value, sivoc_loss = self.power_func(cpu_util, gpu_util, net_util, self.config) + cpu_util = np.asarray(cpu_util) + gpu_util = np.asarray(gpu_util) + net_util = np.asarray(net_util) + job_lengths = np.array([len(job) for job in scheduled_nodes]) + flattened_nodes = np.concatenate(scheduled_nodes, axis=0) + + cpu_util_flat = np.repeat(cpu_util, job_lengths) + gpu_util_flat = np.repeat(gpu_util, job_lengths) + net_util_flat = np.repeat(net_util, job_lengths) + + node_indices = linear_to_3d_index(flattened_nodes, self.config['SC_SHAPE']) + + power_value, sivoc_loss = self.power_func(cpu_util_flat, gpu_util_flat, net_util_flat, self.config) self.power_state[node_indices] = power_value self.sivoc_loss[node_indices] = sivoc_loss - return power_value * len(scheduled_nodes) + return power_value[np.cumsum(job_lengths) - 1] + def calculate_rectifiers_needed(self, power_state_summed): """ @@ -319,7 +334,9 @@ class PowerManager: num_rectifiers = num_rectifiers_array[i, j, k] power_per_rectifier = chassis_power[i, j, k] / num_rectifiers rectifier_power[i, j, k, :num_rectifiers] = power_per_rectifier - power_with_losses[i, j, k, :num_rectifiers] = rectifier_loss(power_per_rectifier) + power_with_losses[i, j, k, :num_rectifiers] = compute_loss(power_per_rectifier, \ + self.config['RECTIFIER_LOSS_CONSTANT'], \ + self.config['RECTIFIER_EFFICIENCY']) rectifier_power = np.nan_to_num(rectifier_power) power_with_losses = np.nan_to_num(power_with_losses) diff --git a/raps/resmgr.py b/raps/resmgr.py new file mode 100644 index 0000000000000000000000000000000000000000..8abce81730f4d7b287277940169871e7db3b1e5a --- /dev/null +++ b/raps/resmgr.py @@ -0,0 +1,77 @@ +import numpy as np +from .job import JobState +from scipy.stats import weibull_min + + +class ResourceManager: + def __init__(self, total_nodes, down_nodes): + self.total_nodes = total_nodes + # Maintain a set for down nodes (e.g., nodes that are offline) + self.down_nodes = set(down_nodes) + # Available nodes are those that are not down + self.available_nodes = sorted(set(range(total_nodes)) - self.down_nodes) + # You can track system utilization history here + self.sys_util_history = [] # list of (time, utilization) tuples + + def assign_nodes_to_job(self, job, current_time): + """Assigns nodes to a job and updates the available nodes.""" + if len(self.available_nodes) < job.nodes_required: + raise ValueError(f"Not enough available nodes to schedule job {job.id}") + + if job.requested_nodes: # Telemetry replay case + job.scheduled_nodes = job.requested_nodes + self.available_nodes = [n for n in self.available_nodes if n not in job.scheduled_nodes] + else: # Synthetic or reschedule case + job.scheduled_nodes = self.available_nodes[:job.nodes_required] + self.available_nodes = self.available_nodes[job.nodes_required:] + + # Set job start and end times + job.start_time = current_time + job.end_time = current_time + job.wall_time + job.state = JobState.RUNNING # Mark job as running + + def free_nodes_from_job(self, job): + """Frees the nodes that were allocated to a completed job.""" + if hasattr(job, "scheduled_nodes"): + self.available_nodes.extend(job.scheduled_nodes) + # Remove duplicates and sort the list for consistency + self.available_nodes = sorted(set(self.available_nodes)) + else: + # If job has no scheduled nodes, there is nothing to free. + pass + + def update_system_utilization(self, current_time, num_active_nodes): + """ + Computes and records the system utilization. 
+        Utilization is the ratio of active nodes to total non-down nodes,
+        expressed as a percentage.
+        """
+        # Number of nodes that are not down:
+        total_operational = self.total_nodes - len(self.down_nodes)
+        # Compute utilization as a percentage:
+        utilization = (num_active_nodes / total_operational) * 100 if total_operational else 0
+        self.sys_util_history.append((current_time, utilization))
+        return utilization
+
+    def node_failure(self, mtbf):
+        """Simulate node failure using Weibull distribution."""
+        shape_parameter = 1.5
+        scale_parameter = mtbf * 3600  # Convert to seconds
+
+        # Create a NumPy array of node indices, excluding down nodes
+        all_nodes = np.array(sorted(set(range(self.total_nodes)) - self.down_nodes))
+
+        # Sample the Weibull distribution for all nodes at once
+        random_values = weibull_min.rvs(shape_parameter, scale=scale_parameter, size=all_nodes.size)
+
+        # Identify nodes that have failed
+        failure_threshold = 0.1
+        failed_nodes_mask = random_values < failure_threshold
+        newly_downed_nodes = all_nodes[failed_nodes_mask]
+
+        # Update available and down nodes
+        for node_index in newly_downed_nodes:
+            if node_index in self.available_nodes:
+                self.available_nodes.remove(node_index)
+            # Store the index as an int (not str) so the set difference above
+            # excludes the node on subsequent calls.
+            self.down_nodes.add(int(node_index))
+
+        return newly_downed_nodes.tolist()
diff --git a/raps/scheduler.py b/raps/scheduler.py
deleted file mode 100644
index 721d6e87386f0e834e84bb7004c246af6db3ad20..0000000000000000000000000000000000000000
--- a/raps/scheduler.py
+++ /dev/null
@@ -1,460 +0,0 @@
-"""A module for job scheduling and simulation in a distributed computing environment.
-
-This module provides classes and functions for managing job scheduling and simulating the behavior
-of a distributed computing system. It includes functionalities for scheduling jobs based on various
-policies, simulating the passage of time, handling node failures, and generating statistics about
-the simulation.
-
-Classes:
-- JobState: An enumeration representing the states of a job.
-- Job: A class representing a job to be scheduled and executed.
-- TickData: A dataclass representing the state output from the simulation each tick.
-- Scheduler: A class for job scheduling and simulation management.
-
-Functions:
-- summarize_ranges: A utility function to summarize ranges of values.
-- expand_ranges: A utility function to expand ranges of values into individual elements.
-
-Dependencies:
-- numpy: For numerical computations and array manipulations.
-- dataclasses: For creating data classes with less boilerplate code.
-- pandas: For data manipulation and DataFrame generation.
-- enum: For creating enumerations.
-- scipy.stats: For statistical distributions and random number generation.
-- typing: For type hints and annotations.
-
-Config parameters used:
-- TRACE_QUANTA: The quantum of time for tracing job CPU and GPU utilization.
-- MTBF: Mean Time Between Failures, used for node failure simulation.
-- POWER_COST: Cost of power consumption per unit, used for calculating total cost.
-- UI_UPDATE_FREQ: Frequency of updating the user interface.
-- MAX_TIME: Maximum simulation time.
-- POWER_UPDATE_FREQ: Frequency of updating power-related metrics.
-- POWER_DF_HEADER: Header for the power related components of DataFrame.
-- POWER_CDU: Power consumption of CDU.
-- TOTAL_NODES: Total number of nodes in the system.
-- COOLING_EFFICIENCY: Cooling efficiency factor.
- -This module can be used to simulate job scheduling algorithms, analyze system behavior, and -optimize resource utilization in distributed computing environments. -""" -from typing import Optional -import dataclasses -import numpy as np - -from scipy.stats import weibull_min -import pandas as pd - -from .job import Job, JobState -from .account import Accounts -from .network import network_utilization -from .policy import Policy, PolicyType -from .utils import summarize_ranges, expand_ranges, write_dict_to_file - - -@dataclasses.dataclass -class TickData: - """ Represents the state output from the simulation each tick """ - current_time: int - completed: list[Job] - running: list[Job] - queue: list[Job] - down_nodes: list[int] - power_df: Optional[pd.DataFrame] - p_flops: Optional[float] - g_flops_w: Optional[float] - system_util: float - fmu_inputs: Optional[dict] - fmu_outputs: Optional[dict] - num_active_nodes: int - num_free_nodes: int - - -def get_utilization(trace, time_quanta_index): - if isinstance(trace, (list, np.ndarray)): - return trace[time_quanta_index] - elif isinstance(trace, (int, float)): - return float(trace) - else: - raise TypeError(f"Invalid type for utilization: {type(trace)}.") - - -class Scheduler: - """Job scheduler and simulation manager.""" - def __init__(self, *, power_manager, flops_manager, cooling_model=None, config, **kwargs): - self.config = config - self.down_nodes = summarize_ranges(self.config['DOWN_NODES']) - self.available_nodes = list(set(range(self.config['TOTAL_NODES'])) - set(self.config['DOWN_NODES'])) - self.num_free_nodes = len(self.available_nodes) - self.num_active_nodes = self.config['TOTAL_NODES'] - self.num_free_nodes - len(self.config['DOWN_NODES']) - self.running = [] - self.queue = [] - self.accounts = Accounts() - if 'accounts_json' in kwargs and kwargs['accounts_json']: - self.accounts.initialize_accounts_from_json(kwargs.get('accounts_json')) - self.jobs_completed = 0 - self.current_time = 0 - self.cooling_model = cooling_model - self.sys_power = 0 - self.power_manager = power_manager - self.flops_manager = flops_manager - self.debug = kwargs.get('debug') - self.output = kwargs.get('output') - self.replay = kwargs.get('replay') - self.policy = Policy(strategy=kwargs.get('schedule')) - self.sys_util_history = [] - self.history = [] - - - def add_job(self, job): - self.queue.append(job) - self.queue = self.policy.sort_jobs(self.queue) - - - def assign_nodes_to_job(self, job): - """Helper function to assign nodes to a job and update available nodes.""" - if len(self.available_nodes) < job.nodes_required: - # If there are not enough nodes, return or raise an error (handle as needed) - raise ValueError(f"Not enough available nodes to schedule job {job.id}") - - if job.requested_nodes: # replay case - # If the job has requested specific nodes, assign them - job.scheduled_nodes = job.requested_nodes - mask = ~np.isin(self.available_nodes, job.scheduled_nodes) - self.available_nodes = np.array(self.available_nodes) - self.available_nodes = self.available_nodes[mask] - self.available_nodes = self.available_nodes.tolist() - else: # synthetic or reschedule case - # Assign the nodes from available pool - job.scheduled_nodes = self.available_nodes[:job.nodes_required] - self.available_nodes = self.available_nodes[job.nodes_required:] - - # Set job start and end times - job.start_time = self.current_time - job.end_time = self.current_time + job.wall_time - - # Mark the job as running - job.state = JobState.RUNNING - self.running.append(job) - - - 
def schedule(self, jobs): - """Schedule jobs""" - for job_info in jobs: - job = Job(job_info, self.current_time) - self.add_job(job) - - while self.queue: - - # Try scheduling the first job in the queue - job = self.queue.pop(0) - synthetic_bool = len(self.available_nodes) >= job.nodes_required - telemetry_bool = job.requested_nodes and set(job.requested_nodes).issubset(set(self.available_nodes)) - - if synthetic_bool or telemetry_bool: - - # Schedule job - self.assign_nodes_to_job(job) - self.history.append(dict(id=job.id, time=self.current_time, nodes=job.nodes_required, wall_time=job.wall_time)) - - if self.debug: - scheduled_nodes = summarize_ranges(job.scheduled_nodes) - print(f"t={self.current_time}: Scheduled job with wall time", - f"{job.wall_time} on nodes {scheduled_nodes}") - - else: - # If the job cannot be scheduled, either try backfilling or requeue it - if self.queue and self.policy.strategy == PolicyType.BACKFILL: - self.queue.insert(0, job) - backfill_job = self.policy.find_backfill_job(self.queue, len(self.available_nodes), self.current_time) - if backfill_job: - self.assign_nodes_to_job(backfill_job) - self.queue.remove(backfill_job) - if self.debug: - scheduled_nodes = summarize_ranges(backfill_job.scheduled_nodes) - print(f"t={self.current_time}: Backfilling job {backfill_job.id} with wall time", - f"{backfill_job.wall_time} on nodes {scheduled_nodes}") - else: - self.queue.append(job) # Note, this should be fixed. It shouldn't go to the end of the queue. - break - - def tick(self): - """Simulate a timestep.""" - completed_jobs = [job for job in self.running if job.end_time - is not None and job.end_time <= self.current_time] - completed_job_stats = [] - # Simulate node failure - newly_downed_nodes = self.node_failure(self.config['MTBF']) - - # Update active/free nodes - self.num_free_nodes = len(self.available_nodes) - self.num_active_nodes = self.config['TOTAL_NODES'] - self.num_free_nodes \ - - len(expand_ranges(self.down_nodes)) - - # Update running time for all running jobs - for job in self.running: - - if job.end_time == self.current_time: - job.state = JobState.COMPLETED - - if job.state == JobState.RUNNING: - # Deal with node that fails during the course of a running job - #if any(node in job.scheduled_nodes for node in newly_downed_nodes): - if False: # currently disabled b/c not working correctly - - # Update job state to FAILED - job.state = JobState.FAILED - - # Release all nodes except the downed node - for node in job.scheduled_nodes: - if node not in newly_downed_nodes: - self.available_nodes.append(node) - self.available_nodes.sort() - - # Remove job from the list of running jobs - self.running.remove(job) - - job.running_time = self.current_time - job.start_time - - time_quanta_index = (self.current_time - job.start_time) \ - // self.config['TRACE_QUANTA'] - - cpu_util = get_utilization(job.cpu_trace, time_quanta_index) - gpu_util = get_utilization(job.gpu_trace, time_quanta_index) - - if len(job.ntx_trace) and len(job.nrx_trace): - net_tx = get_utilization(job.ntx_trace, time_quanta_index) - net_rx = get_utilization(job.nrx_trace, time_quanta_index) - net_util = network_utilization(net_tx, net_rx) - else: - net_util = 0 - - self.flops_manager.update_flop_state(job.scheduled_nodes, cpu_util, gpu_util) - job.power = self.power_manager.update_power_state(job.scheduled_nodes, - cpu_util, gpu_util, net_util) - - if job.running_time % self.config['TRACE_QUANTA'] == 0: - job.power_history.append(job.power) - - for job in completed_jobs: - # Release the 
nodes used by this job - self.available_nodes.extend(job.scheduled_nodes) - self.available_nodes.sort() - - if self.debug: - print( - f"\nt={self.current_time}: " - f"Releasing {len(job.scheduled_nodes)} nodes from completed job; " - f"{len(self.available_nodes)} nodes available after release." - ) - - # Set nodes back to idle power - node_indices = np.array(job.scheduled_nodes) - if self.debug: - print("setting idle nodes:", node_indices) - self.power_manager.set_idle(node_indices) - self.flops_manager.update_flop_state(job.scheduled_nodes, \ - cpu_util=0, gpu_util=0) - - # Remove job from list of running jobs - self.running.remove(job) - scheduled_nodes = summarize_ranges(job.scheduled_nodes) - - if self.debug: - print(f"Released {scheduled_nodes}") - self.jobs_completed += 1 - job_stats = job.statistics() - if self.debug: - print(job_stats) - completed_job_stats.append(job_stats) - self.accounts.update_account_statistics(job_stats) - if self.output: - # output power trace - with open(self.opath / f'job-power-{job.id}.txt', 'w') as file: - print(*job.power_history, sep=', ', file=file) - write_dict_to_file(vars(job_stats),self.opath / f'job-stats-{job.account}.json') - # Ask scheduler to schedule any jobs waiting in queue - self.schedule([]) - - # Update the power array UI component - rack_power, rect_losses = self.power_manager.compute_rack_power() - sivoc_losses = self.power_manager.compute_sivoc_losses() - rack_loss = rect_losses + sivoc_losses - - # Update system utilization - system_util = self.num_active_nodes / self.config['AVAILABLE_NODES'] * 100 - self.sys_util_history.append((self.current_time, system_util)) - - # Render the updated layout - power_df = None - cooling_inputs, cooling_outputs = None, None - - # Update power history every 15s - if self.current_time % self.config['POWER_UPDATE_FREQ'] == 0: - total_power_kw = sum(row[-1] for row in rack_power) + self.config['NUM_CDUS'] * self.config['POWER_CDU'] / 1000.0 - total_loss_kw = sum(row[-1] for row in rack_loss) - self.power_manager.history.append((self.current_time, total_power_kw)) - self.sys_power = total_power_kw - self.power_manager.loss_history.append((self.current_time, total_loss_kw)) - output_df = self.power_manager.get_power_df(rack_power, rack_loss) - pflops = self.flops_manager.get_system_performance() / 1E15 - gflop_per_watt = pflops * 1E6 / (total_power_kw * 1000) - else: - pflops, gflop_per_watt = None, None - - if self.current_time % self.config['POWER_UPDATE_FREQ'] == 0: - if self.cooling_model: - # Power for NUM_CDUS (25 for Frontier) - cdu_power = rack_power.T[-1] * 1000 - runtime_values = self.cooling_model.generate_runtime_values(cdu_power, self) - - # FMU inputs are N powers and the wetbulb temp - fmu_inputs = self.cooling_model.generate_fmu_inputs(runtime_values, - uncertainties=self.power_manager.uncertainties) - cooling_inputs, cooling_outputs = ( - self.cooling_model.step(self.current_time, fmu_inputs, self.config['POWER_UPDATE_FREQ']) - ) - - # Get a dataframe of the power data - power_df = self.power_manager.get_power_df(rack_power, rack_loss) - else: - # Get a dataframe of the power data - power_df = self.power_manager.get_power_df(rack_power, rack_loss) - - tick_data = TickData( - current_time = self.current_time, - completed = completed_jobs, - running = self.running, - queue = self.queue, - down_nodes = expand_ranges(self.down_nodes), - power_df = power_df, - p_flops = pflops, - g_flops_w = gflop_per_watt, - system_util = system_util, - fmu_inputs = cooling_inputs, - fmu_outputs = 
cooling_outputs, - num_active_nodes = self.num_active_nodes, - num_free_nodes = self.num_free_nodes, - ) - - self.current_time += 1 - return tick_data - - def get_gauge_limits(self): - """For setting max values in dashboard gauges""" - peak_flops = self.flops_manager.get_rpeak() - peak_power = self.power_manager.get_peak_power() - gflops_per_watt_max = peak_flops / 1E9 / peak_power - - if self.debug: - print(f"System Rpeak: {peak_flops/1E15:.2f} PFLOPS") - print(f"Peak power: {peak_power/1E3:.0f} kW") - print(f"Max energy efficiency: {gflops_per_watt_max:.1f} GFLOPS/W") - - limits = {'peak_flops': peak_flops, 'peak_power': peak_power, \ - 'g_flops_w_peak': gflops_per_watt_max} - return limits - - def run_simulation(self, jobs, timesteps): - """ Generator that yields after each simulation tick """ - last_submit_time = 0 - self.timesteps = timesteps - if self.debug: - limits = self.get_gauge_limits() - - for timestep in range(timesteps): - # Print the current timestep for this partition - if timestep % self.config['UI_UPDATE_FREQ'] == 0: - sys_util = self.sys_util_history[-1][1] if self.sys_util_history else 0 - - while self.current_time >= last_submit_time and jobs: - job = jobs.pop(0) - self.schedule([job]) - if jobs: - last_submit_time = job['submit_time'] - else: # No more jobs, set to infinity to avoid triggering again - last_submit_time = float('inf') - yield self.tick() - - # Stop the simulation if no more jobs running or are in the queue - if not self.queue and not self.running and not self.replay: - print(f"[DEBUG] {self.config['system_name']} - Stopping simulation at time {self.current_time}") - break - if self.debug and timestep % self.config['UI_UPDATE_FREQ'] == 0: - print(".", end="", flush=True) - - def get_history(self): - return self.history - - def get_stats(self): - """ Return output statistics """ - sum_values = lambda values : sum(x[1] for x in values) - min_value = lambda values : min(x[1] for x in values) - max_value = lambda values : max(x[1] for x in values) - num_samples = len(self.power_manager.history) - throughput = self.jobs_completed / self.timesteps * 3600 # jobs/hour - average_power_mw = sum_values(self.power_manager.history) / num_samples / 1000 - average_loss_mw = sum_values(self.power_manager.loss_history) / num_samples / 1000 - min_loss_mw = min_value(self.power_manager.loss_history) / 1000 - max_loss_mw = max_value(self.power_manager.loss_history) / 1000 - self.power_manager.loss_history_percentage = \ - [(x[0], x[1] / y[1]) for x, y in zip(self.power_manager.loss_history, \ - self.power_manager.history)] - min_loss_pct = min_value(self.power_manager.loss_history_percentage) - max_loss_pct = max_value(self.power_manager.loss_history_percentage) - - loss_fraction = average_loss_mw / average_power_mw - efficiency = 1 - loss_fraction - # compute total power consumed by multiplying average power times length of simulation - total_energy_consumed = average_power_mw * self.timesteps / 3600 # MW-hr - # From https://www.epa.gov/energy/greenhouse-gases-equivalencies-\ - # calculator-calculations-and-references - emissions = total_energy_consumed * 852.3 / 2204.6 / efficiency - total_cost = total_energy_consumed * 1000 * self.config['POWER_COST'] # total cost in dollars - - stats = { - 'num_samples': num_samples, - 'jobs completed': self.jobs_completed, - 'throughput': f'{throughput:.2f} jobs/hour', - 'jobs still running': [job.id for job in self.running], - 'jobs still in queue': [job.id for job in self.queue], - 'average power': f'{average_power_mw:.2f} MW', - 
'min loss': f'{min_loss_mw:.2f} MW ({min_loss_pct*100:.2f}%)',
-            'average loss': f'{average_loss_mw:.2f} MW ({loss_fraction*100:.2f}%)',
-            'max loss': f'{max_loss_mw:.2f} MW ({max_loss_pct*100:.2f}%)',
-            'system power efficiency': f'{efficiency*100:.2f}',
-            'total energy consumed': f'{total_energy_consumed:.2f} MW-hr',
-            'carbon emissions': f'{emissions:.2f} metric tons CO2',
-            'total cost': f'${total_cost:.2f}'
-        }
-
-        return stats
-
-    def node_failure(self, mtbf):
-        """Simulate node failure."""
-        shape_parameter = 1.5
-        scale_parameter = mtbf * 3600  # to seconds
-
-        # Create a NumPy array of node indices, excluding down nodes
-        down_nodes = expand_ranges(self.down_nodes)
-        all_nodes = np.setdiff1d(np.arange(self.config['TOTAL_NODES']),
-                                 np.array(down_nodes, dtype=int))
-
-        # Sample the Weibull distribution for all nodes at once
-        random_values = weibull_min.rvs(shape_parameter,
-                                        scale=scale_parameter,
-                                        size=all_nodes.size)
-
-        # Identify nodes that have failed
-        failure_threshold = 0.1
-        failed_nodes_mask = random_values < failure_threshold
-        newly_downed_nodes = all_nodes[failed_nodes_mask]
-
-        # Update available and down nodes
-        for node_index in newly_downed_nodes:
-            if node_index in self.available_nodes:
-                self.available_nodes.remove(node_index)
-                self.down_nodes.append(str(node_index))
-                self.power_manager.set_idle(node_index)
-
-        return newly_downed_nodes.tolist()
diff --git a/raps/schedulers/__init__.py b/raps/schedulers/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..ca3431e041c0b866f5df9b541122228c1134a4ad
--- /dev/null
+++ b/raps/schedulers/__init__.py
@@ -0,0 +1,6 @@
+from importlib import import_module
+
+def load_scheduler(scheduler_type="default"):
+    """Dynamically loads a scheduler by type."""
+    module = import_module(f".{scheduler_type}", package="raps.schedulers")
+    return getattr(module, "Scheduler")
diff --git a/raps/schedulers/default.py b/raps/schedulers/default.py
new file mode 100644
index 0000000000000000000000000000000000000000..7ab664bf34dc7f0af3d790ef4e1ec1e9eb5c4ffb
--- /dev/null
+++ b/raps/schedulers/default.py
@@ -0,0 +1,114 @@
+from enum import Enum
+from ..job import Job, JobState
+from ..utils import summarize_ranges
+
+
+class PolicyType(Enum):
+    """Supported scheduling policies."""
+    FCFS = 'fcfs'
+    BACKFILL = 'backfill'
+    PRIORITY = 'priority'
+    SJF = 'sjf'
+
+
+class Scheduler:
+    """Default job scheduler with various scheduling policies."""
+
+    def __init__(self, config, policy, resource_manager=None):
+        self.config = config
+        self.policy = PolicyType(policy)
+        if resource_manager is None:
+            raise ValueError("Scheduler requires a ResourceManager instance")
+        self.resource_manager = resource_manager
+        self.debug = False
+
+    def sort_jobs(self, queue):
+        """Sort jobs based on the selected scheduling policy."""
+        if self.policy == PolicyType.FCFS or self.policy == PolicyType.BACKFILL:
+            return sorted(queue, key=lambda job: job.submit_time)
+        elif self.policy == PolicyType.SJF:
+            return sorted(queue, key=lambda job: job.wall_time)
+        elif self.policy == PolicyType.PRIORITY:
+            return sorted(queue, key=lambda job: job.priority, reverse=True)
+        else:
+            raise ValueError(f"Unknown policy type: {self.policy}")
+
+    def schedule(self, queue, running, current_time, debug=False):
+        # Sort the queue in place.
+        queue[:] = self.sort_jobs(queue)
+
+        # Iterate over a copy of the queue since we might remove items
+        for job in queue[:]:
+
+            # Skip jobs that were already scheduled earlier in this pass
+            # (e.g., pulled forward by backfill) and removed from the queue.
+            if job not in queue:
+                continue
+
+            # For synthetic jobs the number of requested nodes is given.
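+            # (Synthetic jobs carry no requested_nodes list, so only the
+            # free-node count matters; cf. ResourceManager.assign_nodes_to_job.)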
+            # Make sure the available nodes count meets job.nodes_required.
+            synthetic_bool = len(self.resource_manager.available_nodes) >= job.nodes_required
+
+            # For telemetry replay jobs a list of requested nodes is provided.
+            # Make sure the requested nodes are available.
+            telemetry_bool = False
+            if job.requested_nodes:
+                telemetry_bool = set(job.requested_nodes).issubset(set(self.resource_manager.available_nodes))
+
+            if synthetic_bool or telemetry_bool:
+                self.resource_manager.assign_nodes_to_job(job, current_time)
+                running.append(job)
+                queue.remove(job)
+                if debug:
+                    scheduled_nodes = summarize_ranges(job.scheduled_nodes)
+                    print(f"t={current_time}: Scheduled job {job.id} with wall time {job.wall_time} on nodes {scheduled_nodes}")
+            else:
+                if self.policy == PolicyType.BACKFILL:
+                    # Try to find a backfill candidate from the entire queue.
+                    backfill_job = self.find_backfill_job(queue, len(self.resource_manager.available_nodes), current_time)
+                    if backfill_job:
+                        self.resource_manager.assign_nodes_to_job(backfill_job, current_time)
+                        running.append(backfill_job)
+                        queue.remove(backfill_job)
+                        if debug:
+                            scheduled_nodes = summarize_ranges(backfill_job.scheduled_nodes)
+                            print(f"t={current_time}: Backfilling job {backfill_job.id} with wall time {backfill_job.wall_time} on nodes {scheduled_nodes}")
+                else:
+                    # Preserve queue order for non-backfill policies: stop at
+                    # the first job that does not fit.
+                    break
+
+    def find_backfill_job(self, queue, num_free_nodes, current_time):
+        """Finds a backfill job based on available nodes and estimated completion times.
+
+        Based on pseudocode from Leonenkov and Zhumatiy, 'Introducing new backfill-based
+        scheduler for slurm resource manager.' Procedia computer science 66 (2015): 661-669.
+        """
+        if not queue:
+            return None
+
+        first_job = queue[0]
+
+        for job in queue:
+            job.end_time = current_time + job.wall_time  # Estimate end time
+
+        # Sort jobs according to their termination time (end_time)
+        sorted_queue = sorted(queue, key=lambda job: job.end_time)
+
+        # Compute shadow time by accumulating nodes
+        sum_nodes = 0
+        shadow_time = None
+        num_extra_nodes = 0
+
+        for job in sorted_queue:
+            sum_nodes += job.nodes_required
+            if sum_nodes >= first_job.nodes_required:
+                shadow_time = current_time + job.wall_time
+                num_extra_nodes = sum_nodes - job.nodes_required
+                break
+
+        # If the queue can never free enough nodes for the first job, there is
+        # no shadow time and nothing can be safely backfilled.
+        if shadow_time is None:
+            return None
+
+        # Find backfill job
+        for job in queue:
+            # condition1 checks that the job ends before first_job starts
+            condition1 = job.nodes_required <= num_free_nodes and current_time + job.wall_time < shadow_time
+            # condition2 checks that the job does not interfere with first_job
+            condition2 = job.nodes_required <= min(num_free_nodes, num_extra_nodes)
+
+            if condition1 or condition2:
+                return job
+
+        return None
diff --git a/raps/telemetry.py b/raps/telemetry.py
index 9c006a2f1b10e0889ba0a708da5cdc13531684fc..833ca39305cb4d2255ac3b28ab52aaa1a6379351 100644
--- a/raps/telemetry.py
+++ b/raps/telemetry.py
@@ -23,12 +23,10 @@ if __name__ == "__main__":
 import importlib
 import numpy as np
-import re
-from datetime import datetime
 from tqdm import tqdm
 
 from .config import ConfigManager
-from .scheduler import Job
+from .job import Job
 from .plotting import plot_submit_times, plot_nodes_histogram
 from .utils import next_arrival
diff --git a/raps/ui.py b/raps/ui.py
index dd7725f510938cbce76402bf68adf3aef09e81af..8bee172df2b524a9be38212eac72034b5e44047c 100644
--- a/raps/ui.py
+++ b/raps/ui.py
@@ -1,4 +1,3 @@
-import numpy as np
 import pandas as pd
 from rich.align import Align
 from rich.console import Console
@@ -7,12 +6,12 @@ from rich.panel import Panel
 from rich.table import Table
 from .utils import summarize_ranges, convert_seconds
 from .constants import ELLIPSES
-from .scheduler import TickData, Scheduler
+from .engine import TickData, Engine


class
LayoutManager: - def __init__(self, layout_type, scheduler: Scheduler, debug, **config): - self.scheduler = scheduler + def __init__(self, layout_type, engine: Engine, debug, **config): + self.engine = engine self.config = config self.console = Console() self.layout = Layout() @@ -375,9 +374,9 @@ class LayoutManager: self.layout["lower"].update(Panel(Align(total_table, align="center"), title="Power and Performance")) def update(self, data: TickData): - uncertainties = self.scheduler.power_manager.uncertainties + uncertainties = self.engine.power_manager.uncertainties - if self.scheduler.cooling_model: + if self.engine.cooling_model: self.update_powertemp_array( data.power_df, data.fmu_outputs, data.p_flops, data.g_flops_w, data.system_util, uncertainties = uncertainties, @@ -401,11 +400,11 @@ class LayoutManager: def run(self, jobs, timesteps): """ Runs the UI, blocking until the simulation is complete """ - for data in self.scheduler.run_simulation(jobs, timesteps): + for data in self.engine.run_simulation(jobs, timesteps): if data.current_time % self.config['UI_UPDATE_FREQ'] == 0: self.update(data) self.render() def run_stepwise(self, jobs, timesteps): """ Prepares the UI and returns a generator for the simulation """ - return self.scheduler.run_simulation(jobs, timesteps) + return self.engine.run_simulation(jobs, timesteps) diff --git a/raps/utils.py b/raps/utils.py index 6c9d4d763435383e9f9b6d81e1d817494b447354..5ead3d137cf431af851fa897ae7aeccacec7b0ec 100644 --- a/raps/utils.py +++ b/raps/utils.py @@ -360,3 +360,13 @@ def toJSON(obj): default=lambda o:o.__dict__, sort_keys=True, indent=4) + + +def get_utilization(trace, time_quanta_index): + """Retrieve utilization value for a given trace at a specific time quanta index.""" + if isinstance(trace, (list, np.ndarray)): + return trace[time_quanta_index] + elif isinstance(trace, (int, float)): + return float(trace) + else: + raise TypeError(f"Invalid type for utilization: {type(trace)}.") diff --git a/raps/weather.py b/raps/weather.py index 3917279a0efc9070af21ed7f53f9b346461a743b..b31f88e2ab3888f57893d2c6ef5c6e1cc7611549 100644 --- a/raps/weather.py +++ b/raps/weather.py @@ -1,7 +1,6 @@ import requests import urllib3 -import json -from datetime import datetime, timedelta +from datetime import datetime # Disable SSL warnings when verify=False is used (temporary debugging purpose) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) diff --git a/raps/workload.py b/raps/workload.py index 14d91cc40388e12dfe3ddd1e9ad1adeecb8fe904..b368246a0c24c503e2a7ac9868a7b80b3dad4090 100644 --- a/raps/workload.py +++ b/raps/workload.py @@ -25,7 +25,6 @@ JOB_END_PROBS : list """ -import math import random import numpy as np diff --git a/tests/smoke.py b/tests/smoke.py index 623908ba325b1421e4bdcae23ba08d2f50200a34..9174b3c22409cb68873ddd860cac9aac946c703c 100644 --- a/tests/smoke.py +++ b/tests/smoke.py @@ -11,7 +11,6 @@ DEFAULT_TIME = "1h" # Define systems and their corresponding filenames SYSTEMS = { "frontier": "frontier/slurm/joblive/date=2024-01-18 frontier/jobprofile/date=2024-01-18", - "fugaku": "fugaku/21_04.parquet", "marconi100": "marconi100/job_table.parquet", "lassen": "lassen/Lassen-Supercomputer-Job-Dataset", "adastraMI250": "adastra/AdastaJobsMI250_15days.parquet"