class RunningStats:
    """
    Calculate a subset of the engine stats as "running totals" for each engine tick.

    Every call to get_stats() only folds in the power and loss history entries
    that were appended since the previous call, instead of re-reducing the whole
    histories. This is much more efficient than calling get_engine_stats()
    repeatedly.
    """
    # TODO: maybe should combine this and get_engine_stats logic?

    @staticmethod
    def _running_stats(engine: Engine):
        """
        Infinite generator used for the RunningStats logic.

        Each next() reads the current state of `engine` and yields a dict with the
        keys: throughput, num_samples, average_power, min_loss, average_loss,
        max_loss, system_power_efficiency, total_energy_consumed,
        carbon_emissions and total_cost.

        If the engine has no power manager, every power/loss derived value is 0.
        """
        def fold_new(values, last_value, last_index, reduce_new, combine):
            # Fold the entries of `values` from `last_index` onwards into `last_value`.
            if last_index >= len(values):
                return last_value  # No change
            fresh = reduce_new(values[last_index:])
            # `None` marks "nothing seen yet" (used by the min/max aggregates).
            return fresh if last_value is None else combine(fresh, last_value)

        def add(fresh, previous):
            return previous + fresh

        power_index = 0
        power_sum = 0
        loss_index = 0
        loss_sum = 0
        loss_min = None
        loss_max = None

        while True:
            timesteps = engine.current_timestep - engine.timestep_start
            throughput = engine.jobs_completed / timesteps * 3600 if timesteps != 0 else 0  # Jobs per hour

            # The power manager is optional: fall back to empty histories rather
            # than dereferencing a missing manager.
            power_manager = engine.power_manager
            if power_manager:
                history = power_manager.history
                loss_history = power_manager.loss_history
            else:
                history = loss_history = ()
            num_samples = len(history)

            power_sum = fold_new(history, power_sum, power_index, sum_values, add)
            power_index = len(history)
            # The `_mw` names and the division by 1000 suggest the histories are in
            # kW -- TODO confirm against the power manager.
            average_power_mw = power_sum / num_samples / 1000 if num_samples else 0

            loss_sum = fold_new(loss_history, loss_sum, loss_index, sum_values, add)
            loss_min = fold_new(loss_history, loss_min, loss_index, min_value, min)
            loss_max = fold_new(loss_history, loss_max, loss_index, max_value, max)
            loss_index = len(loss_history)
            average_loss_mw = loss_sum / num_samples / 1000 if num_samples else 0
            # loss_min/loss_max stay None until the first loss sample arrives, which
            # may be later than the first power sample.
            min_loss_mw = loss_min / 1000 if num_samples and loss_min is not None else 0
            max_loss_mw = loss_max / 1000 if num_samples and loss_max is not None else 0

            loss_fraction = average_loss_mw / average_power_mw if average_power_mw else 0
            # NOTE(review): a loss fraction of exactly 0 reports an efficiency of 0
            # rather than 1. Left unchanged because these values are compared with
            # get_engine_stats() in the tests -- confirm before changing either side.
            efficiency = 1 - loss_fraction if loss_fraction else 0
            total_energy_consumed = average_power_mw * timesteps / 3600 if timesteps else 0  # MW-hr
            # Presumably 852.3 is an emission factor in lb CO2 per MWh and 2204.6
            # converts lb to metric tons -- TODO confirm the source of the constants.
            carbon_emissions = total_energy_consumed * 852.3 / 2204.6 / efficiency if efficiency else 0
            total_cost = total_energy_consumed * 1000 * engine.config.get('POWER_COST', 0)  # Total cost in dollars

            yield {
                "throughput": throughput,
                "num_samples": num_samples,
                "average_power": average_power_mw,
                "min_loss": min_loss_mw,
                "average_loss": average_loss_mw,
                "max_loss": max_loss_mw,
                "system_power_efficiency": efficiency * 100,
                "total_energy_consumed": total_energy_consumed,
                "carbon_emissions": carbon_emissions,
                "total_cost": total_cost,
            }

    def __init__(self, engine: Engine):
        """Start tracking running stats for `engine`."""
        self._gen = RunningStats._running_stats(engine)

    def get_stats(self) -> dict:
        """Return the stats for the engine's current state, folding in any new history entries."""
        return next(self._gen)
