Commit 8fe97142 authored by Maiterth, Matthias's avatar Maiterth, Matthias
Browse files

Merge branch 'more-main-refactoring' into 'develop'

More refactoring to main

See merge request !109
parents ac3588d9 e59b2410
Loading
Loading
Loading
Loading
+11 −136
Original line number Diff line number Diff line
"""
ExaDigiT Resource Allocator & Power Simulator (RAPS)
"""
import yaml
import argparse
import sys
from pathlib import Path
from raps.helpers import check_python_version
from raps.sim_config import SimConfig
from raps.run_sim import run_sim, run_multi_part_sim
from raps.workload import run_workload
from raps.telemetry import run_telemetry, run_telemetry_add_args
from raps.utils import pydantic_add_args, yaml_dump
from pydantic_settings import SettingsConfigDict
from raps.run_sim import run_sim_add_parser, run_multi_part_sim_add_parser, show_add_parser
from raps.workload import run_workload_add_parser
from raps.telemetry import run_telemetry_add_parser

check_python_version()


def read_sim_yaml(config_file: str):
    """
    Load a YAML sim config and return the parsed document.

    `config_file` is either "-" (the YAML is read from stdin), a path to a YAML
    file, or a falsy value (None / empty string), in which case nothing is read
    and an empty dict is returned.
    """
    if not config_file:
        return {}
    if config_file == "-":
        yaml_text = sys.stdin.read()
    else:
        yaml_text = Path(config_file).read_text()
    return yaml.safe_load(yaml_text)


# Shared pydantic-settings CLI options. Spread into the model_config that each
# subcommand hands to pydantic_add_args.
CLI_CONFIG = SettingsConfigDict(
    cli_implicit_flags=True,
    cli_kebab_case=True,
)


