Loading simulation_server/simulation/dataloaders.py +29 −25 Original line number Diff line number Diff line Loading @@ -2,7 +2,7 @@ import pandas as pd import numpy as np import sqlalchemy as sqla from loguru import logger from datetime import timedelta from datetime import datetime, timedelta from .raps.raps.telemetry import Telemetry from ..models.sim import SimConfig from ..util.druid import get_druid_engine, get_table, to_timestamp Loading Loading @@ -69,22 +69,37 @@ def fetch_frontier_data(sim_config: SimConfig, raps_config: dict): return jobs def fetch_fugaku_data(sim_config: SimConfig, raps_config: dict): druid_engine = get_druid_engine() start, end = sim_config.start, sim_config.end tbl = get_table("svc-ts-exadigit-data-fugaku", druid_engine) def query_time_range( tbl_name: str, start: datetime, end: datetime, end_col: str, *, druid_engine, parse_dates: list[str], ): tbl = get_table(tbl_name, druid_engine) query = ( sqla.select(sqla.text("*")) .where( (tbl.c['__time'] <= to_timestamp(end)) & (tbl.c['__time'] >= to_timestamp(start - timedelta(days=3))) & (tbl.c.edt >= to_timestamp(start)) (tbl.c[end_col] >= to_timestamp(start)) ) ) data = pd.read_sql(query, druid_engine, parse_dates=[ "adt", "qdt", "schedsdt", "deldt", "sdt", "edt", ]) data = pd.read_sql(query, druid_engine, parse_dates=parse_dates) return data def split_list(x): x = x.split(",") if x else [] return np.array([int(x) for x in x]) def fetch_fugaku_data(sim_config: SimConfig, raps_config: dict): druid_engine = get_druid_engine() start, end = sim_config.start, sim_config.end data = query_time_range( "svc-ts-exadigit-data-fugaku", start, end, 'edt', druid_engine = druid_engine, parse_dates = ["adt", "qdt", "schedsdt", "deldt", "sdt", "edt"], ) telemetry = Telemetry(system = "fugaku", config = raps_config) jobs = telemetry.load_data_from_df(data, min_time = start, Loading @@ -98,23 +113,12 @@ def fetch_marconi100_data(sim_config: SimConfig, raps_config: dict): druid_engine = get_druid_engine() start, end = sim_config.start, sim_config.end tbl = get_table("svc-ts-exadigit-data-marconi100", druid_engine) query = ( sqla.select(sqla.text("*")) .where( (tbl.c['__time'] <= to_timestamp(end)) & (tbl.c['__time'] >= to_timestamp(start - timedelta(days=3))) & (tbl.c.end_time >= to_timestamp(start)) ) ) data = pd.read_sql(query, druid_engine, data = query_time_range( "svc-ts-exadigit-data-marconi100", start, end, 'end_time', druid_engine = druid_engine, parse_dates = ["submit_time", "start_time", "end_time", "eligible_time"], ) def split_list(x): x = x.split(",") if x else [] return np.array([int(x) for x in x]) data['nodes'] = data['nodes'].map(split_list) data['node_power_consumption'] = data['node_power_consumption'].map(split_list) data['mem_power_consumption'] = data['mem_power_consumption'].map(split_list) Loading Loading
simulation_server/simulation/dataloaders.py +29 −25 Original line number Diff line number Diff line Loading @@ -2,7 +2,7 @@ import pandas as pd import numpy as np import sqlalchemy as sqla from loguru import logger from datetime import timedelta from datetime import datetime, timedelta from .raps.raps.telemetry import Telemetry from ..models.sim import SimConfig from ..util.druid import get_druid_engine, get_table, to_timestamp Loading Loading @@ -69,22 +69,37 @@ def fetch_frontier_data(sim_config: SimConfig, raps_config: dict): return jobs def fetch_fugaku_data(sim_config: SimConfig, raps_config: dict): druid_engine = get_druid_engine() start, end = sim_config.start, sim_config.end tbl = get_table("svc-ts-exadigit-data-fugaku", druid_engine) def query_time_range( tbl_name: str, start: datetime, end: datetime, end_col: str, *, druid_engine, parse_dates: list[str], ): tbl = get_table(tbl_name, druid_engine) query = ( sqla.select(sqla.text("*")) .where( (tbl.c['__time'] <= to_timestamp(end)) & (tbl.c['__time'] >= to_timestamp(start - timedelta(days=3))) & (tbl.c.edt >= to_timestamp(start)) (tbl.c[end_col] >= to_timestamp(start)) ) ) data = pd.read_sql(query, druid_engine, parse_dates=[ "adt", "qdt", "schedsdt", "deldt", "sdt", "edt", ]) data = pd.read_sql(query, druid_engine, parse_dates=parse_dates) return data def split_list(x): x = x.split(",") if x else [] return np.array([int(x) for x in x]) def fetch_fugaku_data(sim_config: SimConfig, raps_config: dict): druid_engine = get_druid_engine() start, end = sim_config.start, sim_config.end data = query_time_range( "svc-ts-exadigit-data-fugaku", start, end, 'edt', druid_engine = druid_engine, parse_dates = ["adt", "qdt", "schedsdt", "deldt", "sdt", "edt"], ) telemetry = Telemetry(system = "fugaku", config = raps_config) jobs = telemetry.load_data_from_df(data, min_time = start, Loading @@ -98,23 +113,12 @@ def fetch_marconi100_data(sim_config: SimConfig, raps_config: dict): druid_engine = get_druid_engine() start, end = sim_config.start, sim_config.end tbl = get_table("svc-ts-exadigit-data-marconi100", druid_engine) query = ( sqla.select(sqla.text("*")) .where( (tbl.c['__time'] <= to_timestamp(end)) & (tbl.c['__time'] >= to_timestamp(start - timedelta(days=3))) & (tbl.c.end_time >= to_timestamp(start)) ) ) data = pd.read_sql(query, druid_engine, data = query_time_range( "svc-ts-exadigit-data-marconi100", start, end, 'end_time', druid_engine = druid_engine, parse_dates = ["submit_time", "start_time", "end_time", "eligible_time"], ) def split_list(x): x = x.split(",") if x else [] return np.array([int(x) for x in x]) data['nodes'] = data['nodes'].map(split_list) data['node_power_consumption'] = data['node_power_consumption'].map(split_list) data['mem_power_consumption'] = data['mem_power_consumption'].map(split_list) Loading