Commit 4f06747a authored by Jesse Hines
Browse files

Improve IO performance

parent 36f07b37
Loading
Loading
Loading
Loading
+10 −10
Original line number Diff line number Diff line
@@ -51,18 +51,18 @@ async def lifespan(api: FastAPI):
    if settings.env == 'dev':
        kafka_admin = get_kafka_admin()
        existing_topics = set(kafka_admin.list_topics())
        topic_configs = {"compression.type": "snappy"}
        new_topics = [
            "svc-event-exadigit-sim",
            "svc-ts-exadigit-schedulersimsystem",
            "svc-event-exadigit-schedulersimjob",
            "svc-ts-exadigit-coolingsimcdu",
            "svc-ts-exadigit-coolingsimcep",
            "svc-ts-exadigit-jobpowerhistory",
            NewTopic("svc-event-exadigit-sim", 1, 1, topic_configs = topic_configs),
            NewTopic("svc-ts-exadigit-schedulersimsystem", 4, 1, topic_configs = topic_configs),
            NewTopic("svc-event-exadigit-schedulersimjob", 2, 1, topic_configs = topic_configs),
            NewTopic("svc-ts-exadigit-coolingsimcdu", 4, 1, topic_configs = topic_configs),
            NewTopic("svc-ts-exadigit-coolingsimcep", 2, 1, topic_configs = topic_configs),
            NewTopic("svc-ts-exadigit-jobpowerhistory", 4, 1, topic_configs = topic_configs),
        ]
        for topic in new_topics:
            if topic not in existing_topics:
                logger.info(f"Creating kafka topic {topic}")
                kafka_admin.create_topics([NewTopic(topic, 1, 1)])
        new_topics = [t for t in new_topics if t.name not in existing_topics]
        logger.info(f"Creating kafka topics {', '.join(t.name for t in new_topics)}")
        kafka_admin.create_topics(new_topics)

        druid_ingests_dir = Path(__file__).parent.parent.parent.resolve() / 'druid_ingests'
        ingests = [
+38 −23
Original line number Diff line number Diff line
""" A script to run the ExaDigiT simulation """
from typing import Callable
import argparse, os, json
from collections.abc import Iterable
import argparse, os, orjson
from pathlib import Path
from datetime import datetime, timezone
from loguru import logger
@@ -10,25 +10,28 @@ from .simulation import run_simulation
from ..util.kafka import get_kafka_producer


def write_sim(sim: Sim, writer: Callable[[str, bytes], None]):
def run_simulation_serialized(sim: Sim) -> Iterable[dict[str, list[bytes]]]:
    sim = sim.model_copy()

    def output_rows(topic, rows):
        for row in rows:
            value = json.dumps({"sim_id": sim.id, **row.model_dump(mode='json')}).encode()
            writer(topic, value)
    def serialize_rows(rows):
        return [
            orjson.dumps({"sim_id": sim.id, **row.model_dump(mode='json')})
            for row in rows
        ]

    logger.info(f"Starting simulation {sim.model_dump_json()}")
    logger.info(f"Starting simulation: {sim.model_dump_json(indent = 4)}")
    config = ServerSimConfig.model_validate(sim.config)
    progress_date = sim.start

    try:
        for data in run_simulation(config):
            output_rows("svc-ts-exadigit-schedulersimsystem", data.scheduler_sim_system)
            output_rows("svc-event-exadigit-schedulersimjob", data.scheduler_sim_jobs)
            output_rows("svc-ts-exadigit-coolingsimcdu", data.cooling_sim_cdus)
            output_rows("svc-ts-exadigit-coolingsimcep", data.cooling_sim_cep)
            output_rows("svc-ts-exadigit-jobpowerhistory", data.power_history)
            yield {
                "svc-ts-exadigit-schedulersimsystem": serialize_rows(data.scheduler_sim_system),
                "svc-event-exadigit-schedulersimjob": serialize_rows(data.scheduler_sim_jobs),
                "svc-ts-exadigit-coolingsimcdu": serialize_rows(data.cooling_sim_cdus),
                "svc-ts-exadigit-coolingsimcep": serialize_rows(data.cooling_sim_cep),
                "svc-ts-exadigit-jobpowerhistory": serialize_rows(data.power_history),
            }
            progress_date = data.timestamp
            if data.timestamp.second == 0:
                logger.info(f"progress: {data.timestamp.isoformat()} / {sim.end.isoformat()}")
@@ -37,33 +40,45 @@ def write_sim(sim: Sim, writer: Callable[[str, bytes], None]):
        sim.execution_end = datetime.now(timezone.utc)
        sim.error_messages = str(e)
        sim.progress_date = progress_date
        writer("svc-event-exadigit-sim", sim.serialize_for_druid())
        yield {"svc-event-exadigit-sim": [sim.serialize_for_druid()]}
        logger.info(f"Simulation {sim.id} failed")
        raise e
    
    sim.state = "success"
    sim.execution_end = datetime.now(timezone.utc)
    sim.progress_date = sim.end
    writer("svc-event-exadigit-sim", sim.serialize_for_druid())
    yield {"svc-event-exadigit-sim": [sim.serialize_for_druid()]}
    logger.info(f"Simulation {sim.id} finished")


def write_sim_to_kafka(sim: Sim):
    """Run the simulation and stream every serialized output row to Kafka.

    A single producer is configured for throughput rather than latency:
    messages linger up to 2 s so the client can batch them, with a large
    batch size and snappy compression.
    """
    kafka_producer = get_kafka_producer(
        linger_ms = 2 * 1000,
        batch_size = 65536,
        compression_type = "snappy",
    )
    try:
        for data in run_simulation_serialized(sim):
            # kafka_producer does its own buffering of output so we don't need to worry about batching
            for topic, rows in data.items():
                for row in rows:
                    kafka_producer.send(topic=topic, value=row)
    finally:
        # Flush any buffered messages and release the connection, even if
        # the simulation raised partway through.
        kafka_producer.close()


def write_sim_to_disk(sim: Sim, dest: str):
    """Run the simulation and append each topic's rows to ``<dest>/<topic>.jsonl``.

    File handles are opened lazily on first use and kept open for the whole
    run (avoiding an open/close per batch); every handle is closed on exit,
    even if the simulation raises.
    """
    Path(dest).mkdir(exist_ok=True)
    files = {}  # topic name -> open binary file handle
    try:
        for data in run_simulation_serialized(sim):
            for topic, rows in data.items():
                if topic not in files:
                    files[topic] = open(Path(dest) / f"{topic}.jsonl", 'ab')
                # One writelines() call per batch instead of a write per row.
                files[topic].writelines(l + b"\n" for l in rows)
    finally:
        for file in files.values():
            file.close()


if __name__ == "__main__":
+72 −70
Original line number Diff line number Diff line
@@ -56,8 +56,9 @@ def run_simulation(sim_config: ServerSimConfig):
    # TODO: replay logic
    engine = Engine(sim_config)

    sample_system = int(timedelta(seconds = 1).total_seconds())
    sample_system = 1
    sample_power = snap_sample_rate(5, int(sim_config.time_delta.total_seconds()))
    sample_cooling = snap_sample_rate(5, int(sim_config.time_delta.total_seconds()))

    def offset_to_time(offset):
        if offset is not None:
@@ -71,6 +72,12 @@ def run_simulation(sim_config: ServerSimConfig):
    def parse_nodes(node_indexes: tuple[int]):
        return [engine.telemetry.node_index_to_name(i) for i in node_indexes]

    @functools.lru_cache(maxsize = 16384)
    def cdu_info(cdu_index: int):
        cdu_name = engine.telemetry.cdu_index_to_name(cdu_index)
        row, col = engine.telemetry.cdu_pos(cdu_index)
        return cdu_name, row, col

    # Keep record of how many power history steps we've emitted for each job
    power_history_counts: dict[int, int] = {}
    prev_job_hashes: set[str] = set()
@@ -86,30 +93,30 @@ def run_simulation(sim_config: ServerSimConfig):
            engine_stats = get_engine_stats(engine, fast = True)
            job_stats = get_job_stats(engine)

            scheduler_sim_system = [SchedulerSimSystem.model_validate(dict(
                timestamp = timestamp,
                down_nodes = down_nodes,
            scheduler_sim_system = [SchedulerSimSystem.model_validate({
                "timestamp": timestamp,
                "down_nodes": down_nodes,
                # TODO: Update sc.get_stats to return more easily parsable data
                num_samples = engine_stats['num_samples'],

                jobs_completed = job_stats['jobs_completed'],
                jobs_running = len(job_stats['jobs_still_running']),
                jobs_pending = len(job_stats['jobs_still_in_queue']),

                throughput = job_stats['throughput'],
                average_power = engine_stats['average_power'] * 1_000_000,
                min_loss = engine_stats['min_loss'] * 1_000_000,
                average_loss = engine_stats['average_loss'] * 1_000_000,
                max_loss = engine_stats['max_loss'] * 1_000_000,
                system_power_efficiency = engine_stats['system_power_efficiency'],
                total_energy_consumed = engine_stats['total_energy_consumed'],
                carbon_emissions = engine_stats['carbon_emissions'],
                total_cost = engine_stats['total_cost'],

                p_flops = tick.p_flops,
                g_flops_w = tick.g_flops_w,
                system_util = tick.system_util,
            ))]
                "num_samples": engine_stats['num_samples'],

                "jobs_completed": job_stats['jobs_completed'],
                "jobs_running": len(job_stats['jobs_still_running']),
                "jobs_pending": len(job_stats['jobs_still_in_queue']),

                "throughput": job_stats['throughput'],
                "average_power": engine_stats['average_power'] * 1_000_000,
                "min_loss": engine_stats['min_loss'] * 1_000_000,
                "average_loss": engine_stats['average_loss'] * 1_000_000,
                "max_loss": engine_stats['max_loss'] * 1_000_000,
                "system_power_efficiency": engine_stats['system_power_efficiency'],
                "total_energy_consumed": engine_stats['total_energy_consumed'],
                "carbon_emissions": engine_stats['carbon_emissions'],
                "total_cost": engine_stats['total_cost'],

                "p_flops": tick.p_flops,
                "g_flops_w": tick.g_flops_w,
                "system_util": tick.system_util,
            })]

        scheduler_sim_jobs: list[SchedulerSimJob] = []
        power_history: list[SchedulerSimJobPowerHistory] = []
@@ -169,7 +176,7 @@ def run_simulation(sim_config: ServerSimConfig):
                    "total_loss": point['Loss'],
                }

        if tick.fmu_outputs:
        if tick.fmu_outputs and (is_last_tick or unix_timestamp % sample_cooling == 0):
            # CDU columns are output in the dict with keys like this:
            # "simulator[1].datacenter[1].computeBlock[1].cdu[1].summary.m_flow_prim"
            # "simulator[1].datacenter[1].computeBlock[1].cdu[1].summary.V_flow_prim_GPM"
