Commit 0c793187 authored by Hines, Jesse's avatar Hines, Jesse
Browse files

Support fugaku replay

parent c8df741d
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
class SimException(Exception):
    pass
+99 −0
Original line number Diff line number Diff line
import pandas as pd
import sqlalchemy as sqla
from loguru import logger
from datetime import timedelta
from .raps.raps.telemetry import Telemetry
from ..models.sim import SimConfig
from ..util.druid import get_druid_engine, get_table, to_timestamp
from ..util.es import get_nccs_cadence_engine
from . import SimException


def fetch_frontier_data(sim_config: SimConfig, raps_config: dict):
    """
    Fetch and parse real telemetry data
    """
    # TODO: Should consider using LVA API instead of directly querying the DB for this
    nccs_cadence_engine = get_nccs_cadence_engine()
    druid_engine = get_druid_engine()
    start, end = sim_config.start, sim_config.end

    job_query = sqla.text("""
        SELECT
            "allocation_id", "job_id", "slurm_version", "account", "group", "user", "name",
            "time_limit", "time_submission", "time_eligible", "time_start", "time_end", "time_elapsed",
            "node_count", xnames_str AS "xnames", "state_current", "state_reason",
            "time_snapshot"
        FROM "stf218.frontier.job-summary"
        WHERE
            (time_start IS NOT NULL AND time_start <= CONVERT(:end, TIMESTAMP)) AND
            (time_end IS NULL OR time_end > CONVERT(:start, TIMESTAMP))
    """).bindparams(
        start = start.isoformat(), end = end.isoformat(),
    )
    job_data = pd.read_sql_query(job_query, nccs_cadence_engine, parse_dates=[
        "time_snapshot", "time_submission", "time_eligible", "time_start", "time_end",
    ])
    # TODO: Even with sqlStringifyArrays: false, multivalue columns are returned as json strings.
    # And single rows are returned as raw strings. When we update Druid we can use ARRAYS and remove
    # this. Moving the jobs table to postgres would also fix this (and other issues).
    job_data['xnames'] = job_data['xnames'].map(lambda x: x.split(",") if x else [])

    job_profile_tbl = get_table("pub-ts-frontier-job-profile", druid_engine)
    job_profile_query = (
        sqla.select(
            job_profile_tbl.c['__time'].label("timestamp"),
            job_profile_tbl.c.allocation_id,
            job_profile_tbl.c.sum_cpu0_power,
            job_profile_tbl.c.sum_gpu_power,
        )
            .where(
                to_timestamp(start) <= job_profile_tbl.c['__time'],
                job_profile_tbl.c['__time'] < to_timestamp(end),
            )
    )
    job_profile_data = pd.read_sql(job_profile_query, druid_engine, parse_dates=[
        "timestamp",
    ])

    if (job_data.empty or job_profile_data.empty):
        raise SimException(f"No telemetry data for {start.isoformat()} -> {end.isoformat()}")

    telemetry = Telemetry(system = "frontier", config = raps_config)
    jobs = telemetry.load_data_from_df(job_data, job_profile_data,
        min_time = start,
        reschedule = sim_config.scheduler.reschedule,
        config = raps_config,
    )
    return jobs


def fetch_fugaku_data(sim_config: SimConfig, raps_config: dict):
    druid_engine = get_druid_engine()
    start, end = sim_config.start, sim_config.end

    tbl = get_table("svc-ts-exadigit-data-fugaku", druid_engine)
    query = (
        sqla.select(sqla.text("*"))
            .where(
                (tbl.c['__time'] <= to_timestamp(end)) &
                (tbl.c['__time'] >= to_timestamp(start - timedelta(days=3))) &
                (tbl.c.edt >= to_timestamp(start))
            )
    )
    data = pd.read_sql(query, druid_engine, parse_dates=[
        "adt", "qdt", "schedsdt", "deldt", "sdt", "edt",
    ])
    telemetry = Telemetry(system = "fugaku", config = raps_config)
    jobs = telemetry.load_data_from_df(data,
        min_time = start,
        reschedule = sim_config.scheduler.reschedule,
        config = raps_config,
    )
    return jobs


DATA_LOADERS = {
    "frontier": fetch_frontier_data,
    "fugaku": fetch_fugaku_data,
}
 No newline at end of file
