Commit 0e40c7ea authored by Maiterth, Matthias
Browse files

Merge branch 'tick-stats' into 'develop'

Add RunningStats helper

See merge request !128
parents 9652af75 481fb0a8
Loading
Loading
Loading
Loading
+81 −5
Original line number Diff line number Diff line
@@ -13,10 +13,9 @@ from .utils import sum_values, min_value, max_value, convert_seconds_to_hhmmss
from .engine import Engine


def get_engine_stats(engine: Engine, *, fast = False):
def get_engine_stats(engine: Engine):
    """
    Return engine statistics
    Setting `fast = False` excludes some stats that are more expensive to calculate.
    """
    timesteps = engine.current_timestep - engine.timestep_start
    num_samples = len(engine.power_manager.history) if engine.power_manager else 0
@@ -54,7 +53,7 @@ def get_engine_stats(engine: Engine, *, fast = False):
        else:
            stats['jobs_completed_percentage'] = 0

    if not fast and engine.node_occupancy_history:
    if engine.node_occupancy_history:
        # Calculate average concurrent jobs per node (average density across all nodes and timesteps)
        total_jobs_running_timesteps = 0
        max_concurrent_jobs_per_node = 0
@@ -361,9 +360,9 @@ def get_job_stats(engine: Engine):
    return job_stats


def get_stats(engine: Engine, *, fast = False):
def get_stats(engine: Engine):
    return {
        'engine': get_engine_stats(engine, fast = fast),
        'engine': get_engine_stats(engine),
        'job': get_job_stats(engine),
        'scheduler': get_scheduler_stats(engine),
        'network': get_network_stats(engine) if engine.simulate_network else {},
@@ -429,3 +428,80 @@ def get_gauge_limits(engine: Engine):
        'peak_power': peak_power,
        'g_flops_w_peak': gflops_per_watt_max
    }


class RunningStats:
    """
    Calculate a subset of the engine stats as "running totals", one snapshot per engine tick.

    Each call to get_stats() only folds in the power/loss samples appended since the previous
    call, which is much more efficient than calling get_engine_stats() repeatedly.
    """
    # TODO: maybe should combine this and get_engine_stats logic?
    @staticmethod
    def _running_stats(engine: Engine):
        """
        Infinite generator used for the RunningStats logic.

        Every `next()` re-reads the engine's current state and yields a dict of stats. The
        running sum/min/max of the power and loss histories are kept in generator-local
        variables together with the index of the first sample not yet accounted for.
        """
        def running_sum_values(values, last_value, last_index):
            # Only fold in the samples appended since `last_index`.
            if last_index < len(values):
                return last_value + sum_values(values[last_index:])
            return last_value  # No change

        def running_min_value(values, last_value, last_index):
            if last_index < len(values):
                new_min = min_value(values[last_index:])
                return new_min if last_value is None else min(new_min, last_value)
            return last_value  # No change

        def running_max_value(values, last_value, last_index):
            if last_index < len(values):
                new_max = max_value(values[last_index:])
                return new_max if last_value is None else max(new_max, last_value)
            return last_value  # No change

        last_power_index = 0
        power_sum = 0
        last_loss_index = 0
        loss_sum = 0
        loss_min = None  # None until the first loss sample has been seen
        loss_max = None

        while True:
            timesteps = engine.current_timestep - engine.timestep_start
            throughput = engine.jobs_completed / timesteps * 3600 if timesteps != 0 else 0  # Jobs per hour

            # The engine may have no power manager. Treat that as "no samples" everywhere,
            # instead of guarding only `num_samples` and then dereferencing the manager anyway.
            power_manager = engine.power_manager
            if power_manager:
                power_history = power_manager.history
                loss_history = power_manager.loss_history
            else:
                power_history = []
                loss_history = []
            num_samples = len(power_history)

            # NOTE(review): the `/ 1000` presumably converts kW to MW -- confirm against
            # get_engine_stats.
            power_sum = running_sum_values(power_history, power_sum, last_power_index)
            average_power_mw = power_sum / num_samples / 1000 if num_samples else 0
            last_power_index = len(power_history)

            loss_sum = running_sum_values(loss_history, loss_sum, last_loss_index)
            average_loss_mw = loss_sum / num_samples / 1000 if num_samples else 0
            loss_min = running_min_value(loss_history, loss_min, last_loss_index)
            loss_max = running_max_value(loss_history, loss_max, last_loss_index)
            last_loss_index = len(loss_history)
            # loss_min/loss_max stay None until a loss sample exists, even if power samples
            # do; report 0 in that case rather than dividing None.
            min_loss_mw = loss_min / 1000 if num_samples and loss_min is not None else 0
            max_loss_mw = loss_max / 1000 if num_samples and loss_max is not None else 0

            loss_fraction = average_loss_mw / average_power_mw if average_power_mw else 0
            efficiency = 1 - loss_fraction if loss_fraction else 0
            total_energy_consumed = average_power_mw * timesteps / 3600 if timesteps else 0  # MW-hr
            # NOTE(review): 852.3 / 2204.6 is presumably lb CO2 per MWh converted to metric
            # tons -- confirm against get_engine_stats.
            carbon_emissions = total_energy_consumed * 852.3 / 2204.6 / efficiency if efficiency else 0
            total_cost = total_energy_consumed * 1000 * engine.config.get('POWER_COST', 0)  # Total cost in dollars

            yield {
                "throughput": throughput,
                "num_samples": num_samples,
                "average_power": average_power_mw,
                "min_loss": min_loss_mw,
                "average_loss": average_loss_mw,
                "max_loss": max_loss_mw,
                "system_power_efficiency": efficiency * 100,
                "total_energy_consumed": total_energy_consumed,
                "carbon_emissions": carbon_emissions,
                "total_cost": total_cost,
            }

    def __init__(self, engine: Engine):
        """Bind the running-stats generator to `engine`; nothing is computed until get_stats()."""
        self._gen = RunningStats._running_stats(engine)

    def get_stats(self) -> dict:
        """Fold in any samples recorded since the last call and return the current stats dict."""
        return next(self._gen)
+28 −0
Original line number Diff line number Diff line
import pytest
from ..util import run_engine
from raps.engine import Engine
from raps.sim_config import SingleSimConfig
from raps.stats import get_engine_stats, get_job_stats, RunningStats

pytestmark = [
    pytest.mark.system,
@@ -18,3 +21,28 @@ def test_engine_basic(system, system_config, sim_output):

    assert stats['tick_count'] == 120
    assert stats['engine']['time_simulated'] == '0:02:00'


def test_engine_stats(system, system_config, sim_output):
    """
    Check that RunningStats, polled once per engine tick, ends up agreeing with the
    stats computed in one go by get_engine_stats()/get_job_stats() after the run.
    """
    supports_main = system_config.get("main", False)
    if not supports_main:
        pytest.skip(f"{system} does not support basic main run.")

    sim_config = SingleSimConfig.model_validate({
        "system": system,
        "time": "2m",
    })
    engine = Engine(sim_config)
    gen = engine.run_simulation()
    running_stats = RunningStats(engine)

    # Poll the running stats on every tick so their incremental state advances with the engine
    for _ in gen:
        running_stats.get_stats()
    # One more poll after the simulation has finished gives the final running values
    stats = running_stats.get_stats()

    final_stats = {}
    final_stats.update(get_engine_stats(engine))
    final_stats.update(get_job_stats(engine))

    # Confirm the running stats match up with the final stat computation
    for stat, running_value in stats.items():
        assert pytest.approx(running_value) == final_stats[stat], f"stat {stat}"