Commit 26f4c773 authored by Hines, Jesse
Browse files

Get marconi100 replay working

parent 5bcf07e4
Loading
Loading
Loading
Loading
+37 −0
Original line number Diff line number Diff line
import pandas as pd
import numpy as np
import sqlalchemy as sqla
from loguru import logger
from datetime import timedelta
@@ -93,7 +94,43 @@ def fetch_fugaku_data(sim_config: SimConfig, raps_config: dict):
    return jobs


def fetch_marconi100_data(sim_config: SimConfig, raps_config: dict):
    """Fetch Marconi100 job records from Druid and load them as jobs.

    Rows are read from the ``svc-ts-exadigit-data-marconi100`` table, the
    comma-separated list columns are parsed into integer arrays, and the
    resulting frame is handed to ``Telemetry.load_data_from_df``.

    Args:
        sim_config: Simulation settings; ``start``, ``end`` and
            ``scheduler.reschedule`` are read from it.
        raps_config: Configuration dict passed through to ``Telemetry`` and
            to ``load_data_from_df``.

    Returns:
        Whatever ``Telemetry.load_data_from_df`` returns for the fetched rows.
    """
    druid_engine = get_druid_engine()
    start, end = sim_config.start, sim_config.end

    tbl = get_table("svc-ts-exadigit-data-marconi100", druid_engine)
    # Keep rows whose `__time` lies in [start - 3 days, end] and whose
    # `end_time` is not before `start`.
    # NOTE(review): the 3-day lookback presumably bounds how long before
    # `start` a job overlapping the window may have been recorded -- confirm.
    query = sqla.select(sqla.text("*")).where(
        (tbl.c["__time"] <= to_timestamp(end))
        & (tbl.c["__time"] >= to_timestamp(start - timedelta(days=3)))
        & (tbl.c.end_time >= to_timestamp(start))
    )
    data = pd.read_sql(
        query,
        druid_engine,
        parse_dates=["submit_time", "start_time", "end_time", "eligible_time"],
    )

    def parse_int_list(raw):
        """Parse a comma-separated string of integers into a NumPy array."""
        if not raw:
            # Empty string / None: return an explicitly integer empty array.
            # A bare `np.array([])` would be float64, unlike every non-empty
            # result produced below.
            return np.array([], dtype=int)
        return np.array([int(item) for item in raw.split(",")])

    list_columns = (
        "nodes",
        "node_power_consumption",
        "mem_power_consumption",
        "cpu_power_consumption",
    )
    for column in list_columns:
        data[column] = data[column].map(parse_int_list)

    telemetry = Telemetry(system="marconi100", config=raps_config)
    jobs = telemetry.load_data_from_df(
        data,
        min_time=start,
        reschedule=sim_config.scheduler.reschedule,
        config=raps_config,
    )
    return jobs


# Registry mapping a system name to the function that fetches its data.
DATA_LOADERS = dict(
    frontier=fetch_frontier_data,
    fugaku=fetch_fugaku_data,
    marconi100=fetch_marconi100_data,
)
 No newline at end of file