def main(cli_args: list[str] | None = None):
    parser = argparse.ArgumentParser(
        description="""
@@ -40,121 +19,17 @@ def main(cli_args: list[str] | None = None):
    )
    subparsers = parser.add_subparsers(required=True)

    # Shortcut for common sim args
    sim_shortcuts = {
        "partitions": "x",
        "cooling": "c",
        "simulate-network": "net",
        "fastforward": "ff",
        "time": "t",
        "debug": "d",
        "numjobs": "n",
        "verbose": "v",
        "output": "o",
        "uncertainties": "u",
        "plot": "p",
        "replay": "f",
        "workload": "w",
    }

    # ==== raps run ====
    cmd_run = subparsers.add_parser("run", description="""
        Run single-partition (homogeneous) systems. Supports synthetic workload generation or
        telemetry replay, dynamic power modeling (including conversion losses), and optional
        coupling to a thermo-fluids cooling model. Produces performance, utilization, and
        energy metrics, with optional plots and output files for analysis and validation.
    """)
    cmd_run.add_argument("config_file", nargs="?", default=None, help="""
        YAML sim config file, can be used to configure an experiment instead of using CLI
        flags. Pass "-" to read from stdin.
    """)
    cmd_run_validate = pydantic_add_args(cmd_run, SimConfig, model_config={
        **CLI_CONFIG,
        "cli_shortcuts": sim_shortcuts,
    })

    def cmd_run_func(args):
        sim_config = cmd_run_validate(args, read_sim_yaml(args.config_file))
        run_sim(sim_config)
    cmd_run.set_defaults(func=cmd_run_func)

    # ==== raps run-multi-part ====
    # It might make sense to combine these into a single entrypoint. Though the multi-part run
    # doesn't support UI or the same output options.
    cmd_run_multi_part = subparsers.add_parser("run-multi-part", description="""
        Simulates multi-partition (heterogeneous) systems. Supports replaying telemetry or
        generating synthetic workloads across CPU-only, GPU, and mixed partitions. Initializes
        per-partition power, FLOPS, and scheduling models, then advances simulations in lockstep.
        Outputs per-partition performance, utilization, and energy statistics for systems such as
        MIT Supercloud, Setonix, Adastra, and LUMI.
    """)
    cmd_run_multi_part.add_argument("config_file", nargs="?", default=None, help="""
        YAML sim config file, can be used to configure an experiment instead of using CLI
        flags. Pass "-" to read from stdin.
    """)
    cmd_run_multi_part_validate = pydantic_add_args(cmd_run_multi_part, SimConfig, model_config={
        **CLI_CONFIG,
        "cli_shortcuts": sim_shortcuts,
    })

    def cmd_run_multi_part_func(args):
        sim_config = cmd_run_multi_part_validate(args, read_sim_yaml(args.config_file))
        run_multi_part_sim(sim_config)
    cmd_run_multi_part.set_defaults(func=cmd_run_multi_part_func)

    # ==== raps show ====
    cmd_show = subparsers.add_parser("show", description="""
        Outputs the given CLI args as a YAML config file that can be used to re-run the same
        simulation.
    """)
    cmd_show.add_argument("config_file", nargs="?", default=None, help="""
        Input YAML sim config file. Can be used to slightly modify an existing sim config.
    """)
    cmd_show.add_argument("--show-defaults", default=False, help="""
        If true, include defaults in the output YAML
    """)
    cmd_show_validate = pydantic_add_args(cmd_show, SimConfig, model_config={
        **CLI_CONFIG,
        "cli_shortcuts": sim_shortcuts,
    })

    def cmd_show_func(args):
        sim_config = cmd_show_validate(args, read_sim_yaml(args.config_file))
        sim_config = sim_config.model_dump(mode="json",
                                           exclude_defaults=not args.show_defaults)
        print(yaml_dump(sim_config), end="")
    cmd_show.set_defaults(func=cmd_show_func)

    # ==== raps workload ====
    # TODO: Separate the arguments for this command
    cmd_workload = subparsers.add_parser("workload", description="""
        Saves workload as a snapshot.
    """)
    cmd_workload.add_argument("config_file", nargs="?", default=None, help="""
        YAML sim config file, can be used to configure an experiment instead of using CLI
        flags. Pass "-" to read from stdin.
    """)
    cmd_workload_validate = pydantic_add_args(cmd_workload, SimConfig, model_config={
        **CLI_CONFIG,
        "cli_shortcuts": sim_shortcuts,
    })

    def cmd_workload_func(args):
        sim_config = cmd_workload_validate(args, read_sim_yaml(args.config_file))
        run_workload(sim_config)
    cmd_show.set_defaults(func=cmd_workload_func)

    # ==== raps telemetry ====
    cmd_telemetry = subparsers.add_parser("telemetry", description="""
        Telemetry data validator
    """)
    run_telemetry_add_args(cmd_telemetry)
    cmd_telemetry.set_defaults(func=run_telemetry)
    run_sim_add_parser(subparsers)
    run_multi_part_sim_add_parser(subparsers)
    show_add_parser(subparsers)
    run_workload_add_parser(subparsers)
    run_telemetry_add_parser(subparsers)

    # TODO: move telemetry and other misc scripts into here
    # TODO: move other misc scripts into here

    args = parser.parse_args(cli_args)
    args.func(args)
    assert args.impl, "subparsers should add an impl function to args"
    args.impl(args)


if __name__ == "__main__":
+95 −1
Original line number Diff line number Diff line
@@ -6,11 +6,13 @@ These functions just handle rendering the terminal UI and outputting results to
import json
import pandas as pd
import sys
import yaml
from pathlib import Path
from raps.ui import LayoutManager
from raps.plotting import Plotter
from raps.engine import Engine
from raps.multi_part_engine import MultiPartEngine
from raps.utils import write_dict_to_file
from raps.utils import write_dict_to_file, pydantic_add_args, SubParsers, yaml_dump
from raps.stats import (
    get_engine_stats,
    get_job_stats,
@@ -22,6 +24,51 @@ from raps.stats import (
from raps.sim_config import SimConfig


def read_yaml(config_file: str):
    """
    Load a YAML config and return the parsed document.

    `config_file` is either "-" (the YAML is read from stdin), a path to a YAML
    file, or a falsy value (None / empty string), in which case nothing is read
    and an empty dict is returned.
    """
    if not config_file:
        return {}
    if config_file == "-":
        yaml_text = sys.stdin.read()
    else:
        yaml_text = Path(config_file).read_text()
    return yaml.safe_load(yaml_text)


# Short aliases for common sim options, keyed by the kebab-case option name.
# Passed as "cli_shortcuts" in the model_config given to pydantic_add_args.
shortcuts = {
    "partitions": "x",
    "cooling": "c",
    "simulate-network": "net",
    "fastforward": "ff",
    "time": "t",
    "debug": "d",
    "numjobs": "n",
    "verbose": "v",
    "output": "o",
    "uncertainties": "u",
    "plot": "p",
    "replay": "f",
    "workload": "w",
}


def run_sim_add_parser(subparsers: SubParsers):
    """
    Register the `run` subcommand on `subparsers`.

    The subcommand accepts an optional positional YAML config file plus the
    options that pydantic_add_args derives from SimConfig. The parser's `impl`
    default reads the YAML, validates it together with the parsed args, and
    hands the resulting config to run_sim.
    """
    run_parser = subparsers.add_parser("run", description="""
        Run single-partition (homogeneous) systems. Supports synthetic workload generation or
        telemetry replay, dynamic power modeling (including conversion losses), and optional
        coupling to a thermo-fluids cooling model. Produces performance, utilization, and
        energy metrics, with optional plots and output files for analysis and validation.
    """)
    run_parser.add_argument("config_file", nargs="?", default=None, help="""
        YAML sim config file, can be used to configure an experiment instead of using CLI
        flags. Pass "-" to read from stdin.
    """)
    validate_args = pydantic_add_args(
        run_parser, SimConfig, model_config={"cli_shortcuts": shortcuts},
    )

    def impl(args):
        # The YAML document (possibly empty) is validated together with the CLI args
        yaml_data = read_yaml(args.config_file)
        return run_sim(validate_args(args, yaml_data))

    run_parser.set_defaults(impl=impl)


def run_sim(sim_config: SimConfig):
    if sim_config.verbose or sim_config.debug:
        print(f"SimConfig: {sim_config.model_dump_json(indent=4)}")
@@ -174,6 +221,26 @@ def run_sim(sim_config: SimConfig):
        print("Output directory is: ", out)  # If output is enabled, the user wants this information as last output


def run_multi_part_sim_add_parser(subparsers: SubParsers):
    """
    Register the `run-multi-part` subcommand on `subparsers`.

    The subcommand accepts an optional positional YAML config file plus the
    options that pydantic_add_args derives from SimConfig. The parser's `impl`
    default reads the YAML, validates it together with the parsed args, and
    hands the resulting config to run_multi_part_sim.
    """
    multi_parser = subparsers.add_parser("run-multi-part", description="""
        Simulates multi-partition (heterogeneous) systems. Supports replaying telemetry or
        generating synthetic workloads across CPU-only, GPU, and mixed partitions. Initializes
        per-partition power, FLOPS, and scheduling models, then advances simulations in lockstep.
        Outputs per-partition performance, utilization, and energy statistics for systems such as
        MIT Supercloud, Setonix, Adastra, and LUMI.
    """)
    multi_parser.add_argument("config_file", nargs="?", default=None, help="""
        YAML sim config file, can be used to configure an experiment instead of using CLI
        flags. Pass "-" to read from stdin.
    """)
    validate_args = pydantic_add_args(
        multi_parser, SimConfig, model_config={"cli_shortcuts": shortcuts},
    )

    def impl(args):
        # The YAML document (possibly empty) is validated together with the CLI args
        yaml_data = read_yaml(args.config_file)
        return run_multi_part_sim(validate_args(args, yaml_data))

    multi_parser.set_defaults(impl=impl)


def run_multi_part_sim(sim_config: SimConfig):
    multi_engine, jobs, timestep_start, timestep_end, time_delta = MultiPartEngine.from_sim_config(sim_config)

@@ -234,3 +301,30 @@ def run_multi_part_sim(sim_config: SimConfig):
            scheduler_stats=scheduler_stats,
            network_stats=network_stats,
        )


def _parse_bool_arg(value: str) -> bool:
    """
    Convert a CLI string such as "true" / "false" into a bool.

    Intended as an argparse `type=` callable. Raises argparse.ArgumentTypeError for
    anything that is not a recognised boolean spelling, which argparse reports as
    a usage error.
    """
    import argparse  # local import: only needed to build the error

    text = value.strip().lower()
    if text in ("1", "true", "t", "yes", "y", "on"):
        return True
    if text in ("0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean (true/false), got {value!r}")


def show_add_parser(subparsers: SubParsers):
    """
    Register the `show` subcommand on `subparsers`.

    The subcommand accepts an optional positional YAML config file, a
    `--show-defaults` switch, and the options that pydantic_add_args derives
    from SimConfig. The parser's `impl` default validates the YAML together
    with the parsed args and prints the resulting config through `show`.
    """
    parser = subparsers.add_parser("show", description="""
        Outputs the given CLI args as a YAML config file that can be used to re-run the same
        simulation.
    """)
    parser.add_argument("config_file", nargs="?", default=None, help="""
        Input YAML sim config file. Can be used to slightly modify an existing sim config.
    """)
    # Without a `type`, argparse stores the raw string, so "--show-defaults false"
    # was truthy and still enabled defaults. Parse the value as a real boolean and
    # also accept the bare flag (const=True) as "on".
    parser.add_argument(
        "--show-defaults", nargs="?", const=True, default=False, type=_parse_bool_arg,
        help="""
        If true, include defaults in the output YAML
    """)
    model_validate = pydantic_add_args(parser, SimConfig, model_config={
        "cli_shortcuts": shortcuts,
    })

    def impl(args):
        sim_config = model_validate(args, read_yaml(args.config_file))
        show(sim_config, show_defaults=args.show_defaults)

    parser.set_defaults(impl=impl)


def show(sim_config: SimConfig, show_defaults=False):
    """
    Print `sim_config` to stdout as YAML.

    Fields still at their default value are left out unless `show_defaults` is true.
    """
    hide_defaults = not show_defaults
    dumped = sim_config.model_dump(mode="json", exclude_defaults=hide_defaults)
    # yaml_dump output is printed as-is, without an extra trailing newline
    print(yaml_dump(dumped), end="")
+49 −26
Original line number Diff line number Diff line
@@ -6,9 +6,9 @@ parsing parquet files, and generating job state information.
The module defines a `Telemetry` class for managing telemetry data and several
helper functions for data encryption and conversion between node name and index formats.
"""
from typing import Literal
import sys
import random
import argparse
from pathlib import Path
# import json
from typing import Optional
@@ -18,6 +18,7 @@ import importlib
import numpy as np
import pandas as pd
from tqdm import tqdm
from pydantic import BaseModel
# from rich.progress import track

from raps.system_config import get_system_config
@@ -28,7 +29,9 @@ from raps.plotting import (
    plot_nodes_gantt,
    plot_network_histogram
)
from raps.utils import next_arrival_byconfargs, convert_to_time_unit
from raps.utils import (
    next_arrival_byconfargs, convert_to_time_unit, pydantic_add_args, SubParsers, ExpandedPath,
)


class Telemetry:
@@ -266,30 +269,50 @@ class Telemetry:
        return jobs, timestep_start, timestep_end, args


def run_telemetry_add_args(parser: argparse.ArgumentParser):
    """
    Register the telemetry command's options on `parser`.

    Options are declared as (flags, keyword-arguments) pairs and added in the
    listed order, which is also the order they appear in the help output.
    """
    arrival_modes = ['prescribed', 'poisson']
    original_times, resampled = arrival_modes

    option_table = [
        (('--jid',), dict(type=str, default='*', help='Replay job id')),
        (('-f', '--replay'), dict(
            nargs='+', type=str,
            help='Either: path/to/joblive path/to/jobprofile -or- filename.npz (overrides --workload option)',
        )),
        (('-p', '--plot'), dict(type=str, default=None, choices=['jobs', 'nodes'], help='Output plots')),
        (("--is-results-file",), dict(action='store_true', default=False, help='Output plots')),
        # duplicate in workload!
        (("--gantt-nodes",), dict(
            default=False, action='store_true', required=False,
            help="Print Gannt with nodes required as line thickness (default false)",
        )),
        (('-t', '--time'), dict(
            type=str, default=None,
            help='Length of time to simulate, e.g., 123, 123s, 27m, 3h, 7d',
        )),
        (('--system',), dict(type=str, default='frontier', help='System config to use')),
        (('--arrival',), dict(
            default=original_times, type=str, choices=arrival_modes,
            help=f"Modify arrival distribution ({resampled}) or use the original submit times ({original_times})",
        )),
        (('-v', '--verbose'), dict(action='store_true', help='Enable verbose output')),
        (('-o', '--output'), dict(type=str, default=None, help='Store output in --output <arg> file.')),
        (("--live",), dict(action="store_true", help="Grab data from live system.")),
    ]
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)