+10 −72
Original line number Diff line number Diff line
from typing import NamedTuple
from datetime import datetime, timedelta
from pathlib import Path
import random, math, functools, json
import pandas as pd
import random, math, functools
import numpy as np
import sqlalchemy as sqla
from loguru import logger
from .raps.raps.config import ConfigManager
from .raps.raps.cooling import ThermoFluidsModel
@@ -18,16 +16,13 @@ from ..models.output import (
    JobStateEnum, SchedulerSimJob, SchedulerSimJobPowerHistory, SchedulerSimSystem, CoolingSimCDU,
    CoolingSimCEP,
)
from ..util.druid import get_druid_engine, get_table, to_timestamp
from ..util.es import get_nccs_cadence_engine
from ..util.misc import nest_dict
from . import SimException
from .dataloaders import DATA_LOADERS


PKG_PATH = Path(__file__).parent.parent.parent

class SimException(Exception):
    pass


def _offset_to_time(start, offset):
    if offset is not None:
@@ -45,66 +40,6 @@ class SimOutput(NamedTuple):
    power_history: list[SchedulerSimJobPowerHistory]



def fetch_frontier_telemetry_data(sim_config: SimConfig, raps_config: dict):
    """
    Fetch and parse real telemetry data
    """
    # TODO: Should consider using LVA API instead of directly querying the DB for this
    nccs_cadence_engine = get_nccs_cadence_engine()
    druid_engine = get_druid_engine()
    start, end = sim_config.start, sim_config.end

    job_query = sqla.text("""
        SELECT
            "allocation_id", "job_id", "slurm_version", "account", "group", "user", "name",
            "time_limit", "time_submission", "time_eligible", "time_start", "time_end", "time_elapsed",
            "node_count", xnames_str AS "xnames", "state_current", "state_reason",
            "time_snapshot"
        FROM "stf218.frontier.job-summary"
        WHERE
            (time_start IS NOT NULL AND time_start <= CONVERT(:end, TIMESTAMP)) AND
            (time_end IS NULL OR time_end > CONVERT(:start, TIMESTAMP))
    """).bindparams(
        start = start.isoformat(), end = end.isoformat(),
    )
    job_data = pd.read_sql_query(job_query, nccs_cadence_engine, parse_dates=[
        "time_snapshot", "time_submission", "time_eligible", "time_start", "time_end",
    ])
    # TODO: Even with sqlStringifyArrays: false, multivalue columns are returned as json strings.
    # And single rows are returned as raw strings. When we update Druid we can use ARRAYS and remove
    # this. Moving the jobs table to postgres would also fix this (and other issues).
    job_data['xnames'] = job_data['xnames'].map(lambda x: x.split(",") if x else [])

    job_profile_tbl = get_table("pub-ts-frontier-job-profile", druid_engine)
    job_profile_query = (
        sqla.select(
            job_profile_tbl.c['__time'].label("timestamp"),
            job_profile_tbl.c.allocation_id,
            job_profile_tbl.c.sum_cpu0_power,
            job_profile_tbl.c.sum_gpu_power,
        )
            .where(
                to_timestamp(start) <= job_profile_tbl.c['__time'],
                job_profile_tbl.c['__time'] < to_timestamp(end),
            )
    )
    job_profile_data = pd.read_sql(job_profile_query, druid_engine, parse_dates=[
        "timestamp",
    ])

    if (job_data.empty or job_profile_data.empty):
        raise SimException(f"No telemetry data for {start.isoformat()} -> {end.isoformat()}")

    telemetry = Telemetry(system = "frontier", config = raps_config)
    jobs = telemetry.load_data_from_df(job_data, job_profile_data,
        min_time = start,
        reschedule = sim_config.scheduler.reschedule,
        config = raps_config,
    )
    return jobs


def get_scheduler(
    system: SimSystem,
    down_nodes = [], cooling_enabled = False, replay = False,
@@ -182,10 +117,13 @@ def run_simulation(sim_config: SimConfig):
            workload = Workload(**sc.config)
            jobs = workload.test()
        elif sim_config.scheduler.jobs_mode == "replay":
            if sim_config.system != "frontier":
                raise SimException("Replay only supported for frontier")
            logger.info("Fetching telemetry data")
            jobs = fetch_frontier_telemetry_data(sim_config, sc.config)
            if sim_config.system not in DATA_LOADERS:
                raise SimException(f"Replay not supported for {sim_config.system}")
            logger.info("Fetching telemetry data...")
            jobs = DATA_LOADERS[sim_config.system](sim_config, sc.config)
            if len(jobs) == 0:
                raise SimException(f"No data for {sim_config.system} {sim_config.start.isoformat()} -> {sim_config.end.isoformat()}")
            logger.info(f"Fetched {len(jobs)} jobs")
        elif sim_config.scheduler.jobs_mode == "custom":
            raise SimException("Custom not supported")
        else: