Verified Commit 4028c261 authored by Hines, Jesse

Add Frontier dataloader

Had to remove elasticsearch-dbapi to resolve a dependency conflict: the elasticsearch package needed to be updated
to support numpy 2.0, but elasticsearch-dbapi only supports elasticsearch<7.14.
parent 180a0794
+2 −2
@@ -25,8 +25,8 @@ dependencies = [
    "uvicorn==0.35.0",
    "sqlparse==0.5.3",
    "kubernetes==33.1.0",
    "elasticsearch==7.13.4",
    "elasticsearch-dbapi==0.2.11",
    "tenacity==9.1.2",
    "elasticsearch==7.17.12",
    "requests==2.32.5",
    "orjson==3.11.3",
    "confluent_kafka==2.11.1",
+67 −0
from ...util.druid import get_druid_engine, get_table, to_timestamp
from ...util.dataloader import query_time_range
from ...util.es import get_nccs_cadence_es, es_sql_query
from ...models.sim import ServerSimConfig
from .. import SimException
import sqlalchemy as sqla
import pandas as pd
from loguru import logger

# Re-use these from the raps dataloader
from raps.dataloaders.frontier import load_data_from_df, node_index_to_name, cdu_index_to_name, cdu_pos


def load_data(_paths, **kwargs):
    # TODO: Should consider using LVA API instead of directly querying the DB for this
    druid_engine = get_druid_engine()
    es = get_nccs_cadence_es()

    sim_config: ServerSimConfig = kwargs['sim_config']
    start, end = sim_config.start, sim_config.end

    job_query = """
        SELECT
            "allocation_id", "job_id", "slurm_version", "account", "group", "user", "name",
            "time_limit", "time_submission", "time_eligible", "time_start", "time_end", "time_elapsed",
            "node_count", xnames_str AS "xnames", "state_current", "state_reason",
            "time_snapshot"
        FROM "stf218.frontier.job-summary"
        WHERE
            (time_end IS NULL OR time_end > CONVERT(?, TIMESTAMP)) AND
            (time_start IS NOT NULL AND time_start <= CONVERT(?, TIMESTAMP))
    """
    job_query_params = [start.isoformat(), end.isoformat()]
    job_data = es_sql_query(es, job_query, job_query_params, fetch_size=500)

    job_df = pd.DataFrame(job_data)
    job_df['time_snapshot'] = pd.to_datetime(job_df['time_snapshot'])
    job_df["time_submission"] = pd.to_datetime(job_df["time_submission"])
    job_df["time_eligible"] = pd.to_datetime(job_df["time_eligible"])
    job_df["time_start"] = pd.to_datetime(job_df["time_start"])
    job_df["time_end"] = pd.to_datetime(job_df["time_end"])
    job_df['xnames'] = job_df['xnames'].map(lambda x: x.split(",") if x else [])

    job_profile_tbl = get_table("pub-ts-frontier-job-profile", druid_engine)
    job_profile_query = (
        sqla.select(
            job_profile_tbl.c['__time'].label("timestamp"),
            job_profile_tbl.c.allocation_id,
            job_profile_tbl.c.sum_cpu0_power,
            job_profile_tbl.c.sum_gpu_power,
        )
            .where(
                to_timestamp(start) <= job_profile_tbl.c['__time'],
                job_profile_tbl.c['__time'] < to_timestamp(end),
            )
    )
    job_profile_df = pd.read_sql(job_profile_query, druid_engine, parse_dates=[
        "timestamp",
    ])

    # Debug logging (loguru is imported at module level above)
    logger.info(f"job_df {job_df}")
    logger.info(f"job_profile_df {job_profile_df}")

    if job_df.empty or job_profile_df.empty:
        raise SimException(f"No telemetry data for {start.isoformat()} -> {end.isoformat()}")

    return load_data_from_df(job_df, job_profile_df, **kwargs)
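A hypothetical driver for load_data above. Only sim_config.start/.end are known from this diff; the package paths and the ServerSimConfig construction here are assumptions for illustration:

# Hypothetical usage sketch; import paths are illustrative (the real ones are relative).
from datetime import datetime, timezone

from mysim.dataloaders.frontier import load_data  # assumed path
from mysim.models.sim import ServerSimConfig      # assumed path

sim_config = ServerSimConfig(                     # construction assumed
    start=datetime(2024, 1, 1, tzinfo=timezone.utc),
    end=datetime(2024, 1, 2, tzinfo=timezone.utc),
)

# _paths is ignored by this loader (note the leading underscore), so None suffices.
result = load_data(None, sim_config=sim_config)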
+45 −37
"""
Connection to Cadence ES
"""
import os, json
import urllib.parse
from datetime import datetime
import sqlalchemy as sqla
from sqlalchemy.engine import Engine, create_engine
import os
from elasticsearch import Elasticsearch
from es.elastic.sqlalchemy import ESDialect
import tenacity


def get_nccs_cadence_engine(**kwargs) -> Engine:
    import sqlalchemy.types as types
    from sqlalchemy.ext.compiler import compiles

    # For some reason sqla/pydruid renders `cast(col, sqla.TIMESTAMP)` to `CAST(col AS LONG)`. This
    # is a manual override to make sqla render them properly.
    cast_fixes = {
        types.TIMESTAMP: "TIMESTAMP",
    }

    for (sqla_type, override) in cast_fixes.items():
        compiles(sqla_type, "elasticsearch")(lambda type_, compiler, override=override, **kw: override)

    # We need to set retry_on_status to work around intermittent 401 errors from Cadence ES.
    # The query params will get passed to the Elasticsearch client, but only some specific
    # ones get parsed and the rest are left as strings. This monkey patch hacks elasticsearch-dbapi
    # to parse retry_on_status. We can remove this if the AM team fixes the auth errors
    import es.basesqlalchemy
    es.basesqlalchemy.BaseESDialect._map_parse_connection_parameters['retry_on_status'] = json.loads

    URL = urllib.parse.urlparse(os.environ["NCCS_CADENCE_URL"])
    HOST, PORT = URL.netloc.split(":")
def get_nccs_cadence_es():
    URL = os.environ["NCCS_CADENCE_URL"]
    USER = os.environ["NCCS_CADENCE_USER"]
    PASSWORD = os.environ["NCCS_CADENCE_PASSWORD"]
    # These get passed through to the internal Elasticsearch instance
    QUERY_PARAMS = 'use_ssl=false&ssl_show_warn=false&verify_certs=false&retry_on_status=[502,503,504,401]'
    return Elasticsearch(
        URL,
        http_auth=(USER, PASSWORD),
        # TODO: we need to fix the self-signed certs on ES
        use_ssl=False,
        ssl_show_warn=False,
        verify_certs=False,
    )

    engine = create_engine(f'elasticsearch+{URL.scheme}://{USER}:{PASSWORD}@{HOST}:{PORT}{URL.path}?{QUERY_PARAMS}', **kwargs)
    return engine


def to_timestamp(val: datetime):
    return sqla.func.convert(val.isoformat(), sqla.literal_column('TIMESTAMP'))
def es_sql_query(client: Elasticsearch, query: str, params: list = [], fetch_size = 100):
    """
    Runs an SQL query against ES. Use `?` format for SQL params.
    """
    # Cadence ES is a bit flaky with intermittent 401 errors
    @tenacity.retry(
        stop = tenacity.stop_after_attempt(5),
        wait = tenacity.wait_exponential(multiplier=0.5, min=1, max=30),
        reraise = True,
    )
    def _retry_query(query, params, cursor = None):
        body = {
            "query": query,
            "params": params,
            "fetch_size": fetch_size,
        }
        if cursor:
            body["cursor"] = cursor
        return client.sql.query(format = 'json', body = body)

    response = _retry_query(query, params)
    rows = response['rows']
    cursor = response.get("cursor")
    columns = [c['name'] for c in response['columns']]
    while cursor:
        response = _retry_query(query, params, cursor)
        rows.extend(response['rows'])
        cursor = response.get("cursor")

    rows = [dict(zip(columns, row)) for row in rows]
    return rows
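Finally, a short usage sketch for the new helpers. The index name and the CONVERT(?, TIMESTAMP) parameter style are taken from the dataloader above; the query itself is illustrative:

# Illustrative only: count jobs that started after a given time.
es = get_nccs_cadence_es()
rows = es_sql_query(
    es,
    'SELECT COUNT(*) AS "n" FROM "stf218.frontier.job-summary"'
    ' WHERE time_start > CONVERT(?, TIMESTAMP)',
    ["2024-01-01T00:00:00"],
    fetch_size=200,
)
print(rows)  # list of dicts keyed by column name, e.g. [{"n": 123}]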