Commit 0bcabe2a authored by Maiterth, Matthias

Merge branch 'refactor-engine' into 'develop'

Refactoring to engine and entrypoint script

See merge request !106
parents f1db077a 0734cf87
+1 −1
[flake8]
-exclude = .git, __pycache__, venv*, simulation_results, third_party, models
+exclude = .git, __pycache__, venv*, simulation_results, third_party, models, .venv
max-line-length = 120
+1 −0
@@ -5,3 +5,4 @@ venv
*.npz
*.prof
simulation_results/
+models/*.fmu
+23 −29
@@ -19,37 +19,37 @@ Note: Requires python3.12 or greater.

## Usage and help menu

-    python main.py -h
+    raps run -h

## Run simulator with default synthetic workload

-    python main.py
+    raps run

## Run simulator with telemetry replay

    # Frontier
    DATEDIR="date=2024-01-18"
    DPATH=~/data/frontier-sample-2024-01-18
-    python main.py -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR
+    raps run -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR

## Open telemetry datasets

For Marconi supercomputer, download `job_table.parquet` from https://zenodo.org/records/10127767

    # Marconi100
-    python main.py --system marconi100 -f ~/data/marconi100/job_table.parquet
+    raps run --system marconi100 -f ~/data/marconi100/job_table.parquet

For Adastra MI250 supercomputer, download `AdastaJobsMI250_15days.parquet` from https://zenodo.org/records/14007065

    # Adastra MI250
-    python main.py --system adastraMI250 -f AdastaJobsMI250_15days.parquet
+    raps run --system adastraMI250 -f AdastaJobsMI250_15days.parquet

For Google cluster trace v2

-    python main.py --system gcloudv2 -f ~/data/gcloud/v2/google_cluster_data_2011_sample --ff 600
+    raps run --system gcloudv2 -f ~/data/gcloud/v2/google_cluster_data_2011_sample --ff 600

    # analyze dataset
-    python -m raps.telemetry --system gcloudv2 -f ~/data/gcloud/v2/google_cluster_data_2011_sample -v
+    raps telemetry --system gcloudv2 -f ~/data/gcloud/v2/google_cluster_data_2011_sample -v

For MIT Supercloud

@@ -62,28 +62,28 @@ For MIT Supercloud
    python -m raps.dataloaders.mit_supercloud.cli download --start 2021-05-21T13:00 --end 2021-05-21T14:00

    # Load data and run simulation - will save data as part-cpu.npz and part-gpu.npz files
-    python multi-part-sim.py -x 'mit_supercloud/*' -f $DPATH --system mit_supercloud \
+    raps run-multi-part -x 'mit_supercloud/*' -f $DPATH --system mit_supercloud \
                             --start 2021-05-21T13:00 --end 2021-05-21T14:00
    # Note: if no start/end dates are provided, the run defaults to the 24 hours between
    # 2021-05-21T00:00 and 2021-05-22T00:00, set by defaults in raps/dataloaders/mit_supercloud/utils.py

    # Re-run simulation using npz files (much faster load)
-    python multi-part-sim.py -x mit_supercloud/* -f part-*.npz --system mit_supercloud
+    raps run-multi-part -x mit_supercloud/* -f part-*.npz --system mit_supercloud

    # Synthetic tests for verification studies:
-    python multi-part-sim.py -x 'mit_supercloud/*' -w multitenant
+    raps run-multi-part -x 'mit_supercloud/*' -w multitenant

For Lumi

    # Synthetic test for lumi multi-part-sim:
-    python multi-part-sim.py -x lumi/*
+    raps run-multi-part -x lumi/*

## Perform Network Simulation

Lassen is one of the few datasets that include networking data. See `raps/dataloaders/lassen.py` for how to
get the datasets. To run a network simulation, use the following command:

-    python main.py -f ~/data/lassen/Lassen-Supercomputer-Job-Dataset --system lassen --policy fcfs --backfill firstfit --ff 365d -t 12h --arrival poisson --net
+    raps run -f ~/data/lassen/Lassen-Supercomputer-Job-Dataset --system lassen --policy fcfs --backfill firstfit --ff 365d -t 12h --arrival poisson --net

## Snapshot of extracted workload data

@@ -91,7 +91,7 @@ To reduce the expense of extracting the needed data from the telemetry parquet f
RAPS saves a snapshot of the extracted data in NPZ format. The NPZ file can be
given instead of the parquet files to run subsequent simulations much more quickly, e.g.:

-    python main.py -f jobs_2024-02-20_12-20-39.npz
+    raps run -f jobs_2024-02-20_12-20-39.npz
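
The snapshot is a standard NumPy `.npz` archive, so its contents can be inspected directly (a minimal sketch using plain NumPy; the archive keys are defined by RAPS and may differ between versions):

    import numpy as np
    snapshot = np.load("jobs_2024-02-20_12-20-39.npz", allow_pickle=True)
    print(snapshot.files)  # names of the arrays stored in the snapshot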


## Cooling models
@@ -103,37 +103,31 @@ We provide several cooling models in the repo https://code.ornl.gov/exadigit/POW
Will install the POWER9CSM in the models folder. To activate cooling when running RAPS,
use `--cooling` or `-c` argument. e.g.,

-    python main.py --system marconi100 -c
+    raps run --system marconi100 -c

-    python main.py --system lassen -c
+    raps run --system lassen -c

-    python main.py --system summit -c
+    raps run --system summit -c

## Support for multiple system partitions

Multi-partition systems are supported by the `raps run-multi-part` command, where a list of configurations can be specified using the `-x` flag as follows:

-    python multi-part-sim.py -x setonix/part-cpu setonix/part-gpu
+    raps run-multi-part -x setonix/part-cpu setonix/part-gpu

or simply:

-    python multi-part-sim.py -x setonix/* # bash
+    raps run-multi-part -x setonix/* # bash

-    python multi-part-sim.py -x 'setonix/*' # zsh
-
-To run this in parallel use:
-
-    mpiexec -n 2 python multi-part-sim-mpi.py -x setonix/part-cpu setonix/part-gpu
-
-*Note: first install `mpi4py` via pip or conda.*
+    raps run-multi-part -x 'setonix/*' # zsh
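
(The quoting differs because zsh aborts when a glob matches no files, whereas bash passes an unmatched pattern through literally; quoting ensures the pattern reaches RAPS unexpanded in either shell.)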

This will simulate synthetic workloads on two partitions as defined in `config/setonix-cpu` and `config/setonix-gpu`. To replay telemetry workloads from another system, e.g., Marconi100's PM100 dataset, first create a .npz snapshot of the telemetry data, e.g.,

-    python main.py --system marconi100 -f /path/to/marconi100/job_table.parquet
+    raps run --system marconi100 -f /path/to/marconi100/job_table.parquet

This will dump a .npz file with a randomized name, e.g. ac23db.npz (you can Ctrl-C once the simulation starts; only the snapshot is needed). Rename it to pm100.npz for clarity:
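
    mv ac23db.npz pm100.npz

Now the pm100.npz file can be used with `raps run-multi-part` as follows: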

-    python multi-part-sim.py -x setonix/* -f pm100.npz --arrival poisson --scale 192
+    raps run-multi-part -x setonix/* -f pm100.npz --arrival poisson --scale 192

## Modifications to telemetry replay

@@ -151,11 +145,11 @@ python main.py -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR --pol

## Job-level power output example for replay of single job

-    python main.py -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR --jid 1234567 -o
+    raps run -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR --jid 1234567 -o

## Compute stats on telemetry data, e.g., average job arrival time

-    python -m raps.telemetry -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR
+    raps telemetry -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR

## Build and run Docker container

+144 −282
"""
Main driver for simulating the RAPS single-partition (homogeneous)
system in the ExaDigiT digital twin. Supports synthetic workload
generation or telemetry replay, dynamic power modeling (including
conversion losses), and optional coupling to a thermo-fluids cooling
model. Produces performance, utilization, and energy metrics, with
optional plots and output files for analysis and validation.
ExaDigiT Resource Allocator & Power Simulator (RAPS)
"""
-import json
-import numpy as np
-import random
-import pandas as pd
-import os
-import time
-import math
#
+import yaml
+import argparse
+import sys
+from pathlib import Path
from raps.helpers import check_python_version
#
-from raps.system_config import get_system_config
-from raps.constants import OUTPUT_PATH, SEED
-from raps.cooling import ThermoFluidsModel
-from raps.ui import LayoutManager
-from raps.flops import FLOPSManager
-from raps.plotting import Plotter
-from raps.power import (
-    PowerManager,
-    compute_node_power,
-    compute_node_power_validate
-)
-from raps.power import (
-    compute_node_power_uncertainties,
-    compute_node_power_validate_uncertainties
-)
-from raps.engine import Engine
-from raps.telemetry import Telemetry
-from raps.workload import Workload
-from raps.account import Accounts
-from raps.weather import Weather
-from raps.utils import write_dict_to_file
-from raps.stats import (
-    get_engine_stats,
-    get_job_stats,
-    get_scheduler_stats,
-    get_network_stats,
-    print_formatted_report
-)

-from raps.sim_config import args, args_dict
+from raps.sim_config import SimConfig
+from raps.run_sim import run_sim, run_multi_part_sim
+from raps.workload import run_workload
+from raps.telemetry import run_telemetry, run_telemetry_add_args
+from raps.utils import pydantic_add_args, yaml_dump
+from pydantic_settings import SettingsConfigDict

check_python_version()


-def main():
-    if args.verbose or args.debug:
-        print(args)
-
-    config = get_system_config(args.system).get_legacy()
-
-    if args.seed:
-        random.seed(SEED)
-        np.random.seed(SEED)
-
-    if args.cooling:
-        cooling_model = ThermoFluidsModel(**config)
-        cooling_model.initialize()
-        args.layout = "layout2"
-
-        if args_dict['start']:
-            cooling_model.weather = Weather(args_dict['start'], config=config)
-    else:
-        cooling_model = None
-
-    if args.validate:
-        if args.uncertainties:
-            power_manager = PowerManager(compute_node_power_validate_uncertainties, **config)
-        else:
-            power_manager = PowerManager(compute_node_power_validate, **config)
-    else:
-        if args.uncertainties:
-            power_manager = PowerManager(compute_node_power_uncertainties, **config)
-        else:
-            power_manager = PowerManager(compute_node_power, **config)
-    args_dict['config'] = config
-    flops_manager = FLOPSManager(**args_dict)
+def read_sim_yaml(config_file: str):
+    if config_file == "-":
+        return yaml.safe_load(sys.stdin.read())
+    elif config_file:
+        return yaml.safe_load(Path(config_file).read_text())
+    else:
+        return None

-    if args.live and not args.replay:
-        assert args.time is not None, "--time must be set, specifying how long we want to predict"
-        td = Telemetry(**args_dict)
-        jobs, timestep_start, timestep_end = \
-            td.load_jobs_times_args_from_live_system()
-        if args.output is not None:
-            td.save_snapshot(jobs=jobs, timestep_start=timestep_start,
-                             timestep_end=timestep_end, args=args, filename=td.dirname)
-        return {}
-
-    elif args.replay:
-
-        td = Telemetry(**args_dict)
-        jobs, timestep_start, timestep_end, args_from_file = \
-            td.load_jobs_times_args_from_files(files=args.replay, args=args, config=config)
-        # TODO: Merge args and args_from_files? see telemetry.py:97
-
-    else:  # Synthetic jobs
-        wl = Workload(args, config)
-        jobs = wl.generate_jobs()
-
-        if args.verbose:
-            for job in jobs:
-                print('jobid:', job.id, '\tlen(gpu_trace):',
-                      len(job.gpu_trace) if isinstance(job.gpu_trace, list)
-                      else job.gpu_trace, '\twall_time(s):',
-                      job.wall_time)
-            time.sleep(2)
-
-        timestep_start = 0
-        if hasattr(jobs[0], 'end_time'):
-            timestep_end = int(math.ceil(max([job.end_time for job in jobs])))
-        else:
-            timestep_end = 88200  # 24.5 hours
-
-        td = Telemetry(**args_dict)
-        td.save_snapshot(jobs=jobs, timestep_start=timestep_start,
-                         timestep_end=timestep_end, args=args, filename=td.dirname)
-
-    if args.fastforward is not None:
-        timestep_start = timestep_start + args.fastforward
-
-    if args.time is not None:
-        timestep_end = timestep_start + args.time
-
-    if args.time_delta is not None:
-        time_delta = args.time_delta
-    else:
-        time_delta = 1
-
-    if args.continuous_job_generation:
-        continuous_workload = wl
-    else:
-        continuous_workload = None
-
-    sc = Engine(
-        power_manager=power_manager,
-        flops_manager=flops_manager,
-        cooling_model=cooling_model,
-        continuous_workload=continuous_workload,
-        jobs=jobs,
-        **args_dict,
-    )
+# Shared pydantic-settings CLI behavior: boolean fields become implicit
+# --flag/--no-flag switches and field names are rendered in kebab-case.
+CLI_CONFIG = SettingsConfigDict(
+    cli_implicit_flags=True,
+    cli_kebab_case=True,
+)

-    DIR_NAME = td.dirname
-    OPATH = OUTPUT_PATH / DIR_NAME
-    print("Output directory is: ", OPATH)
-    sc.opath = OPATH
-
-    if args.accounts:
-        job_accounts = Accounts(jobs)
-        if args.accounts_json:
-            loaded_accounts = Accounts.from_json_filename(args.accounts_json)
-            accounts = Accounts.merge(loaded_accounts, job_accounts)
-        else:
-            accounts = job_accounts
-        sc.accounts = accounts
-
-    if args.plot or args.output is not None:
-        try:
-            os.makedirs(OPATH)
-        except OSError as error:
-            print(f"Error creating directory: {error}")
-
-    if args.verbose:
-        print(jobs)
-
-    total_timesteps = timestep_end - timestep_start
-
-    downscale = args.downscale
-    downscale_str = "" if downscale == 1 else f"/{downscale}"
-    print(f"Simulating {len(jobs)} jobs for {total_timesteps}{downscale_str}"
-          f" seconds from {timestep_start} to {timestep_end}.")
-    print(f"Simulation time delta: {time_delta}{downscale_str} s, "
-          f"Telemetry trace quanta: {jobs[0].trace_quanta}{downscale_str} s.")
-    layout_manager = LayoutManager(args.layout, engine=sc, debug=args.debug,
-                                   total_timesteps=total_timesteps,
-                                   args_dict=args_dict, **config)
-    layout_manager.run(jobs, timestep_start=timestep_start, timestep_end=timestep_end, time_delta=time_delta)
-
-    engine_stats = get_engine_stats(sc)
-    job_stats = get_job_stats(sc)
-    scheduler_stats = get_scheduler_stats(sc)
-    if sc.simulate_network:
-        network_stats = get_network_stats(sc)
-    else:
-        network_stats = None
-
-    print_formatted_report(engine_stats=engine_stats,
-                           job_stats=job_stats,
-                           scheduler_stats=scheduler_stats,
-                           network_stats=network_stats)
+def main():
+    parser = argparse.ArgumentParser(
+        description="""
+            ExaDigiT Resource Allocator & Power Simulator (RAPS)
+        """,
+        allow_abbrev=False,
+    )

-    if downscale_str:
-        downscale_str = "1" + downscale_str
-
-    if args.plot:
-        if 'power' in args.plot:
-            pl = Plotter(f"Time ({downscale_str}s)", 'Power (kW)', 'Power History',
-                         OPATH / f'power.{args.imtype}',
-                         uncertainties=args.uncertainties)
-            x, y = zip(*power_manager.history)
-            pl.plot_history(x, y)
-
-        if 'util' in args.plot:
-            pl = Plotter(f"Time ({downscale_str}s)", 'System Utilization (%)',
-                         'System Utilization History', OPATH / f'util.{args.imtype}')
-            x, y = zip(*sc.sys_util_history)
-            pl.plot_history(x, y)
-
-        if 'loss' in args.plot:
-            pl = Plotter(f"Time ({downscale_str}s)", 'Power Losses (kW)', 'Power Loss History',
-                         OPATH / f'loss.{args.imtype}',
-                         uncertainties=args.uncertainties)
-            x, y = zip(*power_manager.loss_history)
-            pl.plot_history(x, y)
-
-            pl = Plotter(f"Time ({downscale_str}s)", 'Power Losses (%)', 'Power Loss History',
-                         OPATH / f'loss_pct.{args.imtype}',
-                         uncertainties=args.uncertainties)
-            x, y = zip(*power_manager.loss_history_percentage)
-            pl.plot_history(x, y)
-
-        if 'pue' in args.plot:
-            if cooling_model:
-                ylabel = 'pue'
-                title = 'FMU ' + ylabel + ' History'
-                pl = Plotter(f"Time ({downscale_str}s)", ylabel, title, OPATH / f'pue.{args.imtype}',
-                             uncertainties=args.uncertainties)
-                df = pd.DataFrame(cooling_model.fmu_history)
-                df.to_parquet('cooling_model.parquet', engine='pyarrow')
-                pl.plot_history(df['time'], df[ylabel])
-            else:
-                print('Cooling model not enabled... skipping output of plot')
-
-        if 'temp' in args.plot:
-            if cooling_model:
-                ylabel = 'Tr_pri_Out[1]'
-                title = 'FMU ' + ylabel + ' History'
-                pl = Plotter(f"Time ({downscale_str}s)", ylabel, title, OPATH / 'temp.svg')
-                df = pd.DataFrame(cooling_model.fmu_history)
-                df.to_parquet('cooling_model.parquet', engine='pyarrow')
-                pl.plot_compare(df['time'], df[ylabel])
-            else:
-                print('Cooling model not enabled... skipping output of plot')
-
-    if args.output is not None:
-
-        if args.uncertainties:
-            # Parquet cannot handle annotated ufloat format AFAIK
-            print('Data dump not implemented using uncertainties!')
-        else:
-            if cooling_model:
-                df = pd.DataFrame(cooling_model.fmu_history)
-                df.to_parquet(OPATH / 'cooling_model.parquet', engine='pyarrow')
-
-            df = pd.DataFrame(power_manager.history)
-            df.to_parquet(OPATH / 'power_history.parquet', engine='pyarrow')
-
-            df = pd.DataFrame(power_manager.loss_history)
-            df.to_parquet(OPATH / 'loss_history.parquet', engine='pyarrow')
-
-            df = pd.DataFrame(sc.sys_util_history)
-            df.to_parquet(OPATH / 'util.parquet', engine='pyarrow')
-
-            # Schedule history
-            job_history = pd.DataFrame(sc.get_job_history_dict())
-            job_history.to_csv(OPATH / "job_history.csv", index=False)
-
-            scheduler_running_history = pd.DataFrame(sc.get_scheduler_running_history())
-            scheduler_running_history.to_csv(OPATH / "running_history.csv", index=False)
-            # note: this reuses the running history; a queue-specific accessor was likely intended
-            scheduler_queue_history = pd.DataFrame(sc.get_scheduler_running_history())
-            scheduler_queue_history.to_csv(OPATH / "queue_history.csv", index=False)
-
-            try:
-                with open(OPATH / 'stats.out', 'w') as f:
-                    json.dump(engine_stats, f, indent=4)
-                    json.dump(job_stats, f, indent=4)
-            except TypeError:  # Is this the correct exception type?
-                write_dict_to_file(engine_stats, OPATH / 'stats.out')
-                write_dict_to_file(job_stats, OPATH / 'stats.out')
-
-            if args.accounts:
-                try:
-                    with open(OPATH / 'accounts.json', 'w') as f:
-                        json_string = json.dumps(sc.accounts.to_dict())
-                        f.write(json_string)
-                except TypeError:
-                    write_dict_to_file(sc.accounts.to_dict(), OPATH / 'accounts.json')
-        print("Output directory is: ", OPATH)  # If output is enabled, the user wants this information as last output
+    subparsers = parser.add_subparsers(required=True)
+
+    # Shortcut for common sim args
+    sim_shortcuts = {
+        "partitions": "x",
+        "cooling": "c",
+        "simulate-network": "net",
+        "fastforward": "ff",
+        "time": "t",
+        "debug": "d",
+        "numjobs": "n",
+        "verbose": "v",
+        "output": "o",
+        "uncertainties": "u",
+        "plot": "p",
+        "replay": "f",
+        "workload": "w",
+    }

+    # ==== raps run ====
+    cmd_run = subparsers.add_parser("run", description="""
+        Run single-partition (homogeneous) systems. Supports synthetic workload generation or
+        telemetry replay, dynamic power modeling (including conversion losses), and optional
+        coupling to a thermo-fluids cooling model. Produces performance, utilization, and
+        energy metrics, with optional plots and output files for analysis and validation.
+    """)
+    cmd_run.add_argument("config_file", nargs="?", default=None, help="""
+        YAML sim config file; can be used to configure an experiment instead of CLI
+        flags. Pass "-" to read from stdin.
+    """)
+    cmd_run_validate = pydantic_add_args(cmd_run, SimConfig, model_config={
+        **CLI_CONFIG,
+        "cli_shortcuts": sim_shortcuts,
+    })
+
+    def cmd_run_func(args):
+        sim_config = cmd_run_validate(args, read_sim_yaml(args.config_file))
+        run_sim(sim_config)
+    cmd_run.set_defaults(func=cmd_run_func)
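+    # Each subcommand follows the same pattern (as used here): pydantic_add_args()
+    # mirrors SimConfig's fields onto the subparser and returns a validator that
+    # merges the parsed CLI values with the optional YAML config into a SimConfig.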

+    # ==== raps run-multi-part ====
+    # It might make sense to combine these into a single entrypoint, though the
+    # multi-part run doesn't support the UI or the same output options.
+    cmd_run_multi_part = subparsers.add_parser("run-multi-part", description="""
+        Simulates multi-partition (heterogeneous) systems. Supports replaying telemetry or
+        generating synthetic workloads across CPU-only, GPU, and mixed partitions. Initializes
+        per-partition power, FLOPS, and scheduling models, then advances simulations in lockstep.
+        Outputs per-partition performance, utilization, and energy statistics for systems such as
+        MIT Supercloud, Setonix, Adastra, and LUMI.
+    """)
+    cmd_run_multi_part.add_argument("config_file", nargs="?", default=None, help="""
+        YAML sim config file; can be used to configure an experiment instead of CLI
+        flags. Pass "-" to read from stdin.
+    """)
+    cmd_run_multi_part_validate = pydantic_add_args(cmd_run_multi_part, SimConfig, model_config={
+        **CLI_CONFIG,
+        "cli_shortcuts": sim_shortcuts,
+    })
+
+    def cmd_run_multi_part_func(args):
+        sim_config = cmd_run_multi_part_validate(args, read_sim_yaml(args.config_file))
+        run_multi_part_sim(sim_config)
+    cmd_run_multi_part.set_defaults(func=cmd_run_multi_part_func)

+    # ==== raps show ====
+    cmd_show = subparsers.add_parser("show", description="""
+        Outputs the given CLI args as a YAML config file that can be used to re-run the same
+        simulation.
+    """)
+    cmd_show.add_argument("config_file", nargs="?", default=None, help="""
+        Input YAML sim config file. Can be used to slightly modify an existing sim config.
+    """)
+    cmd_show.add_argument("--show-defaults", action="store_true", help="""
+        If true, include defaults in the output YAML
+    """)
+    cmd_show_validate = pydantic_add_args(cmd_show, SimConfig, model_config={
+        **CLI_CONFIG,
+        "cli_shortcuts": sim_shortcuts,
+    })
+
+    def cmd_show_func(args):
+        sim_config = cmd_show_validate(args, read_sim_yaml(args.config_file))
+        sim_config = sim_config.model_dump(mode="json",
+                                           exclude_defaults=not args.show_defaults)
+        print(yaml_dump(sim_config), end="")
+    cmd_show.set_defaults(func=cmd_show_func)
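+    # For example, `raps show --system marconi100 -t 12h` would emit a YAML
+    # document that can be fed back in via `raps run config.yaml`, e.g.
+    # (field names here are assumed from the CLI flags, not confirmed):
+    #
+    #     system: marconi100
+    #     time: 12h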

+    # ==== raps workload ====
+    # TODO: Separate the arguments for this command
+    cmd_workload = subparsers.add_parser("workload", description="""
+        Saves workload as a snapshot.
+    """)
+    cmd_workload.add_argument("config_file", nargs="?", default=None, help="""
+        YAML sim config file; can be used to configure an experiment instead of CLI
+        flags. Pass "-" to read from stdin.
+    """)
+    cmd_workload_validate = pydantic_add_args(cmd_workload, SimConfig, model_config={
+        **CLI_CONFIG,
+        "cli_shortcuts": sim_shortcuts,
+    })
+
+    def cmd_workload_func(args):
+        sim_config = cmd_workload_validate(args, read_sim_yaml(args.config_file))
+        run_workload(sim_config)
+    cmd_workload.set_defaults(func=cmd_workload_func)

+    # ==== raps telemetry ====
+    cmd_telemetry = subparsers.add_parser("telemetry", description="""
+        Telemetry data validator
+    """)
+    run_telemetry_add_args(cmd_telemetry)
+    cmd_telemetry.set_defaults(func=run_telemetry)
+
+    # TODO: move telemetry and other misc scripts into here
+
+    args = parser.parse_args()
+    args.func(args)


if __name__ == "__main__":
    main()
multi-part-sim-mpi.py deleted (100644 → 0)
+0 −170
File deleted.