def run_telemetry(args):
    args_dict = vars(args)
class TelemetryArgs(BaseModel):
    """
    Arguments accepted by the `telemetry` command.

    The string under each field is its attribute docstring (presumably surfaced
    as the option's help text by pydantic_add_args -- confirm).
    """
    jid: str = '*'
    """ Replay job id """
    replay: list[ExpandedPath] | None = None
    """ path/to/joblive path/to/jobprofile  -or- filename.npz (overrides --workload option) """
    plot: list[Literal["jobs", "nodes"]] | None = None
    """ Output plots """
    is_results_file: bool = False
    # NOTE(review): no help text; purpose is not evident from this block -- document once confirmed
    gantt_nodes: bool = False
    """ Print Gantt with nodes required as line thickness (default false) """
    time: str | None = None
    """ Length of time to simulate, e.g., 123, 123s, 27m, 3h, 7d """
    system: str = 'frontier'
    """ System config to use """
    arrival: Literal['prescribed', 'poisson'] = "prescribed"
    """ Modify arrival distribution (poisson) or use the original submit times (prescribed) """
    verbose: bool = False
    """ Enable verbose output """
    output: str | None = None
    """ Store output in --output <arg> file. """
    live: bool = False
    """ Grab data from live system. """


# Short aliases for TelemetryArgs options, keyed by option name.
# Passed as "cli_shortcuts" to pydantic_add_args by run_telemetry_add_parser.
shortcuts = {
    "replay": "f",
    "plot": "p",
    "time": "t",
    "verbose": "v",
    "output": "o",
}