@@ -181,55 +188,50 @@ def run_simulation(sim_config: ServerSimConfig):
            cdus_data = fmu_data['simulator'][1]['datacenter'][1]['computeBlock']
            for cdu, cdu_data in cdus_data.items():
                cdu_data = cdu_data['cdu'][1]['summary']
                cooling_sim_cdu_map[cdu].update(
                    work_done_by_cdup = cdu_data['W_flow_CDUP_kW'],
                    rack_return_temp = cdu_data['T_sec_r_C'],
                    rack_supply_temp = cdu_data['T_sec_s_C'],
                    rack_supply_pressure = cdu_data['p_sec_s_psig'],
                    rack_return_pressure = cdu_data['p_sec_r_psig'],
                    rack_flowrate = cdu_data['V_flow_sec_GPM'],
                    facility_return_temp = cdu_data["T_prim_r_C"],
                    facility_supply_temp = cdu_data['T_prim_s_C'],
                    facility_supply_pressure = cdu_data['p_prim_s_psig'],
                    facility_return_pressure = cdu_data['p_prim_r_psig'],
                    facility_flowrate = cdu_data['V_flow_prim_GPM'],
                )
                cooling_sim_cdu_map[cdu] = {
                    **cooling_sim_cdu_map.get(cdu, {}),
                    "work_done_by_cdup": cdu_data['W_flow_CDUP_kW'],
                    "rack_return_temp": cdu_data['T_sec_r_C'],
                    "rack_supply_temp": cdu_data['T_sec_s_C'],
                    "rack_supply_pressure": cdu_data['p_sec_s_psig'],
                    "rack_return_pressure": cdu_data['p_sec_r_psig'],
                    "rack_flowrate": cdu_data['V_flow_sec_GPM'],
                    "facility_return_temp": cdu_data["T_prim_r_C"],
                    "facility_supply_temp": cdu_data['T_prim_s_C'],
                    "facility_supply_pressure": cdu_data['p_prim_s_psig'],
                    "facility_return_pressure": cdu_data['p_prim_r_psig'],
                    "facility_flowrate": cdu_data['V_flow_prim_GPM'],
                }

            cep_data = fmu_data['simulator'][1]['centralEnergyPlant'][1]
            cooling_sim_cep = [CoolingSimCEP.model_validate(dict(
                timestamp = timestamp,
                htw_flowrate = cep_data['hotWaterLoop'][1]['summary']['V_flow_htw_GPM'],
                ctw_flowrate = cep_data['coolingTowerLoop'][1]['summary']['V_flow_ctw_GPM'],
                htw_return_pressure = cep_data['hotWaterLoop'][1]['summary']['p_fac_htw_r_psig'],
                htw_supply_pressure = cep_data['hotWaterLoop'][1]['summary']['p_fac_htw_s_psig'],
                ctw_return_pressure = cep_data['coolingTowerLoop'][1]['summary']['p_fac_ctw_r_psig'],
                ctw_supply_pressure = cep_data['coolingTowerLoop'][1]['summary']['p_fac_ctw_s_psig'],
                htw_return_temp = cep_data['hotWaterLoop'][1]['summary']['T_fac_htw_r_C'],
                htw_supply_temp = cep_data['hotWaterLoop'][1]['summary']['T_fac_htw_s_C'],
                ctw_return_temp = cep_data['coolingTowerLoop'][1]['summary']['T_fac_ctw_r_C'],
                ctw_supply_temp = cep_data['coolingTowerLoop'][1]['summary']['T_fac_ctw_s_C'],
                power_consumption_htwps = cep_data['hotWaterLoop'][1]['summary']['W_flow_HTWP_kW'],
                power_consumption_ctwps = cep_data['coolingTowerLoop'][1]['summary']['W_flow_CTWP_kW'],
                power_consumption_fan = cep_data['coolingTowerLoop'][1]['summary']['W_flow_CT_kW'],
                htwp_speed = cep_data['hotWaterLoop'][1]['summary']['N_HTWP'],
                nctwps_staged = cep_data['coolingTowerLoop'][1]['summary']['n_CTWPs'],
                nhtwps_staged = cep_data['hotWaterLoop'][1]['summary']['n_HTWPs'],
                pue_output = fmu_data['pue'],
                nehxs_staged = cep_data['hotWaterLoop'][1]['summary']['n_EHXs'],
                ncts_staged = cep_data['coolingTowerLoop'][1]['summary']['n_CTs'],
                facility_return_temp = cep_data['hotWaterLoop'][1]['summary']['T_fac_htw_r_C'],
                cdu_loop_bypass_flowrate = fmu_data['simulator'][1]['datacenter'][1]['summary']['V_flow_bypass_GPM'],
            ))]
            cooling_sim_cep = [CoolingSimCEP.model_validate({
                "timestamp": timestamp,
                "htw_flowrate": cep_data['hotWaterLoop'][1]['summary']['V_flow_htw_GPM'],
                "ctw_flowrate": cep_data['coolingTowerLoop'][1]['summary']['V_flow_ctw_GPM'],
                "htw_return_pressure": cep_data['hotWaterLoop'][1]['summary']['p_fac_htw_r_psig'],
                "htw_supply_pressure": cep_data['hotWaterLoop'][1]['summary']['p_fac_htw_s_psig'],
                "ctw_return_pressure": cep_data['coolingTowerLoop'][1]['summary']['p_fac_ctw_r_psig'],
                "ctw_supply_pressure": cep_data['coolingTowerLoop'][1]['summary']['p_fac_ctw_s_psig'],
                "htw_return_temp": cep_data['hotWaterLoop'][1]['summary']['T_fac_htw_r_C'],
                "htw_supply_temp": cep_data['hotWaterLoop'][1]['summary']['T_fac_htw_s_C'],
                "ctw_return_temp": cep_data['coolingTowerLoop'][1]['summary']['T_fac_ctw_r_C'],
                "ctw_supply_temp": cep_data['coolingTowerLoop'][1]['summary']['T_fac_ctw_s_C'],
                "power_consumption_htwps": cep_data['hotWaterLoop'][1]['summary']['W_flow_HTWP_kW'],
                "power_consumption_ctwps": cep_data['coolingTowerLoop'][1]['summary']['W_flow_CTWP_kW'],
                "power_consumption_fan": cep_data['coolingTowerLoop'][1]['summary']['W_flow_CT_kW'],
                "htwp_speed": cep_data['hotWaterLoop'][1]['summary']['N_HTWP'],
                "nctwps_staged": cep_data['coolingTowerLoop'][1]['summary']['n_CTWPs'],
                "nhtwps_staged": cep_data['hotWaterLoop'][1]['summary']['n_HTWPs'],
                "pue_output": fmu_data['pue'],
                "nehxs_staged": cep_data['hotWaterLoop'][1]['summary']['n_EHXs'],
                "ncts_staged": cep_data['coolingTowerLoop'][1]['summary']['n_CTs'],
                "facility_return_temp": cep_data['hotWaterLoop'][1]['summary']['T_fac_htw_r_C'],
                "cdu_loop_bypass_flowrate": fmu_data['simulator'][1]['datacenter'][1]['summary']['V_flow_bypass_GPM'],
            })]

        for cdu_index, cdu_data in cooling_sim_cdu_map.items():
            cdu_name = engine.telemetry.cdu_index_to_name(cdu_index)
            row, col = engine.telemetry.cdu_pos(cdu_index)
            cdu_data.update(
                timestamp = timestamp,
                name = cdu_name,
                row = row,
                col = col,
            )
            cdu_name, row, col = cdu_info(cdu_index)
            cdu_data.update(timestamp = timestamp, name = cdu_name, row = row, col = col)
            cooling_sim_cdus.append(CoolingSimCDU.model_validate(cdu_data))

        yield SimTickOutput(