Loading raps/engine.py +7 −18 Original line number Diff line number Diff line from typing import Optional import dataclasses import numpy as np import pandas as pd from .job import Job, JobState from .account import Accounts from .network import network_utilization from .utils import summarize_ranges, expand_ranges, write_dict_to_file from .utils import summarize_ranges, expand_ranges, get_utilization from .resmgr import ResourceManager from .schedulers import load_scheduler Loading Loading @@ -74,10 +73,11 @@ class Engine: def tick(self): """Simulate a timestep.""" completed_jobs = [job for job in self.running if job.end_time is not None and job.end_time <= self.current_time] completed_job_stats = [] # Simulate node failure newly_downed_nodes = self.resource_manager.node_failure(self.config['MTBF']) for node in newly_downed_nodes: self.power_manager.set_idle(node) # Update active/free nodes self.num_free_nodes = len(self.resource_manager.available_nodes) Loading @@ -97,13 +97,13 @@ class Engine: if job.state == JobState.RUNNING: job.running_time = self.current_time - job.start_time time_quanta_index = (self.current_time - job.start_time) // self.config['TRACE_QUANTA'] cpu_util = self.get_utilization(job.cpu_trace, time_quanta_index) gpu_util = self.get_utilization(job.gpu_trace, time_quanta_index) cpu_util = get_utilization(job.cpu_trace, time_quanta_index) gpu_util = get_utilization(job.gpu_trace, time_quanta_index) net_util = 0 if len(job.ntx_trace) and len(job.nrx_trace): net_tx = self.get_utilization(job.ntx_trace, time_quanta_index) net_rx = self.get_utilization(job.nrx_trace, time_quanta_index) net_tx = get_utilization(job.ntx_trace, time_quanta_index) net_rx = get_utilization(job.nrx_trace, time_quanta_index) net_util = network_utilization(net_tx, net_rx) net_utils.append(net_util) else: Loading Loading @@ -156,7 +156,6 @@ class Engine: self.power_manager.history.append((self.current_time, total_power_kw)) self.sys_power = total_power_kw self.power_manager.loss_history.append((self.current_time, total_loss_kw)) output_df = self.power_manager.get_power_df(rack_power, rack_loss) pflops = self.flops_manager.get_system_performance() / 1E15 gflop_per_watt = pflops * 1E6 / (total_power_kw * 1000) else: Loading Loading @@ -201,16 +200,6 @@ class Engine: return tick_data def get_utilization(self, trace, time_quanta_index): """Retrieve utilization value for a given trace at a specific time quanta index.""" if isinstance(trace, (list, np.ndarray)): return trace[time_quanta_index] elif isinstance(trace, (int, float)): return float(trace) else: raise TypeError(f"Invalid type for utilization: {type(trace)}.") def run_simulation(self, jobs, timesteps, autoshutdown=False): """Generator that yields after each simulation tick.""" self.timesteps = timesteps Loading raps/power.py +3 −1 Original line number Diff line number Diff line Loading @@ -334,7 +334,9 @@ class PowerManager: num_rectifiers = num_rectifiers_array[i, j, k] power_per_rectifier = chassis_power[i, j, k] / num_rectifiers rectifier_power[i, j, k, :num_rectifiers] = power_per_rectifier power_with_losses[i, j, k, :num_rectifiers] = rectifier_loss(power_per_rectifier) power_with_losses[i, j, k, :num_rectifiers] = compute_loss(power_per_rectifier, \ self.config['RECTIFIER_LOSS_CONSTANT'], \ self.config['RECTIFIER_EFFICIENCY']) rectifier_power = np.nan_to_num(rectifier_power) power_with_losses = np.nan_to_num(power_with_losses) Loading raps/resmgr.py +0 −7 Original line number Diff line number Diff line import numpy as np from .job import JobState from .utils import expand_ranges from scipy.stats import weibull_min Loading Loading @@ -59,15 +58,10 @@ class ResourceManager: scale_parameter = mtbf * 3600 # Convert to seconds # Create a NumPy array of node indices, excluding down nodes #print(self.down_nodes) #down_nodes = expand_ranges(self.down_nodes) #all_nodes = np.setdiff1d(np.arange(self.config['TOTAL_NODES']), np.array(self.down_nodes, dtype=int)) all_nodes = np.array(sorted(set(range(self.total_nodes)) - set(self.down_nodes))) # Sample the Weibull distribution for all nodes at once random_values = weibull_min.rvs(shape_parameter, scale=scale_parameter, size=all_nodes.size) failure_threshold = 0.1 failed_nodes = [node for node, r in zip(all_nodes, random_values) if r < failure_threshold] # Identify nodes that have failed failure_threshold = 0.1 Loading @@ -79,6 +73,5 @@ class ResourceManager: if node_index in self.available_nodes: self.available_nodes.remove(node_index) self.down_nodes.add(str(node_index)) self.power_manager.set_idle(node_index) return newly_downed_nodes.tolist() raps/telemetry.py +0 −2 Original line number Diff line number Diff line Loading @@ -23,8 +23,6 @@ if __name__ == "__main__": import importlib import numpy as np import re from datetime import datetime from tqdm import tqdm from .config import ConfigManager Loading raps/ui.py +0 −1 Original line number Diff line number Diff line import numpy as np import pandas as pd from rich.align import Align from rich.console import Console Loading Loading
raps/engine.py +7 −18 Original line number Diff line number Diff line from typing import Optional import dataclasses import numpy as np import pandas as pd from .job import Job, JobState from .account import Accounts from .network import network_utilization from .utils import summarize_ranges, expand_ranges, write_dict_to_file from .utils import summarize_ranges, expand_ranges, get_utilization from .resmgr import ResourceManager from .schedulers import load_scheduler Loading Loading @@ -74,10 +73,11 @@ class Engine: def tick(self): """Simulate a timestep.""" completed_jobs = [job for job in self.running if job.end_time is not None and job.end_time <= self.current_time] completed_job_stats = [] # Simulate node failure newly_downed_nodes = self.resource_manager.node_failure(self.config['MTBF']) for node in newly_downed_nodes: self.power_manager.set_idle(node) # Update active/free nodes self.num_free_nodes = len(self.resource_manager.available_nodes) Loading @@ -97,13 +97,13 @@ class Engine: if job.state == JobState.RUNNING: job.running_time = self.current_time - job.start_time time_quanta_index = (self.current_time - job.start_time) // self.config['TRACE_QUANTA'] cpu_util = self.get_utilization(job.cpu_trace, time_quanta_index) gpu_util = self.get_utilization(job.gpu_trace, time_quanta_index) cpu_util = get_utilization(job.cpu_trace, time_quanta_index) gpu_util = get_utilization(job.gpu_trace, time_quanta_index) net_util = 0 if len(job.ntx_trace) and len(job.nrx_trace): net_tx = self.get_utilization(job.ntx_trace, time_quanta_index) net_rx = self.get_utilization(job.nrx_trace, time_quanta_index) net_tx = get_utilization(job.ntx_trace, time_quanta_index) net_rx = get_utilization(job.nrx_trace, time_quanta_index) net_util = network_utilization(net_tx, net_rx) net_utils.append(net_util) else: Loading Loading @@ -156,7 +156,6 @@ class Engine: self.power_manager.history.append((self.current_time, total_power_kw)) self.sys_power = total_power_kw self.power_manager.loss_history.append((self.current_time, total_loss_kw)) output_df = self.power_manager.get_power_df(rack_power, rack_loss) pflops = self.flops_manager.get_system_performance() / 1E15 gflop_per_watt = pflops * 1E6 / (total_power_kw * 1000) else: Loading Loading @@ -201,16 +200,6 @@ class Engine: return tick_data def get_utilization(self, trace, time_quanta_index): """Retrieve utilization value for a given trace at a specific time quanta index.""" if isinstance(trace, (list, np.ndarray)): return trace[time_quanta_index] elif isinstance(trace, (int, float)): return float(trace) else: raise TypeError(f"Invalid type for utilization: {type(trace)}.") def run_simulation(self, jobs, timesteps, autoshutdown=False): """Generator that yields after each simulation tick.""" self.timesteps = timesteps Loading
raps/power.py +3 −1 Original line number Diff line number Diff line Loading @@ -334,7 +334,9 @@ class PowerManager: num_rectifiers = num_rectifiers_array[i, j, k] power_per_rectifier = chassis_power[i, j, k] / num_rectifiers rectifier_power[i, j, k, :num_rectifiers] = power_per_rectifier power_with_losses[i, j, k, :num_rectifiers] = rectifier_loss(power_per_rectifier) power_with_losses[i, j, k, :num_rectifiers] = compute_loss(power_per_rectifier, \ self.config['RECTIFIER_LOSS_CONSTANT'], \ self.config['RECTIFIER_EFFICIENCY']) rectifier_power = np.nan_to_num(rectifier_power) power_with_losses = np.nan_to_num(power_with_losses) Loading
raps/resmgr.py +0 −7 Original line number Diff line number Diff line import numpy as np from .job import JobState from .utils import expand_ranges from scipy.stats import weibull_min Loading Loading @@ -59,15 +58,10 @@ class ResourceManager: scale_parameter = mtbf * 3600 # Convert to seconds # Create a NumPy array of node indices, excluding down nodes #print(self.down_nodes) #down_nodes = expand_ranges(self.down_nodes) #all_nodes = np.setdiff1d(np.arange(self.config['TOTAL_NODES']), np.array(self.down_nodes, dtype=int)) all_nodes = np.array(sorted(set(range(self.total_nodes)) - set(self.down_nodes))) # Sample the Weibull distribution for all nodes at once random_values = weibull_min.rvs(shape_parameter, scale=scale_parameter, size=all_nodes.size) failure_threshold = 0.1 failed_nodes = [node for node, r in zip(all_nodes, random_values) if r < failure_threshold] # Identify nodes that have failed failure_threshold = 0.1 Loading @@ -79,6 +73,5 @@ class ResourceManager: if node_index in self.available_nodes: self.available_nodes.remove(node_index) self.down_nodes.add(str(node_index)) self.power_manager.set_idle(node_index) return newly_downed_nodes.tolist()
raps/telemetry.py +0 −2 Original line number Diff line number Diff line Loading @@ -23,8 +23,6 @@ if __name__ == "__main__": import importlib import numpy as np import re from datetime import datetime from tqdm import tqdm from .config import ConfigManager Loading
raps/ui.py +0 −1 Original line number Diff line number Diff line import numpy as np import pandas as pd from rich.align import Align from rich.console import Console Loading