def run_telemetry_add_parser(subparsers: SubParsers):
    """
    Register the `telemetry` subcommand on `subparsers`.

    Options are derived from TelemetryArgs by pydantic_add_args. The parser's
    `impl` default validates the parsed args and passes the result to
    run_telemetry.
    """
    telemetry_parser = subparsers.add_parser("telemetry", description="""
        Telemetry data validator
    """)
    validate_args = pydantic_add_args(telemetry_parser, TelemetryArgs, {
        "cli_shortcuts": shortcuts,
    })

    def impl(args):
        # This command takes no config file, so validation starts from an empty dict
        telemetry_args = validate_args(args, {})
        return run_telemetry(telemetry_args)

    telemetry_parser.set_defaults(impl=impl)


def run_telemetry(args: TelemetryArgs):
    args_dict = args.model_dump()
    config = get_system_config(args.system).get_legacy()
    args_dict['config'] = config
    td = Telemetry(**args_dict)
+7 −1
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@ import uuid
import json
import argparse
from pathlib import Path
from typing import Annotated as A, TypeVar, Callable
from typing import Annotated as A, TypeVar, Callable, TypeAlias
from pydantic import BaseModel, TypeAdapter, AfterValidator
from pydantic_settings import BaseSettings, SettingsConfigDict, CliApp, CliSettingsSource
import yaml
@@ -650,6 +650,8 @@ def pydantic_add_args(
    some hacks to apply the args manually.
    """
    model_config_dict = SettingsConfigDict({
        "cli_implicit_flags": True,
        "cli_kebab_case": True,
        **(model_config or {}),
        "cli_parse_args": False,  # Don't automatically parse args
    })
@@ -676,6 +678,10 @@ def pydantic_add_args(
    return model_validate_args


SubParsers: TypeAlias = "argparse._SubParsersAction[argparse.ArgumentParser]"
""" Alias for the result of argparse parser.add_subparsers """


def yaml_dump(data):
    """ Dumps yaml with pretty formatting """
    class IndentDumper(yaml.Dumper):
+17 −1
Original line number Diff line number Diff line
@@ -38,7 +38,7 @@ import numpy as np
import matplotlib.pyplot as plt
from raps.telemetry import Telemetry
from raps.job import job_dict, Job
from raps.utils import create_file_indexed
from raps.utils import create_file_indexed, SubParsers, pydantic_add_args
from raps.sim_config import SimConfig


@@ -949,6 +949,22 @@ def plot_job_hist(jobs, config=None, dist_split=None, gantt_nodes=False):
    plt.show()


def run_workload_add_parser(subparsers: SubParsers):
    """
    Register the `workload` subcommand on `subparsers`.

    The subcommand accepts an optional positional YAML config file plus the
    options that pydantic_add_args derives from SimConfig. The parser's `impl`
    default reads the YAML, validates it together with the parsed args, and
    hands the resulting config to run_workload.
    """
    # Imported inside the function, as the original did for `shortcuts`
    # (presumably to avoid a circular import with raps.run_sim -- confirm).
    from raps.run_sim import shortcuts, read_yaml
    # TODO: Separate the arguments for this command
    parser = subparsers.add_parser("workload", description="""
        Saves workload as a snapshot.
    """)
    parser.add_argument("config_file", nargs="?", default=None, help="""
        YAML sim config file, can be used to configure an experiment instead of using CLI
        flags. Pass "-" to read from stdin.
    """)
    model_validate = pydantic_add_args(parser, SimConfig, model_config={
        "cli_shortcuts": shortcuts,
    })

    def impl(args):
        # Honour the documented config_file argument; previously an empty dict was
        # passed here, so a given YAML file (or stdin) was silently ignored.
        return run_workload(model_validate(args, read_yaml(args.config_file)))

    parser.set_defaults(impl=impl)


def run_workload(sim_config: SimConfig):
    args = sim_config.get_legacy_args()
    args_dict = sim_config.get_legacy_args()
Loading