class RunningStats:
    """
    Calculate a subset of the engine stats as "running totals" for each engine tick.

    Every call to get_stats() only folds in the power and loss history entries
    that were appended since the previous call, instead of re-reducing the whole
    histories. This is much more efficient than calling get_engine_stats()
    repeatedly.
    """
    # TODO: maybe should combine this and get_engine_stats logic?

    @staticmethod
    def _running_stats(engine: Engine):
        """
        Infinite generator used for the RunningStats logic.

        Each next() reads the current state of `engine` and yields a dict with the
        keys: throughput, num_samples, average_power, min_loss, average_loss,
        max_loss, system_power_efficiency, total_energy_consumed,
        carbon_emissions and total_cost.

        If the engine has no power manager, every power/loss derived value is 0.
        """
        def fold_new(values, last_value, last_index, reduce_new, combine):
            # Fold the entries of `values` from `last_index` onwards into `last_value`.
            if last_index >= len(values):
                return last_value  # No change
            fresh = reduce_new(values[last_index:])
            # `None` marks "nothing seen yet" (used by the min/max aggregates).
            return fresh if last_value is None else combine(fresh, last_value)

        def add(fresh, previous):
            return previous + fresh

        power_index = 0
        power_sum = 0
        loss_index = 0
        loss_sum = 0
        loss_min = None
        loss_max = None

        while True:
            timesteps = engine.current_timestep - engine.timestep_start
            throughput = engine.jobs_completed / timesteps * 3600 if timesteps != 0 else 0  # Jobs per hour

            # The power manager is optional: fall back to empty histories rather
            # than dereferencing a missing manager.
            power_manager = engine.power_manager
            if power_manager:
                history = power_manager.history
                loss_history = power_manager.loss_history
            else:
                history = loss_history = ()
            num_samples = len(history)

            power_sum = fold_new(history, power_sum, power_index, sum_values, add)
            power_index = len(history)
            # The `_mw` names and the division by 1000 suggest the histories are in
            # kW -- TODO confirm against the power manager.
            average_power_mw = power_sum / num_samples / 1000 if num_samples else 0

            loss_sum = fold_new(loss_history, loss_sum, loss_index, sum_values, add)
            loss_min = fold_new(loss_history, loss_min, loss_index, min_value, min)
            loss_max = fold_new(loss_history, loss_max, loss_index, max_value, max)
            loss_index = len(loss_history)
            average_loss_mw = loss_sum / num_samples / 1000 if num_samples else 0
            # loss_min/loss_max stay None until the first loss sample arrives, which
            # may be later than the first power sample.
            min_loss_mw = loss_min / 1000 if num_samples and loss_min is not None else 0
            max_loss_mw = loss_max / 1000 if num_samples and loss_max is not None else 0

            loss_fraction = average_loss_mw / average_power_mw if average_power_mw else 0
            # NOTE(review): a loss fraction of exactly 0 reports an efficiency of 0
            # rather than 1. Left unchanged because these values are compared with
            # get_engine_stats() in the tests -- confirm before changing either side.
            efficiency = 1 - loss_fraction if loss_fraction else 0
            total_energy_consumed = average_power_mw * timesteps / 3600 if timesteps else 0  # MW-hr
            # Presumably 852.3 is an emission factor in lb CO2 per MWh and 2204.6
            # converts lb to metric tons -- TODO confirm the source of the constants.
            carbon_emissions = total_energy_consumed * 852.3 / 2204.6 / efficiency if efficiency else 0
            total_cost = total_energy_consumed * 1000 * engine.config.get('POWER_COST', 0)  # Total cost in dollars

            yield {
                "throughput": throughput,
                "num_samples": num_samples,
                "average_power": average_power_mw,
                "min_loss": min_loss_mw,
                "average_loss": average_loss_mw,
                "max_loss": max_loss_mw,
                "system_power_efficiency": efficiency * 100,
                "total_energy_consumed": total_energy_consumed,
                "carbon_emissions": carbon_emissions,
                "total_cost": total_cost,
            }

    def __init__(self, engine: Engine):
        """Start tracking running stats for `engine`."""
        self._gen = RunningStats._running_stats(engine)

    def get_stats(self) -> dict:
        """Return the stats for the engine's current state, folding in any new history entries."""
        return next(self._gen)
import pytest

from ..util import run_engine
from raps.engine import Engine
from raps.sim_config import SingleSimConfig
from raps.stats import get_engine_stats, get_job_stats, RunningStats


def test_engine_stats(system, system_config, sim_output):
    """
    Check that RunningStats agrees with the one-shot stat functions.

    Runs a two minute simulation, polling RunningStats once per tick so that it
    accumulates incrementally, then compares its final values with
    get_engine_stats() and get_job_stats() computed from the finished engine.
    """
    if not system_config.get("main", False):
        pytest.skip(f"{system} does not support basic main run.")

    engine = Engine(SingleSimConfig.model_validate({
        "system": system,
        "time": "2m",
    }))
    running_stats = RunningStats(engine)

    # Poll on every tick: the intermediate values are not inspected, the point is
    # to exercise the incremental accumulation path.
    for _tick in engine.run_simulation():
        running_stats.get_stats()
    # One more poll after the simulation has finished gives the final values.
    stats = running_stats.get_stats()

    final_stats = {
        **get_engine_stats(engine),
        **get_job_stats(engine),
    }

    # Confirm the running stats match up with the final stat computation
    for stat, value in stats.items():
        assert stat in final_stats, f"stat {stat} missing from final stats"
        assert value == pytest.approx(final_stats[stat]), f"stat {stat}"