diff --git a/.flake8 b/.flake8 index ce4ab0ac6d82d27416769484a719a259f75e05a4..ffffb5c4142d1ed46288d07daad6c917399287e8 100644 --- a/.flake8 +++ b/.flake8 @@ -1,3 +1,3 @@ [flake8] -exclude = .git, __pycache__, venv*, simulation_results, third_party, models +exclude = .git, __pycache__, venv*, simulation_results, third_party, models, .venv max-line-length = 120 diff --git a/.gitignore b/.gitignore index 74a41d86198870d730d0ef6d3d1306b805966a10..bf4992384792ca911820fbb085a4f85ba0969e84 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ venv *.npz *.prof simulation_results/ +models/*.fmu diff --git a/README.md b/README.md index 665a64ac8a5fd58ac9425026cba0749c51e8b38b..a561278c7667e81cbc703bcf4902c4d4704c0329 100644 --- a/README.md +++ b/README.md @@ -19,37 +19,37 @@ Note: Requires python3.12 or greater. ## Usage and help menu - python main.py -h + raps run -h ## Run simulator with default synthetic workload - python main.py + raps run ## Run simulator with telemetry replay # Frontier DATEDIR="date=2024-01-18" DPATH=~/data/frontier-sample-2024-01-18 - python main.py -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR + raps run -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR ## Open Telemetry dataset For Marconi supercomputer, download `job_table.parquet` from https://zenodo.org/records/10127767 # Marconi100 - python main.py --system marconi100 -f ~/data/marconi100/job_table.parquet + raps run --system marconi100 -f ~/data/marconi100/job_table.parquet For Adastra MI250 supercomputer, download 'AdastaJobsMI250_15days.parquet' from https://zenodo.org/records/14007065 # Adastra MI250 - python main.py --system adastraMI250 -f AdastaJobsMI250_15days.parquet + raps run --system adastraMI250 -f AdastaJobsMI250_15days.parquet For Google cluster trace v2 - python main.py --system gcloudv2 -f ~/data/gcloud/v2/google_cluster_data_2011_sample --ff 600 + raps run --system gcloudv2 -f ~/data/gcloud/v2/google_cluster_data_2011_sample --ff 600 # analyze 
dataset - python -m raps.telemetry --system gcloudv2 -f ~/data/gcloud/v2/google_cluster_data_2011_sample -v + raps telemetry --system gcloudv2 -f ~/data/gcloud/v2/google_cluster_data_2011_sample -v For MIT Supercloud @@ -62,28 +62,28 @@ For MIT Supercloud python -m raps.dataloaders.mit_supercloud.cli download --start 2021-05-21T13:00 --end 2021-05-21T14:00 # Load data and run simulation - will save data as part-cpu.npz and part-gpu.npz files - python multi-part-sim.py -x 'mit_supercloud/*' -f $DPATH --system mit_supercloud \ + raps run-multi-part -x 'mit_supercloud/*' -f $DPATH --system mit_supercloud \ --start 2021-05-21T13:00 --end 2021-05-21T14:00 # Note: if no start, end dates provided will default to run 24 hours between # 2021-05-21T00:00 to 2021-05-22T00:00 set by defaults in raps/dataloaders/mit_supercloud/utils.py # Re-run simulation using npz files (much faster load) - python multi-part-sim.py -x mit_supercloud/* -f part-*.npz --system mit_supercloud + raps run-multi-part -x mit_supercloud/* -f part-*.npz --system mit_supercloud # Synthetic tests for verification studies: - python multi-part-sim.py -x 'mit_supercloud/*' -w multitenant + raps run-multi-part -x 'mit_supercloud/*' -w multitenant For Lumi # Synthetic test for lumi multi-part-sim: - python multi-part-sim.py -x lumi/* + raps run-multi-part -x lumi/* ## Perform Network Simulation Lassen is one of the few datasets that has networking data. See `raps/dataloaders/lassen.py` for how to get the datasets. 
To run a network simulation, use the following command: - python main.py -f ~/data/lassen/Lassen-Supercomputer-Job-Dataset --system lassen --policy fcfs --backfill firstfit --ff 365d -t 12h --arrival poisson --net + raps run -f ~/data/lassen/Lassen-Supercomputer-Job-Dataset --system lassen --policy fcfs --backfill firstfit --ff 365d -t 12h --arrival poisson --net ## Snapshot of extracted workload data @@ -91,7 +91,7 @@ To reduce the expense of extracting the needed data from the telemetry parquet f RAPS saves a snapshot of the extracted data in NPZ format. The NPZ file can be given instead of the parquet files for more quickly running subsequent simulations, e.g.: - python main.py -f jobs_2024-02-20_12-20-39.npz + raps run -f jobs_2024-02-20_12-20-39.npz ## Cooling models @@ -103,37 +103,31 @@ We provide several cooling models in the repo https://code.ornl.gov/exadigit/POW Will install the POWER9CSM in the models folder. To activate cooling when running RAPS, use `--cooling` or `-c` argument. 
e.g., - python main.py --system marconi100 -c + raps run --system marconi100 -c - python main.py --system lassen -c + raps run --system lassen -c - python main.py --system summit -c + raps run --system summit -c ## Support for multiple system partitions Multi-partition systems are supported by running the `multi-part-sim.py` script, where a list of configurations can be specified using the `-x` flag as follows: - python multi-part-sim.py -x setonix/part-cpu setonix/part-gpu + raps run-multi-part -x setonix/part-cpu setonix/part-gpu or simply: - python multi-part-sim.py -x setonix/* # bash + raps run-multi-part -x setonix/* # bash - python multi-part-sim.py -x 'setonix/*' # zsh - -To run this in parallel use: - - mpiexec -n 2 python multi-part-sim-mpi.py -x setonix/part-cpu setonix/part-gpu - -*Note: first install `mpi4py` via pip or conda.* + raps run-multi-part -x 'setonix/*' # zsh This will simulate synthetic workloads on two partitions as defined in `config/setonix-cpu` and `config/setonix-gpu`. To replay telemetry workloads from another system, e.g., Marconi100's PM100 dataset, first create a .npz snapshot of the telemetry data, e.g., - python main.py --system marconi100 -f /path/to/marconi100/job_table.parquet + raps run --system marconi100 -f /path/to/marconi100/job_table.parquet This will dump a .npz file with a randomized name, e.g. ac23db.npz. Let's rename this file to pm100.npz for clarity. Note: can control-C when the simulation starts.
Now, this pm100.npz file can be used with `multi-part-sim.py` as follows: - python multi-part-sim.py -x setonix/* -f pm100.npz --arrival poisson --scale 192 + raps run-multi-part -x setonix/* -f pm100.npz --arrival poisson --scale 192 ## Modifications to telemetry replay @@ -151,11 +145,11 @@ python main.py -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR --pol ## Job-level power output example for replay of single job - python main.py -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR --jid 1234567 -o + raps run -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR --jid 1234567 -o ## Compute stats on telemetry data, e.g., average job arrival time - python -m raps.telemetry -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR + raps telemetry -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR ## Build and run Docker container diff --git a/main.py b/main.py index c3ba946ad64bd66572b25e4d6487bd58718e23a4..cd7e1621d2a1e76621d28252b4e7c4fac2a00f4c 100644 --- a/main.py +++ b/main.py @@ -1,298 +1,160 @@ """ -Main driver for simulating the RAPS single-partition (homogeneous) -system in the ExaDigiT digital twin. Supports synthetic workload -generation or telemetry replay, dynamic power modeling (including -conversion losses), and optional coupling to a thermo-fluids cooling -model. Produces performance, utilization, and energy metrics, with -optional plots and output files for analysis and validation. 
+ExaDigiT Resource Allocator & Power Simulator (RAPS) """ -import json -import numpy as np -import random -import pandas as pd -import os -import time -import math -# +import yaml +import argparse +import sys +from pathlib import Path from raps.helpers import check_python_version -# -from raps.system_config import get_system_config -from raps.constants import OUTPUT_PATH, SEED -from raps.cooling import ThermoFluidsModel -from raps.ui import LayoutManager -from raps.flops import FLOPSManager -from raps.plotting import Plotter -from raps.power import ( - PowerManager, - compute_node_power, - compute_node_power_validate -) -from raps.power import ( - compute_node_power_uncertainties, - compute_node_power_validate_uncertainties -) -from raps.engine import Engine -from raps.telemetry import Telemetry -from raps.workload import Workload -from raps.account import Accounts -from raps.weather import Weather -from raps.utils import write_dict_to_file -from raps.stats import ( - get_engine_stats, - get_job_stats, - get_scheduler_stats, - get_network_stats, - print_formatted_report -) - -from raps.sim_config import args, args_dict +from raps.sim_config import SimConfig +from raps.run_sim import run_sim, run_multi_part_sim +from raps.workload import run_workload +from raps.telemetry import run_telemetry, run_telemetry_add_args +from raps.utils import pydantic_add_args, yaml_dump +from pydantic_settings import SettingsConfigDict check_python_version() -def main(): - if args.verbose or args.debug: - print(args) - - config = get_system_config(args.system).get_legacy() - - if args.seed: - random.seed(SEED) - np.random.seed(SEED) - - if args.cooling: - cooling_model = ThermoFluidsModel(**config) - cooling_model.initialize() - args.layout = "layout2" - - if args_dict['start']: - cooling_model.weather = Weather(args_dict['start'], config=config) +def read_sim_yaml(config_file: str): + if config_file == "-": + return yaml.safe_load(sys.stdin.read()) + elif config_file: + return 
yaml.safe_load(Path(config_file).read_text()) else: - cooling_model = None - - if args.validate: - if args.uncertainties: - power_manager = PowerManager(compute_node_power_validate_uncertainties, **config) - else: - power_manager = PowerManager(compute_node_power_validate, **config) - else: - if args.uncertainties: - power_manager = PowerManager(compute_node_power_uncertainties, **config) - else: - power_manager = PowerManager(compute_node_power, **config) - args_dict['config'] = config - flops_manager = FLOPSManager(**args_dict) - - if args.live and not args.replay: - assert args.time is not None, {"--time must be set, specifing how long we want to predict"} - td = Telemetry(**args_dict) - jobs, timestep_start, timestep_end = \ - td.load_jobs_times_args_from_live_system() - if args.output is not None: - td.save_snapshot(jobs=jobs, timestep_start=timestep_start, - timestep_end=timestep_end, args=args, filename=td.dirname) + return {} - elif args.replay: - td = Telemetry(**args_dict) - jobs, timestep_start, timestep_end, args_from_file = \ - td.load_jobs_times_args_from_files(files=args.replay, args=args, config=config) - # TODO: Merge args and args_from_files? 
see telemetry.py:97 - - else: # Synthetic jobs - wl = Workload(args, config) - jobs = wl.generate_jobs() - - if args.verbose: - for job in jobs: - print('jobid:', job.id, '\tlen(gpu_trace):', - len(job.gpu_trace) if isinstance(job.gpu_trace, list) - else job.gpu_trace, '\twall_time(s):', - job.wall_time) - time.sleep(2) - - timestep_start = 0 - if hasattr(jobs[0], 'end_time'): - timestep_end = int(math.ceil(max([job.end_time for job in jobs]))) - else: - timestep_end = 88200 # 24 hours - - td = Telemetry(**args_dict) - td.save_snapshot(jobs=jobs, timestep_start=timestep_start, - timestep_end=timestep_end, args=args, filename=td.dirname) - - if args.fastforward is not None: - timestep_start = timestep_start + args.fastforward - - if args.time is not None: - timestep_end = timestep_start + args.time - - if args.time_delta is not None: - time_delta = args.time_delta - else: - time_delta = 1 +CLI_CONFIG = SettingsConfigDict( + cli_implicit_flags=True, + cli_kebab_case=True, +) - if args.continuous_job_generation: - continuous_workload = wl - else: - continuous_workload = None - sc = Engine( - power_manager=power_manager, - flops_manager=flops_manager, - cooling_model=cooling_model, - continuous_workload=continuous_workload, - jobs=jobs, - **args_dict, +def main(): + parser = argparse.ArgumentParser( + description=""" + ExaDigiT Resource Allocator & Power Simulator (RAPS) + """, + allow_abbrev=False, ) - - DIR_NAME = td.dirname - OPATH = OUTPUT_PATH / DIR_NAME - print("Output directory is: ", OPATH) - sc.opath = OPATH - - if args.accounts: - job_accounts = Accounts(jobs) - if args.accounts_json: - loaded_accounts = Accounts.from_json_filename(args.accounts_json) - accounts = Accounts.merge(loaded_accounts, job_accounts) - else: - accounts = job_accounts - sc.accounts = accounts - - if args.plot or args.output is not None: - try: - os.makedirs(OPATH) - except OSError as error: - print(f"Error creating directory: {error}") - - if args.verbose: - print(jobs) - - 
total_timesteps = timestep_end - timestep_start - - downscale = args.downscale - downscale_str = ""if downscale == 1 else f"/{downscale}" - print(f"Simulating {len(jobs)} jobs for {total_timesteps}{downscale_str}" - f" seconds from {timestep_start} to {timestep_end}.") - print(f"Simulation time delta: {time_delta}{downscale_str} s," - f"Telemetry trace quanta: {jobs[0].trace_quanta}{downscale_str} s.") - layout_manager = LayoutManager(args.layout, engine=sc, debug=args.debug, - total_timesteps=total_timesteps, - args_dict=args_dict, **config) - layout_manager.run(jobs, timestep_start=timestep_start, timestep_end=timestep_end, time_delta=time_delta) - - engine_stats = get_engine_stats(sc) - job_stats = get_job_stats(sc) - scheduler_stats = get_scheduler_stats(sc) - if sc.simulate_network: - network_stats = get_network_stats(sc) - else: - network_stats = None - - print_formatted_report(engine_stats=engine_stats, - job_stats=job_stats, - scheduler_stats=scheduler_stats, - network_stats=network_stats - ) - - if downscale_str: - downscale_str = "1" + downscale_str - - if args.plot: - if 'power' in args.plot: - pl = Plotter(f"Time ({downscale_str}s)", 'Power (kW)', 'Power History', - OPATH / f'power.{args.imtype}', - uncertainties=args.uncertainties) - x, y = zip(*power_manager.history) - pl.plot_history(x, y) - - if 'util' in args.plot: - pl = Plotter(f"Time ({downscale_str}s)", 'System Utilization (%)', - 'System Utilization History', OPATH / f'util.{args.imtype}') - x, y = zip(*sc.sys_util_history) - pl.plot_history(x, y) - - if 'loss' in args.plot: - pl = Plotter(f"Time ({downscale_str}s)", 'Power Losses (kW)', 'Power Loss History', - OPATH / f'loss.{args.imtype}', - uncertainties=args.uncertainties) - x, y = zip(*power_manager.loss_history) - pl.plot_history(x, y) - - pl = Plotter(f"Time ({downscale_str}s)", 'Power Losses (%)', 'Power Loss History', - OPATH / f'loss_pct.{args.imtype}', - uncertainties=args.uncertainties) - x, y = 
zip(*power_manager.loss_history_percentage) - pl.plot_history(x, y) - - if 'pue' in args.plot: - if cooling_model: - ylabel = 'pue' - title = 'FMU ' + ylabel + 'History' - pl = Plotter(f"Time ({downscale_str}s)", ylabel, title, OPATH / f'pue.{args.imtype}', - uncertainties=args.uncertainties) - df = pd.DataFrame(cooling_model.fmu_history) - df.to_parquet('cooling_model.parquet', engine='pyarrow') - pl.plot_history(df['time'], df[ylabel]) - else: - print('Cooling model not enabled... skipping output of plot') - - if 'temp' in args.plot: - if cooling_model: - ylabel = 'Tr_pri_Out[1]' - title = 'FMU ' + ylabel + 'History' - pl = Plotter(f"Time ({downscale_str}s)", ylabel, title, OPATH / 'temp.svg') - df = pd.DataFrame(cooling_model.fmu_history) - df.to_parquet('cooling_model.parquet', engine='pyarrow') - pl.plot_compare(df['time'], df[ylabel]) - else: - print('Cooling model not enabled... skipping output of plot') - - if args.output is not None: - - if args.uncertainties: - # Parquet cannot handle annotated ufloat format AFAIK - print('Data dump not implemented using uncertainties!') - else: - if cooling_model: - df = pd.DataFrame(cooling_model.fmu_history) - df.to_parquet(OPATH / 'cooling_model.parquet', engine='pyarrow') - - df = pd.DataFrame(power_manager.history) - df.to_parquet(OPATH / 'power_history.parquet', engine='pyarrow') - - df = pd.DataFrame(power_manager.loss_history) - df.to_parquet(OPATH / 'loss_history.parquet', engine='pyarrow') - - df = pd.DataFrame(sc.sys_util_history) - df.to_parquet(OPATH / 'util.parquet', engine='pyarrow') - - # Schedule history - job_history = pd.DataFrame(sc.get_job_history_dict()) - job_history.to_csv(OPATH / "job_history.csv", index=False) - - scheduler_running_history = pd.DataFrame(sc.get_scheduler_running_history()) - scheduler_running_history.to_csv(OPATH / "running_history.csv", index=False) - scheduler_queue_history = pd.DataFrame(sc.get_scheduler_running_history()) - scheduler_queue_history.to_csv(OPATH / 
"queue_history.csv", index=False) - - try: - with open(OPATH / 'stats.out', 'w') as f: - json.dump(engine_stats, f, indent=4) - json.dump(job_stats, f, indent=4) - except TypeError: # Is this the correct error code? - write_dict_to_file(engine_stats, OPATH / 'stats.out') - write_dict_to_file(job_stats, OPATH / 'stats.out') - - if args.accounts: - try: - with open(OPATH / 'accounts.json', 'w') as f: - json_string = json.dumps(sc.accounts.to_dict()) - f.write(json_string) - except TypeError: - write_dict_to_file(sc.accounts.to_dict(), OPATH / 'accounts.json') - print("Output directory is: ", OPATH) # If output is enabled, the user wants this information as last output + subparsers = parser.add_subparsers(required=True) + + # Shortcut for common sim args + sim_shortcuts = { + "partitions": "x", + "cooling": "c", + "simulate-network": "net", + "fastforward": "ff", + "time": "t", + "debug": "d", + "numjobs": "n", + "verbose": "v", + "output": "o", + "uncertainties": "u", + "plot": "p", + "replay": "f", + "workload": "w", + } + + # ==== raps run ==== + cmd_run = subparsers.add_parser("run", description=""" + Run single-partition (homogeneous) systems. Supports synthetic workload generation or + telemetry replay, dynamic power modeling (including conversion losses), and optional + coupling to a thermo-fluids cooling model. Produces performance, utilization, and + energy metrics, with optional plots and output files for analysis and validation. + """) + cmd_run.add_argument("config_file", nargs="?", default=None, help=""" + YAML sim config file, can be used to configure an experiment instead of using CLI + flags. Pass "-" to read from stdin. 
+ """) + cmd_run_validate = pydantic_add_args(cmd_run, SimConfig, model_config={ + **CLI_CONFIG, + "cli_shortcuts": sim_shortcuts, + }) + + def cmd_run_func(args): + sim_config = cmd_run_validate(args, read_sim_yaml(args.config_file)) + run_sim(sim_config) + cmd_run.set_defaults(func=cmd_run_func) + + # ==== raps run-multi-part ==== + # It might make sense to combine these into a single entrypoint. Though the multi-part run + # doesn't support UI or the same output options. + cmd_run_multi_part = subparsers.add_parser("run-multi-part", description=""" + Simulates multi-partition (heterogeneous) systems. Supports replaying telemetry or + generating synthetic workloads across CPU-only, GPU, and mixed partitions. Initializes + per-partition power, FLOPS, and scheduling models, then advances simulations in lockstep. + Outputs per-partition performance, utilization, and energy statistics for systems such as + MIT Supercloud, Setonix, Adastra, and LUMI. + """) + cmd_run_multi_part.add_argument("config_file", nargs="?", default=None, help=""" + YAML sim config file, can be used to configure an experiment instead of using CLI + flags. Pass "-" to read from stdin. + """) + cmd_run_multi_part_validate = pydantic_add_args(cmd_run_multi_part, SimConfig, model_config={ + **CLI_CONFIG, + "cli_shortcuts": sim_shortcuts, + }) + + def cmd_run_multi_part_func(args): + sim_config = cmd_run_multi_part_validate(args, read_sim_yaml(args.config_file)) + run_multi_part_sim(sim_config) + cmd_run_multi_part.set_defaults(func=cmd_run_multi_part_func) + + # ==== raps show ==== + cmd_show = subparsers.add_parser("show", description=""" + Outputs the given CLI args as a YAML config file that can be used to re-run the same + simulation. + """) + cmd_show.add_argument("config_file", nargs="?", default=None, help=""" + Input YAML sim config file. Can be used to slightly modify an existing sim config.
+ """) + cmd_show.add_argument("--show-defaults", default=False, help=""" + If true, include defaults in the output YAML + """) + cmd_show_validate = pydantic_add_args(cmd_show, SimConfig, model_config={ + **CLI_CONFIG, + "cli_shortcuts": sim_shortcuts, + }) + + def cmd_show_func(args): + sim_config = cmd_show_validate(args, read_sim_yaml(args.config_file)) + sim_config = sim_config.model_dump(mode="json", + exclude_defaults=not args.show_defaults) + print(yaml_dump(sim_config), end="") + cmd_show.set_defaults(func=cmd_show_func) + + # ==== raps workload ==== + # TODO: Separate the arguments for this command + cmd_workload = subparsers.add_parser("workload", description=""" + Saves workload as a snapshot. + """) + cmd_workload.add_argument("config_file", nargs="?", default=None, help=""" + YAML sim config file, can be used to configure an experiment instead of using CLI + flags. Pass "-" to read from stdin. + """) + cmd_workload_validate = pydantic_add_args(cmd_workload, SimConfig, model_config={ + **CLI_CONFIG, + "cli_shortcuts": sim_shortcuts, + }) + + def cmd_workload_func(args): + sim_config = cmd_workload_validate(args, read_sim_yaml(args.config_file)) + run_workload(sim_config) + cmd_workload.set_defaults(func=cmd_workload_func) + + # ==== raps telemetry ==== + cmd_telemetry = subparsers.add_parser("telemetry", description=""" + Telemetry data validator + """) + run_telemetry_add_args(cmd_telemetry) + cmd_telemetry.set_defaults(func=run_telemetry) + + # TODO: move telemetry and other misc scripts into here + + args = parser.parse_args() + args.func(args) if __name__ == "__main__": diff --git a/multi-part-sim-mpi.py b/multi-part-sim-mpi.py deleted file mode 100644 index eabb19b002f28f97329ed13eed7261bcf811912e..0000000000000000000000000000000000000000 --- a/multi-part-sim-mpi.py +++ /dev/null @@ -1,170 +0,0 @@ -""" -MPI-enabled driver for simulating multi-partition RAPS systems. -Distributes partitions across ranks with mpi4py for parallel run.
-Supports telemetry replay or synthetic workloads with per-rank -power, FLOPS, and scheduling models. Outputs debug and summary -stats for heterogeneous systems (e.g., LUMI, Setonix, Adastra). -""" - -from tqdm import tqdm -from mpi4py import MPI -from raps.utils import next_arrival -from raps.workload import Workload -from raps.telemetry import Telemetry -from raps.power import PowerManager, compute_node_power -from raps.flops import FLOPSManager -from raps.engine import Engine -from raps.ui import LayoutManager -from raps.system_config import get_partition_configs -from raps.sim_config import args -import random -from raps.helpers import check_python_version -check_python_version() - - -def main(): - comm = MPI.COMM_WORLD - rank = comm.Get_rank() - size = comm.Get_size() - - # 3) Load configs for every partition (all ranks do this) - multi_config = get_partition_configs(args.partitions) - partition_names = multi_config.partition_names - configs = [c.get_legacy() for c in multi_config.partitions] - args_dicts = [{**vars(args), 'config': cfg} for cfg in configs] - - # 4) Each rank decides which partition‐indices it owns (round-robin): - local_partition_indices = [i for i in range(len(partition_names)) if (i % size) == rank] - local_partition_names = [partition_names[i] for i in local_partition_indices] - # local_configs = [configs[i] for i in local_partition_indices] # Unused - # local_args_dicts = [args_dicts[i] for i in local_partition_indices] # Unused - - # 5) Rank 0 builds (or loads) the entire job list, assigns partitions, groups by partition, - # then scatters exactly those jobs to each rank. Other ranks just sit in the scatter: - if rank == 0: - # --- a) “REPLAY” branch? 
- if args.replay: - td = Telemetry(**args_dicts[0]) - print(f"[rank 0] Loading telemetry from {args.replay[0]}…") - jobs_full = td.load_snapshot(args.replay[0]) - available_nodes = [c['AVAILABLE_NODES'] for c in configs] - for job in jobs_full: - job['partition'] = random.choices(partition_names, weights=available_nodes, k=1)[0] - if args.scale: - for job in tqdm(jobs_full, desc="[rank 0] Scaling jobs…"): - job['nodes_required'] = random.randint(1, args.scale) - job['requested_nodes'] = None - if args.arrival == 'poisson': - for job in tqdm(jobs_full, desc="[rank 0] Rescheduling arrivals…"): - p_name = job['partition'] - p_cfg = configs[partition_names.index(p_name)] - job['requested_nodes'] = None - job['submit_time'] = next_arrival(1 / p_cfg['JOB_ARRIVAL_TIME']) - - # --- b) “SYNTHETIC WORKLOAD” branch: - else: - wl = Workload(*configs) - jobs_full = getattr(wl, args.workload)(num_jobs=args.numjobs) - available_nodes = [c['AVAILABLE_NODES'] for c in configs] - for job in jobs_full: - job['partition'] = random.choices(partition_names, weights=available_nodes, k=1)[0] - - # --- c) Group “jobs_full” by partition name: - jobs_by_partition = {p: [] for p in partition_names} - for job in jobs_full: - jobs_by_partition[job['partition']].append(job) - - # --- d) Build a list-of-lists, one list per rank, containing the union - # of all jobs for that rank’s partitions: - jobs_for_rank = [[] for _ in range(size)] - for p_idx, p_name in enumerate(partition_names): - tgt = p_idx % size - jobs_for_rank[tgt].extend(jobs_by_partition[p_name]) - else: - jobs_for_rank = None - - # 6) Scatter the per-rank job lists: - local_jobs = comm.scatter(jobs_for_rank, root=0) - - # 7) Re‐group each rank’s “local_jobs” into a dict keyed by its local_partition_names: - local_jobs_by_partition = {p: [] for p in local_partition_names} - for job in local_jobs: - local_jobs_by_partition[job['partition']].append(job) - - # 8) Build one LayoutManager (and Engine/PowerManager/FLOPSManager) per local 
partition: - layout_managers = {} - for idx, p_name in enumerate(local_partition_names): - global_idx = local_partition_indices[idx] - cfg = configs[global_idx] - ad = args_dicts[global_idx] - - pm = PowerManager(compute_node_power, **cfg) - fm = FLOPSManager(**ad) - sc = Engine(power_manager=pm, flops_manager=fm, - cooling_model=None, **ad) - - layout_managers[p_name] = LayoutManager(args.layout, - engine=sc, - debug=args.debug, - **cfg) - - # 9) Compute timestep_start / timestep_end (all ranks agree): - if args.fastforward: - fastforward = args.fastforward - else: - fastforward = 0 - - if args.time: - timesteps = args.time - else: - timesteps = 88200 # default 24 hours - - timestep_start = fastforward - timestep_end = timestep_start + timesteps - - # 10) Build a generator for each partition that this rank owns: - local_generators = {} - for p_name in local_partition_names: - gen = layout_managers[p_name].run_stepwise( - local_jobs_by_partition[p_name], - timestep_start=timestep_start, - timestep_end=timestep_end - ) - local_generators[p_name] = gen - - # 11) Main simulation loop (every rank steps its own partitions in lockstep): - UIF = configs[0]['UI_UPDATE_FREQ'] # assume same for all configs - for t in range(timesteps): - # --- a) Advance each local partition’s generator - for gen in local_generators.values(): - try: - next(gen) - except StopIteration: - pass - - # --- b) Every UI_UPDATE_FREQ, do per-rank prints + one global reduction - if (t % UIF) == 0: - # 1) sum our local sys_power - local_sys_power = sum(lm.engine.sys_power for lm in layout_managers.values()) - - # 2) print *our* partition‐level info now (so rank 0 and rank 1 will both print): - for p_name, lm in layout_managers.items(): - sys_util = lm.engine.sys_util_history[-1] if lm.engine.sys_util_history else 0.0 - print(f"[DEBUG][rank {rank}] {p_name} – Timestep {t} – " - f"Jobs running: {len(lm.engine.running)} – " - f"Utilization: {sys_util[1]:.2f}% – " - f"Power: {lm.engine.sys_power:.1f}kW") - 
- # 3) do an MPI reduce so that rank 0 knows the total across all ranks: - total_sys_power = comm.reduce(local_sys_power, op=MPI.SUM, root=0) - if rank == 0: - print(f"[DEBUG][rank {rank}] TOTAL system power (all partitions): {total_sys_power:.1f}kW") - - # 12) Final barrier + exit message on rank 0 - comm.Barrier() - if rank == 0: - print("Simulation complete (all ranks).") - - -if __name__ == "__main__": - main() diff --git a/multi-part-sim.py b/multi-part-sim.py deleted file mode 100644 index 587dffb7d93f42dd0880f6931e548066b5d3e7d8..0000000000000000000000000000000000000000 --- a/multi-part-sim.py +++ /dev/null @@ -1,185 +0,0 @@ -""" -Main driver for simulating multi-partition (heterogeneous) systems in the RAPS -module of ExaDigiT. Supports replaying telemetry or generating synthetic -workloads across CPU-only, GPU, and mixed partitions. Initializes per- -partition power, FLOPS, and scheduling models, then advances simulations in -lockstep. Outputs per-partition performance, utilization, and energy -statistics for systems such as MIT Supercloud, Setonix, Adastra, and LUMI. 
-""" - -from tqdm import tqdm -from raps.stats import get_engine_stats, get_job_stats, get_scheduler_stats, get_network_stats -from raps.utils import next_arrival -from raps.workload import Workload -from raps.telemetry import Telemetry -from raps.power import PowerManager, compute_node_power -from raps.flops import FLOPSManager -from raps.engine import Engine -from raps.ui import LayoutManager -from raps.system_config import get_partition_configs -from raps.sim_config import args -import random -import os -from raps.helpers import check_python_version -check_python_version() - - -# Load configurations for each partition -multi_config = get_partition_configs(args.partitions) -partition_names = multi_config.partition_names -configs = [c.get_legacy() for c in multi_config.partitions] -args.system = multi_config.system_name - -args_dicts = [ - {**vars(args), 'config': config, 'partition': partition_names[i]} - for i, config in enumerate(configs) -] - -# Initialize Workload -if args.replay: - - jobs_by_partition = {} - t0_by_partition = {} - t1_by_partition = {} - - if args.replay[0].endswith('.npz'): - # snapshot mode: pick the right .npz for each partition - snap_map = {os.path.basename(p): p for p in args.replay} - for ad in args_dicts: - part = ad['partition'] # e.g. 
'mit_supercloud/part-cpu' - short = part.split('/')[-1] # 'part-cpu' - snap_file = f"{short}.npz" - if snap_file not in snap_map: - raise RuntimeError(f"Snapshot '{snap_file}' not in {args.replay}") - td = Telemetry(**ad) - print(f"[{part}] loading snapshot {snap_file} …") - jobs_part, t0, t1, args_from_file = td.load_snapshot(snap_map[snap_file]) - jobs_by_partition[part] = jobs_part - else: - # raw load_data mode - for ad in args_dicts: - part = ad['partition'] - td = Telemetry(**ad) - print(f"\n[{part}] loading traces from {args.replay[0]} …") - jobs_part, t0, t1 = td.load_data(args.replay) - jobs_by_partition[part] = jobs_part - # td.save_snapshot(jobs_part, t0, t1, args_from_file, filename=part.split('/')[-1]) - # Check if args need to be extracted or merged! Not implemented yet! - td.save_snapshot(jobs=jobs_part, timestep_start=t0, timestep_end=t1, - filename=part.split('/')[-1], args=args) - - # --- report how many jobs per partition --- - for part, jl in jobs_by_partition.items(): - print(f"[INFO] Partition '{part}': {len(jl)} jobs loaded") - - # now flatten into a single job list (or keep separate for your engine) - all_jobs_flat = [] - for part in partition_names: - for job in jobs_by_partition[part]: - job.partition = part - all_jobs_flat.append(job) - - total_initial_jobs = len(all_jobs_flat) - jobs = all_jobs_flat - - if args.scale: - for job in tqdm(jobs, desc=f"Scaling jobs to {args.scale} nodes"): - job.nodes_required = random.randint(1, args.scale) - - if args.arrival == 'poisson': - for job in tqdm(jobs, desc="Adjusting job submission time"): - partition = job.partition - partition_config = configs[partition_names.index(partition)] - job.submit_time = next_arrival(1 / partition_config['JOB_ARRIVAL_TIME']) - -else: # Synthetic workload - wl = Workload(args, *configs) - - total_initial_jobs = args.numjobs - - # Generate jobs based on workload type - jobs = getattr(wl, args.workload)(args=args) - -# Group jobs by partition -jobs_by_partition = 
{partition: [] for partition in partition_names} -for job in jobs: - jobs_by_partition[job.partition].append(job) - -# Initialize layout managers for each partition -layout_managers = {} -for i, (config, ad) in enumerate(zip(configs, args_dicts)): - pm = PowerManager(compute_node_power, **configs[i]) - fm = FLOPSManager(**args_dicts[i]) - sc = Engine(power_manager=pm, flops_manager=fm, cooling_model=None, - jobs=jobs_by_partition[config['system_name']], total_initial_jobs=total_initial_jobs, **args_dicts[i]) - layout_managers[config['system_name']] = LayoutManager( - args.layout, engine=sc, debug=args.debug, args_dict=ad, **config) - -# Set simulation timesteps -if args.fastforward: - fastfoward = args.fastforward -else: - fastforward = 0 -if args.time: - timesteps = args.time -else: - timesteps = 88200 # Default to 24 hours - -timestep_start = fastforward -timestep_end = timestep_start + timesteps - -if args.time_delta: - time_delta = args.time_delta -else: - time_delta = config['TRACE_QUANTA'] - -# Create generators for each layout manager -generators = {name: lm.run_stepwise(jobs_by_partition[name], - timestep_start=timestep_start, - timestep_end=timestep_end, - time_delta=time_delta) - for name, lm in layout_managers.items()} - -# Step through all generators in lockstep -for timestep in range(timesteps): - for name, gen in generators.items(): - next(gen) # Advance each generator - - # Print debug info every UI_UPDATE_FREQ - if timestep % configs[0]['UI_UPDATE_FREQ'] == 0: # Assuming same frequency for all partitions - sys_power = 0 - for name, lm in layout_managers.items(): - sys_util = lm.engine.sys_util_history[-1] if lm.engine.sys_util_history else (0, 0.0) - if hasattr(lm.engine.resource_manager, 'allocated_cpu_cores'): - allocated_cores = lm.engine.resource_manager.allocated_cpu_cores - print(f"[DEBUG] {name} - Timestep {timestep} - Jobs running: {len(lm.engine.running)} -", - f"Utilization: {sys_util[1]:.2f}% - Allocated Cores: {allocated_cores} - ", - 
f"Power: {lm.engine.sys_power:.1f}kW", flush=True) - sys_power += lm.engine.sys_power - print(f"system power: {sys_power:.1f}kW", flush=True) - -print("Simulation complete.", flush=True) - -# Print statistics for each partition -for name, lm in layout_managers.items(): - print(f"\n=== Partition: {name} ===") - - engine_stats = get_engine_stats(lm.engine) - job_stats = get_job_stats(lm.engine) - scheduler_stats = get_scheduler_stats(lm.engine) - if args.simulate_network: - network_stats = get_network_stats(lm.engine) - - # Print a formatted report - print("\n--- Simulation Report ---") - for key, value in engine_stats.items(): - print(f"{key.replace('_', ' ').title()}: {value}") - print("-------------------------\n") - print("\n--- Job Stat Report ---") - for key, value in job_stats.items(): - print(f"{key.replace('_', ' ').title()}: {value}") - print("-------------------------\n") - print("\n--- Scheduler Report ---") - for key, value in scheduler_stats.items(): - print(f"{key.replace('_', ' ').title()}: {value}") - print("-------------------------") diff --git a/pyproject.toml b/pyproject.toml index b7fbb99fdf5ee3848be4f37d06c637b36d81da80..f396280caab9911fba348a2eca34713804312fcf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,3 +32,6 @@ dependencies = [ "pydantic-settings>=2.10.1", "pre-commit" ] + +[project.scripts] +raps = "main:main" diff --git a/raps/constants.py b/raps/constants.py index 0cdd2fd5f90296df9d4388a252573a32505712b0..53711e1c0bfc38df15662219864c81d7974b9fef 100644 --- a/raps/constants.py +++ b/raps/constants.py @@ -5,4 +5,3 @@ from pathlib import Path ELLIPSES = '\u2026' OUTPUT_PATH = Path('simulation_results') -SEED = 42 diff --git a/raps/downtime.py b/raps/downtime.py index 97c9139114a004ed7c275852c4eddbd30b57b69c..ae8b82dd7b54a9fe4f3b8bb380ace74fe2e69b64 100644 --- a/raps/downtime.py +++ b/raps/downtime.py @@ -1,7 +1,6 @@ from __future__ import annotations from typing import TYPE_CHECKING from raps.job import JobState -from 
raps.sim_config import args, sim_config import numpy as np @@ -15,6 +14,7 @@ class Downtime: first_downtime, downtime_interval, downtime_length, + debug=False ): self.skip = False if downtime_length == 0 or downtime_interval == 0 or \ @@ -25,6 +25,7 @@ class Downtime: self.start: int = first_downtime self.end: int = 0 self.down: bool = False + self.debug = debug def check_and_trigger(self, *, timestep: int, @@ -46,7 +47,7 @@ class Downtime: def simulate_down(self, *, engine: Engine ): - if args.debug: + if self.debug: print("Simulated downtime: before downtime start") print(f"Running: {len(engine.running)}, queued: {len(engine.queue)}") @@ -66,7 +67,7 @@ class Downtime: engine.queue += engine.running engine.running = [] - if args.debug: + if self.debug: print("Simulated downtime: after downtime start") print(f"Running: {len(engine.running)}, queued: {len(engine.queue)}") self.down = True diff --git a/raps/engine.py b/raps/engine.py index f79b140c8ddb262e8f12dc26a80618b5b238ce93..f6da02c749cd76b30ddf48eb50f8196a56cf49f9 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -1,6 +1,7 @@ from typing import Optional, List import dataclasses import pandas as pd +import numpy as np import threading import sys import tty @@ -8,7 +9,8 @@ import termios import os import select import time - +import random +import math from raps.job import Job, JobState from raps.policy import PolicyType from raps.utils import ( @@ -17,14 +19,28 @@ from raps.utils import ( ) from raps.resmgr import ResourceManager from raps.schedulers import load_scheduler -from raps.power import record_power_stats_foreach_job +from raps.power import ( + PowerManager, + compute_node_power, + compute_node_power_validate, + record_power_stats_foreach_job, + compute_node_power_uncertainties, + compute_node_power_validate_uncertainties, +) from raps.network import ( NetworkModel, apply_job_slowdown, compute_system_network_stats ) -from raps.workload import continuous_job_generation +from raps.telemetry import 
Telemetry +from raps.cooling import ThermoFluidsModel +from raps.flops import FLOPSManager +from raps.workload import Workload, continuous_job_generation +from raps.account import Accounts from raps.downtime import Downtime +from raps.weather import Weather +from raps.sim_config import SimConfig +from raps.system_config import SystemConfig @dataclasses.dataclass @@ -109,15 +125,20 @@ def keyboard_listener(state): class Engine: """Job scheduling simulation engine.""" - def __init__(self, *, power_manager, - flops_manager, - cooling_model=None, - config, + def __init__(self, *, + power_manager: PowerManager, + flops_manager: FLOPSManager, + telemetry: Telemetry, + cooling_model: ThermoFluidsModel | None = None, jobs=None, total_initial_jobs=0, - continuous_workload=None, # Workload class to generate from for continuous generation - **kwargs): - self.config = config + # Workload class to generate from for continuous generation + continuous_workload: Workload | None = None, + accounts=None, + sim_config: SimConfig, + system_config: SystemConfig, + ): + self.config = system_config.get_legacy() self.down_nodes = summarize_ranges(self.config['DOWN_NODES']) self.resource_manager = ResourceManager( total_nodes=self.config['TOTAL_NODES'], @@ -127,7 +148,8 @@ class Engine: # Initialize running and queue, etc. 
self.running = [] self.queue = [] - self.accounts = None + self.accounts = accounts + self.telemetry = telemetry self.job_history_dict = [] self.jobs_completed = 0 self.jobs_killed = 0 @@ -137,12 +159,12 @@ class Engine: self.sys_power = 0 self.power_manager = power_manager self.flops_manager = flops_manager - self.debug = kwargs.get('debug') + self.debug = sim_config.debug self.continuous_workload = continuous_workload - self.output = kwargs.get('output') - self.replay = kwargs.get('replay') - self.downscale = kwargs.get('downscale', 1) # Factor to downscale the 1s timesteps (power of 10) - self.simulate_network = kwargs.get('simulate_network') + self.output = sim_config.output + self.replay = sim_config.replay + self.downscale = sim_config.downscale # Factor to downscale the 1s timesteps (power of 10) + self.simulate_network = sim_config.simulate_network self.sys_util_history = [] self.scheduler_queue_history = [] self.scheduler_running_history = [] @@ -152,18 +174,20 @@ class Engine: self.avg_slowdown_history = [] self.max_slowdown_history = [] self.node_occupancy_history = [] - self.downtime = Downtime(first_downtime=kwargs.get('downtime_first'), - downtime_interval=kwargs.get('downtime_interval'), - downtime_length=kwargs.get('downtime_length')) + self.downtime = Downtime(first_downtime=sim_config.downtime_first, + downtime_interval=sim_config.downtime_interval, + downtime_length=sim_config.downtime_length, + debug=sim_config.debug, + ) # Set scheduler type - either based on config or command-line args - defaults to 'default' if self.config['multitenant']: scheduler_type = 'multitenant' else: - scheduler_type = kwargs.get('scheduler', 'default') + scheduler_type = sim_config.scheduler - policy_type = kwargs.get('policy') - backfill_type = kwargs.get('backfill') + policy_type = sim_config.policy + backfill_type = sim_config.backfill self.scheduler = load_scheduler(scheduler_type)( config=self.config, @@ -172,7 +196,7 @@ class Engine: 
resource_manager=self.resource_manager, jobs=jobs ) - if kwargs.get('live'): + if sim_config.live: assert self.scheduler.policy != PolicyType.REPLAY, \ "Cannot replay from a live system. Choose a scheduling policy!" print(f"Using scheduler: {str(self.scheduler.__class__).split('.')[2]}" @@ -181,10 +205,141 @@ class Engine: if self.simulate_network: available_nodes = self.resource_manager.available_nodes - self.network_model = NetworkModel(available_nodes=available_nodes, config=config, kwargs=kwargs) + self.network_model = NetworkModel( + available_nodes=available_nodes, + config=self.config, + ) else: self.network_model = None + @staticmethod + def from_sim_config(sim_config: SimConfig, partition: str | None = None): + if partition: + system_config_by_name = {s.system_name: s for s in sim_config.system_configs} + system_config = system_config_by_name.get(partition) + if not system_config: + raise ValueError(f"Partition {partition} isn't in SimConfig") + elif len(sim_config.system_configs) > 1: + raise ValueError( + "Engine can only run single-partition simulations. Use MultiPartEngine for " + + "multi-partition simulations, or pass partition to select the partition to run." 
+ ) + else: + system_config = sim_config.system_configs[0] + + # Some temporary backwards/compatibility wrappers + system_config_dict = system_config.get_legacy() + sim_config_args = sim_config.get_legacy_args() + sim_config_dict = sim_config.get_legacy_args_dict() + sim_config_dict['config'] = system_config_dict + if partition: + sim_config_dict["system"] = sim_config.system_name + + if sim_config.seed: + random.seed(sim_config.seed) + np.random.seed(sim_config.seed + 1) + + if sim_config.cooling: + cooling_model = ThermoFluidsModel(**system_config_dict) + cooling_model.initialize() + if sim_config.start: + cooling_model.weather = Weather(sim_config.start, config=system_config_dict) + else: + cooling_model = None + + if sim_config.power_scope == 'node': + if sim_config.uncertainties: + power_manager = PowerManager(compute_node_power_validate_uncertainties, **system_config_dict) + else: + power_manager = PowerManager(compute_node_power_validate, **system_config_dict) + else: + if sim_config.uncertainties: + power_manager = PowerManager(compute_node_power_uncertainties, **system_config_dict) + else: + power_manager = PowerManager(compute_node_power, **system_config_dict) + + flops_manager = FLOPSManager( + config=system_config_dict, + validate=(sim_config.power_scope == "node"), + ) + + if sim_config.live and not sim_config.replay: + td = Telemetry(**sim_config_dict) + jobs, timestep_start, timestep_end = \ + td.load_jobs_times_args_from_live_system() + elif sim_config.replay: + # TODO: this will have issues if running separate systems or custom systems + partition_short = partition.split("/")[-1] if partition else None + td = Telemetry( + **sim_config_dict, + partition=partition, + ) + if partition: + snap_map = {p.stem: p for p in sim_config.replay[0].glob("*.npz")} + if len(snap_map) > 0: + if partition_short not in snap_map: + raise RuntimeError(f"Snapshot '{partition_short}.npz' not in {sim_config.replay[0]}") + replay_files = snap_map[partition_short] + else: 
+ replay_files = sim_config.replay + else: + replay_files = sim_config.replay + + jobs, timestep_start, timestep_end, args_from_file = td.load_jobs_times_args_from_files( + files=replay_files, + args=sim_config_args, config=system_config_dict, + ) + else: # Synthetic jobs + wl = Workload(sim_config_args, system_config_dict) + jobs = wl.generate_jobs() + timestep_start = 0 + if hasattr(jobs[0], 'end_time'): + timestep_end = int(math.ceil(max([job.end_time for job in jobs]))) + else: + timestep_end = 88200 # 24 hours + + td = Telemetry(**sim_config_dict) + + # TODO refactor how stat/end/fastforward/time work + if sim_config.fastforward is not None: + timestep_start = timestep_start + sim_config.fastforward + + if sim_config.time is not None: + timestep_end = timestep_start + sim_config.time + + if sim_config.time_delta is not None: + time_delta = sim_config.time_delta + else: + time_delta = 1 + + if sim_config.continuous_job_generation: + continuous_workload = wl + else: + continuous_workload = None + + accounts = None + if sim_config.accounts: + job_accounts = Accounts(jobs) + if sim_config.accounts_json: + loaded_accounts = Accounts.from_json_filename(sim_config.accounts_json) + accounts = Accounts.merge(loaded_accounts, job_accounts) + else: + accounts = job_accounts + + engine = Engine( + power_manager=power_manager, + flops_manager=flops_manager, + cooling_model=cooling_model, + continuous_workload=continuous_workload, + jobs=jobs, + accounts=accounts, + telemetry=td, + sim_config=sim_config, + system_config=system_config, + ) + + return engine, jobs, timestep_start, timestep_end, time_delta + def add_running_jobs_to_queue(self, jobs_to_submit: List): """ Modifies jobs_to_submit and self.queue diff --git a/raps/multi_part_engine.py b/raps/multi_part_engine.py new file mode 100644 index 0000000000000000000000000000000000000000..461425b4cadaac906c641c6c33adbe65e4d1079e --- /dev/null +++ b/raps/multi_part_engine.py @@ -0,0 +1,54 @@ +from collections.abc import 
Iterable +from raps.engine import Engine, TickData +from raps.sim_config import SimConfig + + +class MultiPartEngine: + def __init__(self, engines: dict[str, Engine], jobs: dict[str, list]): + self.partition_names = sorted(engines.keys()) + self.engines = engines + self.jobs = jobs + + @staticmethod + def from_sim_config(sim_config: SimConfig): + if sim_config.replay: + root_systems = set(s.system_name.split("/")[0] for s in sim_config.system_configs) + # TODO should consider how to pass separate replay values for separate systems + if len(root_systems) > 1: + raise ValueError("Replay for multi-system runs is not supported") + + jobs_by_partition = {} + engines: dict[str, Engine] = {} + + timestep_start, timestep_end, time_delta = 0, 0, 0 + for partition in sim_config.system_configs: + name = partition.system_name + engine, jobs, timestep_start, timestep_end, time_delta = Engine.from_sim_config( + sim_config, partition=name, + ) + for job in jobs: + job.partition = name + jobs_by_partition[name] = jobs + engines[name] = engine + total_initial_jobs = sum(len(j) for j in jobs_by_partition.values()) + for engine in engines.values(): + engine.total_initial_jobs = total_initial_jobs + + multi_engine = MultiPartEngine( + engines=engines, + jobs=jobs_by_partition, + ) + + return multi_engine, jobs_by_partition, timestep_start, timestep_end, time_delta + + def run_simulation(self, jobs: dict, timestep_start, timestep_end, time_delta=1 + ) -> Iterable[dict[str, TickData | None]]: + generators = [] + for part in self.partition_names: + generators.append(self.engines[part].run_simulation( + jobs[part], timestep_start, timestep_end, time_delta, + )) + for tick_datas in zip(*generators, strict=True): + yield dict(zip(self.partition_names, tick_datas)) + + # TODO need to add a mode to run the partitions in parallel diff --git a/raps/network.py b/raps/network.py index 9c457d49225b9f1943c9fd25dc98e6c8380edcaa..b4340e40dee66f0ce5bbeed4798153957c7e539d 100644 --- a/raps/network.py 
+++ b/raps/network.py @@ -8,7 +8,7 @@ from pathlib import Path class NetworkModel: """ """ - def __init__(self, *, available_nodes, config, **kwargs): + def __init__(self, *, available_nodes, config): self.topology = config.get("TOPOLOGY") # if fat-tree, build the graph once if self.topology == "fat-tree": diff --git a/raps/plotting.py b/raps/plotting.py index 61a8f969a6857087535f1719cf52aaf539076c2c..606ac7a5d3de1bcf948e8ea2b227683c984f6de2 100644 --- a/raps/plotting.py +++ b/raps/plotting.py @@ -14,7 +14,7 @@ Plotter """ import itertools - +from pathlib import Path import matplotlib.pyplot as plt import matplotlib.ticker as ticker from matplotlib.ticker import MaxNLocator @@ -95,7 +95,7 @@ class Plotter(BasePlotter): The path to save the plot. """ - def __init__(self, xlabel='', ylabel='', title='', save_path='out.svg', uncertainties=False): + def __init__(self, xlabel='', ylabel='', title='', save_path: Path | str = 'out.svg', uncertainties=False): """ Constructs all the necessary attributes for the Plotter object. diff --git a/raps/run_sim.py b/raps/run_sim.py new file mode 100644 index 0000000000000000000000000000000000000000..68229ada2b7286027642ea411794ecf38c40011e --- /dev/null +++ b/raps/run_sim.py @@ -0,0 +1,236 @@ +""" +Module containing the primary commands for use in the CLI. The simulation logic itself is kept in +Engine and MultiPartEngine so that it can be used programmatically such as in the simulation server. +These functions just handle rendering the terminal UI and outputting results to files etc. 
+""" +import json +import pandas as pd +import sys +from raps.ui import LayoutManager +from raps.plotting import Plotter +from raps.engine import Engine +from raps.multi_part_engine import MultiPartEngine +from raps.utils import write_dict_to_file +from raps.stats import ( + get_engine_stats, + get_job_stats, + get_scheduler_stats, + get_network_stats, + print_formatted_report +) + +from raps.sim_config import SimConfig + + +def run_sim(sim_config: SimConfig): + if sim_config.verbose or sim_config.debug: + print(f"SimConfig: {sim_config.model_dump_json(indent=4)}") + if len(sim_config.system_configs) > 1: + print("Use run-multi-part to run multi-partition simulations") + sys.exit(1) + + engine, jobs, timestep_start, timestep_end, time_delta = Engine.from_sim_config(sim_config) + + out = sim_config.output + if out: + out.mkdir(parents=True) + engine.telemetry.save_snapshot( + jobs=jobs, + timestep_start=timestep_start, + timestep_end=timestep_end, + args=sim_config.get_legacy_args(), filename=str(out), + ) + + total_timesteps = timestep_end - timestep_start + + downscale = sim_config.downscale + downscale_str = ""if downscale == 1 else f"/{downscale}" + print(f"Simulating {len(jobs)} jobs for {total_timesteps}{downscale_str}" + f" seconds from {timestep_start} to {timestep_end}.") + print(f"Simulation time delta: {time_delta}{downscale_str} s," + f"Telemetry trace quanta: {jobs[0].trace_quanta}{downscale_str} s.") + layout_manager = LayoutManager( + sim_config.layout, engine=engine, + debug=sim_config.debug, total_timesteps=total_timesteps, + args_dict=sim_config.get_legacy_args_dict(), **sim_config.system_configs[0].get_legacy(), + ) + layout_manager.run( + jobs, + timestep_start=timestep_start, timestep_end=timestep_end, time_delta=time_delta, + ) + + engine_stats = get_engine_stats(engine) + job_stats = get_job_stats(engine) + scheduler_stats = get_scheduler_stats(engine) + if engine.simulate_network: + network_stats = get_network_stats(engine) + else: + 
network_stats = None + + print_formatted_report( + engine_stats=engine_stats, + job_stats=job_stats, + scheduler_stats=scheduler_stats, + network_stats=network_stats, + ) + + if downscale_str: + downscale_str = "1" + downscale_str + + if sim_config.plot: + assert out # SimConfig validation should check this + if 'power' in sim_config.plot: + pl = Plotter(f"Time ({downscale_str}s)", 'Power (kW)', 'Power History', + out / f'power.{sim_config.imtype}', + uncertainties=sim_config.uncertainties) + x, y = zip(*engine.power_manager.history) + pl.plot_history(x, y) + + if 'util' in sim_config.plot: + pl = Plotter(f"Time ({downscale_str}s)", 'System Utilization (%)', + 'System Utilization History', out / f'util.{sim_config.imtype}') + x, y = zip(*engine.sys_util_history) + pl.plot_history(x, y) + + if 'loss' in sim_config.plot: + pl = Plotter(f"Time ({downscale_str}s)", 'Power Losses (kW)', 'Power Loss History', + out / f'loss.{sim_config.imtype}', + uncertainties=sim_config.uncertainties) + x, y = zip(*engine.power_manager.loss_history) + pl.plot_history(x, y) + + pl = Plotter(f"Time ({downscale_str}s)", 'Power Losses (%)', 'Power Loss History', + out / f'loss_pct.{sim_config.imtype}', + uncertainties=sim_config.uncertainties) + x, y = zip(*engine.power_manager.loss_history_percentage) + pl.plot_history(x, y) + + if 'pue' in sim_config.plot: + if engine.cooling_model: + ylabel = 'pue' + title = 'FMU ' + ylabel + 'History' + pl = Plotter(f"Time ({downscale_str}s)", ylabel, title, + out / f'pue.{sim_config.imtype}', + uncertainties=sim_config.uncertainties) + df = pd.DataFrame(engine.cooling_model.fmu_history) + df.to_parquet('cooling_model.parquet', engine='pyarrow') + pl.plot_history(df['time'], df[ylabel]) + else: + print('Cooling model not enabled... 
skipping output of plot') + + if 'temp' in sim_config.plot: + if engine.cooling_model: + ylabel = 'Tr_pri_Out[1]' + title = 'FMU ' + ylabel + 'History' + pl = Plotter(f"Time ({downscale_str}s)", ylabel, title, out / 'temp.svg') + df = pd.DataFrame(engine.cooling_model.fmu_history) + df.to_parquet('cooling_model.parquet', engine='pyarrow') + pl.plot_compare(df['time'], df[ylabel]) + else: + print('Cooling model not enabled... skipping output of plot') + + if out: + if sim_config.uncertainties: + # Parquet cannot handle annotated ufloat format AFAIK + print('Data dump not implemented using uncertainties!') + else: + if engine.cooling_model: + df = pd.DataFrame(engine.cooling_model.fmu_history) + df.to_parquet(out / 'cooling_model.parquet', engine='pyarrow') + + df = pd.DataFrame(engine.power_manager.history) + df.to_parquet(out / 'power_history.parquet', engine='pyarrow') + + df = pd.DataFrame(engine.power_manager.loss_history) + df.to_parquet(out / 'loss_history.parquet', engine='pyarrow') + + df = pd.DataFrame(engine.sys_util_history) + df.to_parquet(out / 'util.parquet', engine='pyarrow') + + # Schedule history + job_history = pd.DataFrame(engine.get_job_history_dict()) + job_history.to_csv(out / "job_history.csv", index=False) + + scheduler_running_history = pd.DataFrame(engine.get_scheduler_running_history()) + scheduler_running_history.to_csv(out / "running_history.csv", index=False) + scheduler_queue_history = pd.DataFrame(engine.get_scheduler_running_history()) + scheduler_queue_history.to_csv(out / "queue_history.csv", index=False) + + try: + with open(out / 'stats.out', 'w') as f: + json.dump(engine_stats, f, indent=4) + json.dump(job_stats, f, indent=4) + except TypeError: # Is this the correct error code? 
+ write_dict_to_file(engine_stats, out / 'stats.out') + write_dict_to_file(job_stats, out / 'stats.out') + + if sim_config.accounts: + try: + with open(out / 'accounts.json', 'w') as f: + json_string = json.dumps(engine.accounts.to_dict()) + f.write(json_string) + except TypeError: + write_dict_to_file(engine.accounts.to_dict(), out / 'accounts.json') + print("Output directory is: ", out) # If output is enabled, the user wants this information as last output + + +def run_multi_part_sim(sim_config: SimConfig): + multi_engine, jobs, timestep_start, timestep_end, time_delta = MultiPartEngine.from_sim_config(sim_config) + + # TODO: The mit_supercloud dataloader seems to be outputting the wrong timesteps? mit_supercloud + # is the only multi-partition system with replay, so just manually overriding the timesteps here + # to fix it for now. The original multi-part-sim.py always started from timestep 0 as well. + timestep_end = timestep_end - timestep_start + timestep_start = 0 + + if sim_config.output: + for part, engine in multi_engine.engines.items(): + engine.telemetry.save_snapshot( + jobs=jobs[part], + timestep_start=timestep_start, timestep_end=timestep_end, + filename=part.split('/')[-1], + args=sim_config.get_legacy_args(), + ) + + ui_update_freq = sim_config.system_configs[0].scheduler.ui_update_freq + gen = multi_engine.run_simulation(jobs, timestep_start, timestep_end, time_delta) + + for tick_datas in gen: + sys_power = 0 + tick_datas = {k: v for k, v in tick_datas.items() if v} # Filter nones + timestep = list(tick_datas.values())[0].current_timestep if tick_datas else None + + if timestep and timestep % ui_update_freq == 0: + for part, tick_data in tick_datas.items(): + engine = multi_engine.engines[part] + + sys_util = engine.sys_util_history[-1] if engine.sys_util_history else (0, 0.0) + if hasattr(engine.resource_manager, 'allocated_cpu_cores'): + allocated_cores = engine.resource_manager.allocated_cpu_cores + print( + f"[DEBUG] {part} - Timestep 
{timestep} - Jobs running: {len(engine.running)} -", + f"Utilization: {sys_util[1]:.2f}% - Allocated Cores: {allocated_cores} - ", + f"Power: {engine.sys_power:.1f}kW", + flush=True, + ) + sys_power += engine.sys_power + print(f"system power: {sys_power:.1f}kW", flush=True) + + print("Simulation complete.", flush=True) + + # Print statistics for each partition + for part, engine in multi_engine.engines.items(): + print(f"\n=== Partition: {part} ===") + + engine_stats = get_engine_stats(engine) + job_stats = get_job_stats(engine) + scheduler_stats = get_scheduler_stats(engine) + network_stats = get_network_stats(engine) if sim_config.simulate_network else None + + # Print a formatted report + print_formatted_report( + engine_stats=engine_stats, + job_stats=job_stats, + scheduler_stats=scheduler_stats, + network_stats=network_stats, + ) diff --git a/raps/sim_config.py b/raps/sim_config.py index 127cec302babc4833ce93df48dd1201cddf03c38..036ae8bdb5ac6d25948357e89cf15f75ba654a8b 100644 --- a/raps/sim_config.py +++ b/raps/sim_config.py @@ -1,18 +1,13 @@ import argparse -import sys -import yaml +from functools import cached_property from datetime import timedelta -from pathlib import Path from typing import Literal from raps.schedulers.default import PolicyType, BackfillType - from raps.utils import ( - parse_time_unit, convert_to_time_unit, infer_time_unit, ExpandedPath, - pydantic_add_args, yaml_dump, parse_td, + parse_time_unit, convert_to_time_unit, infer_time_unit, ExpandedPath, parse_td, ) - -from pydantic import BaseModel, model_validator, computed_field -from pydantic_settings import SettingsConfigDict +from raps.system_config import SystemConfig, get_partition_configs +from pydantic import BaseModel, model_validator Distribution = Literal['uniform', 'weibull', 'normal'] @@ -44,13 +39,12 @@ class SimConfig(BaseModel): Step size (unit specified by `time_unit`, default seconds). 
Can pass a string like 15s, 1m, 1h, 1ms """ - time_unit: timedelta + time_unit: timedelta = timedelta(seconds=1) """ Units all time delta ints are measured in (default seconds) """ - @computed_field - @property + @cached_property def downscale(self) -> int: return int(timedelta(seconds=1) / self.time_unit) @@ -65,7 +59,7 @@ class SimConfig(BaseModel): uncertainties: bool = False """ Use float-with-uncertainties (much slower) """ - seed: bool = False + seed: int | None = None """ Set RNG seed for deterministic simulation """ output: ExpandedPath | None = None """ Output power, cooling, and loss models for later analysis. Argument specifies name. """ @@ -209,7 +203,10 @@ class SimConfig(BaseModel): """ Specify the max queue length for continuous job generation """ @model_validator(mode="before") - def _parse_times(cls, data): + def _validate_before(cls, data): + # This is called with the raw input, before Pydantic parses it, so data is just a dict and + # contain any data types. + time_fields = [ "time_delta", "time", "fastforward", "downtime_first", "downtime_interval", "downtime_length", @@ -236,7 +233,8 @@ class SimConfig(BaseModel): return data @model_validator(mode="after") - def _validate(self): + def _validate_after(self): + # This is called after Pydantic has parsed everything into the model if self.system and self.partitions: raise ValueError("system and partitions are mutually exclusive") elif not self.system and not self.partitions: @@ -245,11 +243,40 @@ class SimConfig(BaseModel): if not self.replay and not self.workload: self.workload = "random" + if self.cooling: + self.layout = "layout2" + if self.jobsize_is_power_of is not None and self.jobsize_is_of_degree is not None: raise ValueError("jobsize_is_power_of and jobsize_is_of_degree are mutually exclusive") + if self.plot and not self.output: + raise ValueError("plot requires an output directory to be set") + + if self.live and not self.replay and self.time is None: + raise ValueError("--time must be 
set, specifing how long we want to predict") + return self + @property + def system_name(self) -> str: + """ + Name of the system. + Note, this is different than system, as system can be a file or None if partition is set. + """ + return self._multi_partition_system_config.system_name + + @property + def system_configs(self) -> list[SystemConfig]: + """ + Return the SystemConfigs for the selected systems. + Will be a single element array unless multiple `partitions` are selected. + """ + return self._multi_partition_system_config.partitions + + @cached_property + def _multi_partition_system_config(self): + return get_partition_configs(self.partitions if self.partitions else [self.system]) + def get_legacy_args(self): """ Return as an argparse.Namespace object for backwards compatability @@ -265,6 +292,7 @@ class SimConfig(BaseModel): args_dict = self.model_dump(mode="json") # validate has been renamed to power_scope args_dict['validate'] = args_dict["power_scope"] == "node" + args_dict['downscale'] = self.downscale # Convert Path objects to str if args_dict['output']: @@ -276,56 +304,3 @@ class SimConfig(BaseModel): args_dict['sim_config'] = self return args_dict - - -def parse_args(cli_args=None) -> SimConfig: - parser = argparse.ArgumentParser( - description="Resource Allocator & Power Simulator (RAPS)", - allow_abbrev=False, - formatter_class=argparse.ArgumentDefaultsHelpFormatter, - ) - parser.add_argument( - "config_file", nargs="?", default=None, - help=( - 'YAML sim config file, can be used to configure an experiment instead of using CLI ' + - 'flags. Pass "-" to read from stdin.' 
- ) - ) - - model_validate_args = pydantic_add_args(parser, SimConfig, model_config=SettingsConfigDict( - cli_implicit_flags=True, - cli_kebab_case=True, - cli_shortcuts={ - "partitions": "x", - "cooling": "c", - "simulate-network": "net", - "fastforward": "ff", - "time": "t", - "debug": "d", - "numjobs": "n", - "verbose": "v", - "output": "o", - "uncertainties": "u", - "plot": "p", - "replay": "f", - "workload": "w", - }, - )) - - args = parser.parse_args(cli_args) - if args.config_file == "-": - config_file_data = yaml.safe_load(sys.stdin.read()) - elif args.config_file: - config_file_data = yaml.safe_load(Path(args.config_file).read_text()) - else: - config_file_data = {} - - return model_validate_args(args, config_file_data) - - -sim_config = parse_args() -args = sim_config.get_legacy_args() -args_dict = sim_config.get_legacy_args_dict() - -if __name__ == "__main__": - print(yaml_dump(sim_config.model_dump(mode="json"))) diff --git a/raps/system_config.py b/raps/system_config.py index e458c68fb01af2b46855a50ff9d1569b2475b62e..642bb983386c6ea183d84b6cdd69c8e18ff7ea0e 100644 --- a/raps/system_config.py +++ b/raps/system_config.py @@ -3,6 +3,7 @@ import glob import fnmatch from typing import Any, Literal from pathlib import Path +from functools import cached_property import yaml from pydantic import BaseModel, computed_field, model_validator, field_validator from raps.raps_config import raps_config @@ -41,27 +42,27 @@ class SystemSystemConfig(BaseModel): return self @computed_field - @property + @cached_property def num_racks(self) -> int: return self.num_cdus * self.racks_per_cdu - len(self.missing_racks) @computed_field - @property + @cached_property def sc_shape(self) -> list[int]: return [self.num_cdus, self.racks_per_cdu, self.nodes_per_rack] @computed_field - @property + @cached_property def total_nodes(self) -> int: return self.num_cdus * self.racks_per_cdu * self.nodes_per_rack @computed_field - @property + @cached_property def blades_per_chassis(self) -> 
int: return int(self.nodes_per_rack / self.chassis_per_rack / self.nodes_per_blade) @computed_field - @property + @cached_property def power_df_header(self) -> list[str]: power_df_header = ["CDU"] for i in range(1, self.racks_per_cdu + 1): @@ -73,7 +74,7 @@ class SystemSystemConfig(BaseModel): return power_df_header @computed_field - @property + @cached_property def available_nodes(self) -> int: return self.total_nodes - len(self.down_nodes) @@ -120,7 +121,7 @@ class SystemSchedulerConfig(BaseModel): trace_quanta: int min_wall_time: int max_wall_time: int - ui_update_freq: int + ui_update_freq: int # TODO should be moved to raps_config max_nodes_per_job: int job_end_probs: dict[JobEndStates, float] multitenant: bool = False diff --git a/raps/telemetry.py b/raps/telemetry.py index f485daa9e9b68965bd80fac79e5e7943a2d7b6d9..5a09eb9d7d91930329bc41735ca9efafd10d7d1c 100644 --- a/raps/telemetry.py +++ b/raps/telemetry.py @@ -6,43 +6,14 @@ parsing parquet files, and generating job state information. The module defines a `Telemetry` class for managing telemetry data and several helper functions for data encryption and conversion between node name and index formats. 
""" -import re import sys import random import argparse -# import itertools +from pathlib import Path # import json -import os.path from typing import Optional from types import ModuleType - -if __name__ == "__main__": - # from raps.sim_config import args, args_dict - parser = argparse.ArgumentParser(description='Telemetry data validator') - parser.add_argument('--jid', type=str, default='*', help='Replay job id') - parser.add_argument('-f', '--replay', nargs='+', type=str, - help='Either: path/to/joblive path/to/jobprofile' - ' -or- filename.npz (overrides --workload option)') - parser.add_argument('-p', '--plot', type=str, default=None, choices=['jobs', 'nodes'], help='Output plots') - parser.add_argument("--is-results-file", action='store_true', default=False, help='Output plots') - parser.add_argument("--gantt-nodes", default=False, action='store_true', required=False, - # duplicate in workload! - help="Print Gannt with nodes required as line thickness (default false)") - parser.add_argument('-t', '--time', type=str, default=None, - help='Length of time to simulate, e.g., 123, 123s, 27m, 3h, 7d') - parser.add_argument('--system', type=str, default='frontier', help='System config to use') - choices = ['prescribed', 'poisson'] - parser.add_argument('--arrival', default=choices[0], type=str, choices=choices, - help=f"Modify arrival distribution ({choices[1]}) " - f"or use the original submit times ({choices[0]})") - parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output') - parser.add_argument('-o', '--output', type=str, default=None, help='Store output in --output file.') - parser.add_argument("--live", action="store_true", help="Grab data from live system.") - - args = parser.parse_args() - args_dict = vars(args) - import importlib import numpy as np import pandas as pd @@ -57,8 +28,7 @@ from raps.plotting import ( plot_nodes_gantt, plot_network_histogram ) -from raps.utils import next_arrival_byconfargs, create_casename, 
convert_to_time_unit -# from raps.sim_config import args, args_dict +from raps.utils import next_arrival_byconfargs, convert_to_time_unit class Telemetry: @@ -69,18 +39,6 @@ class Telemetry: self.kwargs = kwargs self.system = kwargs.get('system') self.config = kwargs.get('config') - outname = kwargs.get('output') - if outname: - self.dirname = outname - elif kwargs.get("replay"): - # Try to extract date from given name to use as case directory - matched_date = re.search(r"\d{4}-\d{2}-\d{2}", kwargs['replay'][0]) - if matched_date: - self.dirname = f"sim={matched_date.group(0)}" - else: - self.dirname = create_casename() - else: - self.dirname = create_casename() try: self.dataloader = importlib.import_module(f"raps.dataloaders.{self.system}", package=__package__) @@ -119,8 +77,7 @@ class Telemetry: timestep_end = int(data['timestep_end']) else: timestep_end = np.inf - print(timestep_end) - exit() + raise ValueError("Invalid timestep_end in snapshot") if 'args' in data: args_from_file = data['args'].tolist() else: @@ -254,7 +211,7 @@ class Telemetry: jobs = [] trigger_custom_dataloader = False for i, file in enumerate(files): - file = os.path.normpath(file.lstrip('"').rstrip('"')) + file = str(Path(file)) if hasattr(args, 'is_results_file') and args.is_results_file: if file.endswith(".csv"): jobs, timestep_start, timestep, _ = self.load_csv_results(file) @@ -295,17 +252,12 @@ class Telemetry: break if trigger_custom_dataloader: # custom data loader - print(*args.replay) try: jobs, timestep_start_from_data, timestep_end_from_data = self.load_data(args.replay) except AssertionError: raise ValueError("Forgot --is-results-file ?") timestep_start = min(timestep_start, timestep_start_from_data) timestep_end = max(timestep_end, timestep_end_from_data) - self.save_snapshot(jobs=jobs, - timestep_start=timestep_start, - timestep_end=timestep_end, - args=args, filename=self.dirname) if args.time: timestep_end = timestep_start + convert_to_time_unit(args.time) elif not 
timestep_end: @@ -314,7 +266,30 @@ class Telemetry: return jobs, timestep_start, timestep_end, args -def run_telemetry(): +def run_telemetry_add_args(parser: argparse.ArgumentParser): + parser.add_argument('--jid', type=str, default='*', help='Replay job id') + parser.add_argument('-f', '--replay', nargs='+', type=str, + help='Either: path/to/joblive path/to/jobprofile' + ' -or- filename.npz (overrides --workload option)') + parser.add_argument('-p', '--plot', type=str, default=None, choices=['jobs', 'nodes'], help='Output plots') + parser.add_argument("--is-results-file", action='store_true', default=False, help='Output plots') + parser.add_argument("--gantt-nodes", default=False, action='store_true', required=False, + # duplicate in workload! + help="Print Gannt with nodes required as line thickness (default false)") + parser.add_argument('-t', '--time', type=str, default=None, + help='Length of time to simulate, e.g., 123, 123s, 27m, 3h, 7d') + parser.add_argument('--system', type=str, default='frontier', help='System config to use') + choices = ['prescribed', 'poisson'] + parser.add_argument('--arrival', default=choices[0], type=str, choices=choices, + help=f"Modify arrival distribution ({choices[1]}) " + f"or use the original submit times ({choices[0]})") + parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output') + parser.add_argument('-o', '--output', type=str, default=None, help='Store output in --output file.') + parser.add_argument("--live", action="store_true", help="Grab data from live system.") + + +def run_telemetry(args): + args_dict = vars(args) config = get_system_config(args.system).get_legacy() args_dict['config'] = config td = Telemetry(**args_dict) @@ -324,8 +299,10 @@ def run_telemetry(): jobs, timestep_start, timestep_end = \ td.load_jobs_times_args_from_live_system() if args.output: - td.save_snapshot(jobs=jobs, timestep_start=timestep_start, - timestep_end=timestep_end, args=args, filename=td.dirname) + 
td.save_snapshot( + jobs=jobs, timestep_start=timestep_start, + timestep_end=timestep_end, args=args, filename=args.output, + ) elif args.replay: jobs, timestep_start, timestep_end, _ = \ @@ -334,8 +311,8 @@ def run_telemetry(): config=config) else: - parser.print_help() - exit() + print("Either --live or --replay is required") + sys.exit(1) timesteps = timestep_end - timestep_start @@ -416,14 +393,10 @@ def run_telemetry(): plot_network_histogram(ax=ax, data=net_means) if args.output is not None: if args.output == "": - filename = f"{td.dirname}.svg" + filename = f"{args.output}.svg" else: filename = args.output plt.savefig(f'{filename}') print(f"Saved to: {filename}") else: plt.show() - - -if __name__ == "__main__": - run_telemetry() diff --git a/raps/ui.py b/raps/ui.py index d9c3bbe199a60c67391054ac6200cee8c6dcda21..5be35238d21fe1e62505d0585934dd589d246bd8 100644 --- a/raps/ui.py +++ b/raps/ui.py @@ -576,7 +576,3 @@ class LayoutManager: self.update_progress_bar(1) finally: os.system("stty sane") - - def run_stepwise(self, jobs, timestep_start, timestep_end, time_delta): - """ Prepares the UI and returns a generator for the simulation """ - return self.engine.run_simulation(jobs, timestep_start, timestep_end, time_delta) diff --git a/raps/workload.py b/raps/workload.py index 151e2c3590498b38c1c32accadc889c5e886ca01..563071d3e10de848c47f21f6aa0169c612c26977 100644 --- a/raps/workload.py +++ b/raps/workload.py @@ -39,6 +39,7 @@ import matplotlib.pyplot as plt from raps.telemetry import Telemetry from raps.job import job_dict, Job from raps.utils import create_file_indexed +from raps.sim_config import SimConfig JOB_NAMES = ["LAMMPS", "GROMACS", "VASP", "Quantum ESPRESSO", "NAMD", @@ -799,18 +800,23 @@ def plot_job_hist(jobs, config=None, dist_split=None, gantt_nodes=False): plt.show() -def run_workload(): - from raps.sim_config import args, args_dict - from raps.system_config import get_system_config - config = get_system_config(args.system).get_legacy() - if 
args.replay: +def run_workload(sim_config: SimConfig): + args = sim_config.get_legacy_args() + args_dict = sim_config.get_legacy_args_dict() + config = sim_config.system_configs[0].get_legacy() + + if sim_config.replay: td = Telemetry(**args_dict) - jobs, _, _, _ = td.load_jobs_times_args_from_files(files=args.replay, args=args, config=config) + jobs, _, _, _ = td.load_jobs_times_args_from_files(files=sim_config.replay, args=args, config=config) else: workload = Workload(args, config) - jobs = getattr(workload, args.workload)(args=args) - plot_job_hist(jobs, config=config, dist_split=args.multimodal, gantt_nodes=args.gantt_nodes) - if args.output: + jobs = getattr(workload, sim_config.workload)(args=args) + plot_job_hist(jobs, + config=config, + dist_split=sim_config.multimodal, + gantt_nodes=sim_config.gantt_nodes) + + if sim_config.output: timestep_start = min([x.submit_time for x in jobs]) timestep_end = math.ceil(max([x.submit_time for x in jobs]) + max([x.expected_run_time for x in jobs])) filename = create_file_indexed('wl', create=False, ending="npz").split(".npz")[0] @@ -970,7 +976,3 @@ def continuous_job_generation(*, engine, timestep, jobs): if len(engine.queue) <= engine.continuous_workload.args.maxqueue: new_jobs = engine.continuous_workload.generate_jobs() jobs.extend(new_jobs) - - -if __name__ == "__main__": - run_workload() diff --git a/tests/conftest.py b/tests/conftest.py index 8f05879cd5ff1de5275b4540fe33bf655cd9b43a..477588ab47724f27096e3f03023175d3f84427ae 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,7 +10,7 @@ def pytest_addoption(parser): def pytest_runtest_setup(item): if "long" in item.keywords and not item.config.getoption("--runlong"): - #reason = f"Skipping {item.nodeid} because it requires --runlong" + # reason = f"Skipping {item.nodeid} because it requires --runlong" reason = "Skipping test because it requires --runlong" pytest.skip(reason) diff --git a/tests/smoke.py b/tests/smoke.py index 
0f9f4cada09d535591296bcac18c353026d66339..7548de3982c2ed2c9464aa72ed5420eaeb3fbadd 100644 --- a/tests/smoke.py +++ b/tests/smoke.py @@ -32,7 +32,7 @@ def run_command(command): def build_command(system, file_paths, additional_args=""): """Build the command string for the given system and file paths.""" full_paths = " ".join([os.path.join(DATAPATH, path) for path in file_paths.split()]) - return f"python main.py --system {system} -f {full_paths} -t {DEFAULT_TIME} {additional_args}".strip() + return f"python main.py run --system {system} -f {full_paths} -t {DEFAULT_TIME} {additional_args}".strip() def execute_system_tests(systems): @@ -45,16 +45,16 @@ def execute_system_tests(systems): def synthetic_workload_tests(): """Run synthetic workload tests.""" print("Starting synthetic workload tests...") - run_command(f"python main.py -t {DEFAULT_TIME}") - run_command(f"python main.py -w benchmark -t {BENCH_TIME}") - run_command(f"python main.py -w peak -t {DEFAULT_TIME}") - run_command(f"python main.py -w idle -t {DEFAULT_TIME}") + run_command(f"python main.py run -t {DEFAULT_TIME}") + run_command(f"python main.py run -w benchmark -t {BENCH_TIME}") + run_command(f"python main.py run -w peak -t {DEFAULT_TIME}") + run_command(f"python main.py run -w idle -t {DEFAULT_TIME}") def hetero_tests(): """Run heterogeneous workload tests.""" print("Starting heterogeneous workload tests...") - run_command(f"python multi-part-sim.py -x setonix/part-cpu setonix/part-gpu -t {DEFAULT_TIME}") + run_command(f"python main.py run-multi-part -x setonix/part-cpu setonix/part-gpu -t {DEFAULT_TIME}") def main(): diff --git a/tests/systems/conftest.py b/tests/systems/conftest.py index bcde0297894008febf94bcf2d8e0514f05f4043a..8e361e9b8ce79eefb53128d3260c84ec5bda10bb 100644 --- a/tests/systems/conftest.py +++ b/tests/systems/conftest.py @@ -99,7 +99,7 @@ def system_config(system): "time_delta": True, "net": False, }, - "lassen":{ + "lassen": { "main": True, "telemetry": False, # Takes very long! 
"multi-part-sim": False, @@ -111,7 +111,7 @@ def system_config(system): "time_delta": True, "net": True, }, - "marconi100":{ + "marconi100": { "main": True, "telemetry": True, "multi-part-sim": False, @@ -182,16 +182,16 @@ def system_config(system): @pytest.fixture def system_file(system): files = { - "40frontiers":[], - "adastraMI250":["AdastaJobsMI250_15days.parquet"], - "frontier":["slurm/joblive/date=2024-01-18/","jobprofile/date=2024-01-18/"], - "fugaku":["21_04.parquet"], - "gcloudv2":["/v2/google_cluster_data_2011_sample"], - "lassen":["Lassen-Supercomputer-Job-Dataset"], - "marconi100":["job_table.parquet"], - "mit_supercloud":["202201"], - "setonix":[""], - "summit":[], - "lumi":[] + "40frontiers": [], + "adastraMI250": ["AdastaJobsMI250_15days.parquet"], + "frontier": ["slurm/joblive/date=2024-01-18/", "jobprofile/date=2024-01-18/"], + "fugaku": ["21_04.parquet"], + "gcloudv2": ["/v2/google_cluster_data_2011_sample"], + "lassen": ["Lassen-Supercomputer-Job-Dataset"], + "marconi100": ["job_table.parquet"], + "mit_supercloud": ["202201"], + "setonix": [""], + "summit": [], + "lumi": [] } - return files.get(system,files) + return files.get(system, files) diff --git a/tests/systems/test_engine.py b/tests/systems/test_engine.py new file mode 100644 index 0000000000000000000000000000000000000000..974ed2e766ea80e8c0ccc4c4f1ff5a87e5541769 --- /dev/null +++ b/tests/systems/test_engine.py @@ -0,0 +1,39 @@ +import gc +import pytest +from raps.engine import Engine +from raps.sim_config import SimConfig +from raps.stats import ( + get_engine_stats, + # get_job_stats, + # get_scheduler_stats, + # get_network_stats, +) + +pytestmark = [ + pytest.mark.system, + pytest.mark.nodata +] + + +def test_engine(system, system_config): + if not system_config.get("main", False): + pytest.skip(f"{system} does not support basic main run.") + + sim_config = SimConfig.model_validate({ + "system": system, + "time": "2m", + }) + engine, jobs, timestep_start, timestep_end, time_delta = 
Engine.from_sim_config(sim_config) + ticks = list(engine.run_simulation(jobs, timestep_start, timestep_end, time_delta)) + + assert len(ticks) == 120 + + engine_stats = get_engine_stats(engine) + # job_stats = get_job_stats(engine) + # scheduler_stats = get_scheduler_stats(engine) + # network_stats = get_network_stats(engine) + + assert engine_stats['time simulated'] == '0:02:00' + # TODO: More specific tests of values + + gc.collect() diff --git a/tests/systems/test_main_basic_run.py b/tests/systems/test_main_basic_run.py index 8e319521908739a74032b4baec387c70158a2687..c420b5921829a2c664c6d1d005451c6cf9a9a519 100644 --- a/tests/systems/test_main_basic_run.py +++ b/tests/systems/test_main_basic_run.py @@ -11,13 +11,13 @@ pytestmark = [ ] -def test_main_basic_run(system, system_config,random_id): +def test_main_basic_run(system, system_config, random_id): if not system_config.get("main", False): pytest.skip(f"{system} does not support basic main run.") os.chdir(PROJECT_ROOT) result = subprocess.run([ - "python", "main.py", + "python", "main.py", "run", "--time", "1m", "--system", system, "-o", random_id diff --git a/tests/systems/test_main_cooling_run.py b/tests/systems/test_main_cooling_run.py index 1411d8cbbb1189f676fd2a7a78fb134ccf5c3932..62d862121306843009cd92f451d17fc22670c8bd 100644 --- a/tests/systems/test_main_cooling_run.py +++ b/tests/systems/test_main_cooling_run.py @@ -18,7 +18,7 @@ def test_main_cooling_run(system, system_config, random_id): os.chdir(PROJECT_ROOT) result = subprocess.run([ - "python", "main.py", + "python", "main.py", "run", "--time", "1h", "--system", system, "-c", diff --git a/tests/systems/test_main_cooling_uncertainty_run.py b/tests/systems/test_main_cooling_uncertainty_run.py index 2491d7a509109beb7c879eabd75aced86a37a2e0..742fe87a87944d01fcbd2fbed206caf74e88c9e9 100644 --- a/tests/systems/test_main_cooling_uncertainty_run.py +++ b/tests/systems/test_main_cooling_uncertainty_run.py @@ -19,7 +19,7 @@ def 
test_main_cooling_uncertainty_run(request, system, system_config, random_id) os.chdir(PROJECT_ROOT) result = subprocess.run([ - "python", "main.py", + "python", "main.py", "run", "--time", "3m", "--system", system, "-c", diff --git a/tests/systems/test_main_fastforward_run.py b/tests/systems/test_main_fastforward_run.py index 4b0584b2f634dddbd3c054fdcd2f0efe21506037..3eb567c0276d77ff191e5161613e7d185687d08d 100644 --- a/tests/systems/test_main_fastforward_run.py +++ b/tests/systems/test_main_fastforward_run.py @@ -24,11 +24,10 @@ def test_main_fastforward_run(system, system_config, ff_arg, random_id): os.chdir(PROJECT_ROOT) result = subprocess.run([ - "python", "main.py", + "python", "main.py", "run", "-t 1", "--fastforward", ff_arg, "--system", system, - #--"-f", system_file, "--noui", "-o", random_id ], capture_output=True, text=True, stdin=subprocess.DEVNULL) diff --git a/tests/systems/test_main_help.py b/tests/systems/test_main_help.py index f84c63df2bac5d1eca0a1d0876f454166580f9ac..a651a382a426534c7adcfa05179e9a52095b7b24 100644 --- a/tests/systems/test_main_help.py +++ b/tests/systems/test_main_help.py @@ -11,13 +11,13 @@ pytestmark = [ ] -def test_main_help(system, system_config,random_id): +def test_main_help(system, system_config, random_id): if not system_config.get("main", False): pytest.skip(f"{system} does not support basic main run.") os.chdir(PROJECT_ROOT) result = subprocess.run([ - "python", "main.py", + "python", "main.py", "run", "-h" ], capture_output=True, text=True, stdin=subprocess.DEVNULL) diff --git a/tests/systems/test_main_network_run.py b/tests/systems/test_main_network_run.py index 8b80d5dbdc1a9c19d32408facb67fc671cd44642..8c7db1e87bfbb7fbcb5bb335420aca099858739a 100644 --- a/tests/systems/test_main_network_run.py +++ b/tests/systems/test_main_network_run.py @@ -21,7 +21,7 @@ def test_main_network_run(system, system_config, random_id): os.chdir(PROJECT_ROOT) result = subprocess.run([ - "python", "main.py", + "python", "main.py", "run", 
"--time", "1m", "--system", system, "--net", diff --git a/tests/systems/test_main_network_withdata_run.py b/tests/systems/test_main_network_withdata_run.py index 31db05ebdbb93dd825b3e118f384a4c464803dfd..1dcfee029ad6f51d11c2b70f32088808cca24d88 100644 --- a/tests/systems/test_main_network_withdata_run.py +++ b/tests/systems/test_main_network_withdata_run.py @@ -23,11 +23,12 @@ def test_main_network_withdata_run(system, system_config, system_file, random_id else: file_list = [DATA_PATH / system / system_file] for file in file_list: - assert os.path.isfile(file) or os.path.isdir(file), "File does not exist. does ./data exist or is RAPS_DATA_DIR set?" + assert os.path.isfile(file) or os.path.isdir(file), \ + "File does not exist. does ./data exist or is RAPS_DATA_DIR set?" os.chdir(PROJECT_ROOT) result = subprocess.run([ - "python", "main.py", + "python", "main.py", "run", "--time", "1m", "--system", system, "-f", *file_list, diff --git a/tests/systems/test_main_noui_run.py b/tests/systems/test_main_noui_run.py index 5b12b55256ff5d7b2bbbdf42afc86a30d87f65da..af8bea80d88195e44ea17b91caaa61d4e1c1f09f 100644 --- a/tests/systems/test_main_noui_run.py +++ b/tests/systems/test_main_noui_run.py @@ -17,7 +17,7 @@ def test_main_noui_run(system, system_config, random_id): os.chdir(PROJECT_ROOT) result = subprocess.run([ - "python", "main.py", + "python", "main.py", "run", "--time", "1m", "--system", system, "--noui", diff --git a/tests/systems/test_main_time_delta_run.py b/tests/systems/test_main_time_delta_run.py index 9cb87a229af27a3f5094a23e14d30d09a0057d31..880805282785f7757ec19026668430429390fdcf 100644 --- a/tests/systems/test_main_time_delta_run.py +++ b/tests/systems/test_main_time_delta_run.py @@ -21,18 +21,17 @@ pytestmark = [ ("10h", "1h"), ("10h", "3h"), ("3d", "1d") -], ids=["1","1s","10s","1m","1h","3h","1d"]) +], ids=["1", "1s", "10s", "1m", "1h", "3h", "1d"]) def test_main_time_delta_run(system, system_config, time_arg, tdelta_arg, random_id): if not 
system_config.get("time_delta", False): pytest.skip(f"{system} does not support time_delta run.") os.chdir(PROJECT_ROOT) result = subprocess.run([ - "python", "main.py", + "python", "main.py", "run", "-t", time_arg, "--time-delta", tdelta_arg, "--system", system, - #--"-f", system_file, "--noui", "-o", random_id ], capture_output=True, text=True, stdin=subprocess.DEVNULL) diff --git a/tests/systems/test_main_time_delta_sub_second_run.py b/tests/systems/test_main_time_delta_sub_second_run.py index 9276011296d76ba3bc9da9d2cd219e38d69e6a40..0bedee88d80b41af59edf94318af62dfa6f39399 100644 --- a/tests/systems/test_main_time_delta_sub_second_run.py +++ b/tests/systems/test_main_time_delta_sub_second_run.py @@ -22,18 +22,17 @@ pytestmark = [ ("10cs", "1ms"), ("100ms", "1ms"), ("100ms", "1s"), -], ids=["1ds","3ds","1cs","1ms","1cs-for-10ds","1ms-for-10cs","1ms-for-100ms","1s-for-100ms"]) +], ids=["1ds", "3ds", "1cs", "1ms", "1cs-for-10ds", "1ms-for-10cs", "1ms-for-100ms", "1s-for-100ms"]) def test_main_time_delta_sub_second_run(system, system_config, time_arg, tdelta_arg, random_id): if not system_config.get("time_delta", False): pytest.skip(f"{system} does not support time_delta run.") os.chdir(PROJECT_ROOT) result = subprocess.run([ - "python", "main.py", + "python", "main.py", "run", "-t", time_arg, "--time-delta", tdelta_arg, "--system", system, - #--"-f", system_file, "--noui", "-o", random_id ], capture_output=True, text=True, stdin=subprocess.DEVNULL) diff --git a/tests/systems/test_main_time_ff_delta_run.py b/tests/systems/test_main_time_ff_delta_run.py index a1366158543626d0bb2b0d8bb546e4a4d084164e..a6c87639d0ec02bc95e82929248f75202db1943c 100644 --- a/tests/systems/test_main_time_ff_delta_run.py +++ b/tests/systems/test_main_time_ff_delta_run.py @@ -20,20 +20,19 @@ pytestmark = [ ("10h", "1h", "2h"), ("10h", "3h", "1h"), pytest.param("3d", "1d", "1d", marks=pytest.mark.long, id="1d (long)"), -], ids=["1","1s","10s","1m","1h","3h","1d"]) +], ids=["1", "1s", "10s", 
"1m", "1h", "3h", "1d"]) def test_main_time_ff_delta_run(system, system_config, time_arg, tdelta_arg, - ff_arg, random_id): + ff_arg, random_id): if not system_config.get("time_delta", False): pytest.skip(f"{system} does not support time_delta run.") os.chdir(PROJECT_ROOT) result = subprocess.run([ - "python", "main.py", + "python", "main.py", "run", "-t", time_arg, "--ff", ff_arg, "--time-delta", tdelta_arg, "--system", system, - #--"-f", system_file, "--noui", "-o", random_id ], capture_output=True, text=True, stdin=subprocess.DEVNULL) diff --git a/tests/systems/test_main_time_run.py b/tests/systems/test_main_time_run.py index e87e3310e360afd22bb55e47028363d9d992684e..c8e00b14293ff6d2820680743585801e0c1f7841 100644 --- a/tests/systems/test_main_time_run.py +++ b/tests/systems/test_main_time_run.py @@ -27,10 +27,9 @@ def test_main_time_run(system, system_config, time_args, random_id): os.chdir(PROJECT_ROOT) result = subprocess.run([ - "python", "main.py", + "python", "main.py", "run", "--time", time_args, "--system", system, - #--"-f", system_file, "--noui", "-o", random_id ], capture_output=True, text=True, stdin=subprocess.DEVNULL) diff --git a/tests/systems/test_main_uncertainty_run.py b/tests/systems/test_main_uncertainty_run.py index 815a66143ed78deda34404dd41ca1e8cc6ddd440..effdcc64d1bdfb0c789f30eda4e95a45ab9dd534 100644 --- a/tests/systems/test_main_uncertainty_run.py +++ b/tests/systems/test_main_uncertainty_run.py @@ -19,7 +19,7 @@ def test_main_uncertainty_run(system, system_config, random_id): os.chdir(PROJECT_ROOT) result = subprocess.run([ - "python", "main.py", + "python", "main.py", "run", "--time", "3m", "--system", system, "-u", diff --git a/tests/systems/test_main_withdata_run.py b/tests/systems/test_main_withdata_run.py index 299d34cbfd7e58888102ce6f218daad9c62b039b..a4cbd55cb37f3773d7d272e32d8113009fe09da5 100644 --- a/tests/systems/test_main_withdata_run.py +++ b/tests/systems/test_main_withdata_run.py @@ -22,10 +22,11 @@ def 
test_main_withdata_run(system, system_config, system_file, random_id): else: file_list = [DATA_PATH / system / system_file] for file in file_list: - assert os.path.isfile(file) or os.path.isdir(file), f"File `{file}' does not exist. does ./data exist or is RAPS_DATA_DIR set?" + assert os.path.isfile(file) or os.path.isdir(file), \ + f"File `{file}' does not exist. does ./data exist or is RAPS_DATA_DIR set?" os.chdir(PROJECT_ROOT) result = subprocess.run([ - "python", "main.py", + "python", "main.py", "run", "--time", "1m", "--system", system, "-f", ','.join(str(p) for p in file_list), diff --git a/tests/systems/test_multi_part_sim_basic_run.py b/tests/systems/test_multi_part_sim_basic_run.py index e8e64e95bc653c01884354de84f8ef70d5a95d77..3ea2a9caa1b6d5b156524a8c6f9b4c2666869382 100644 --- a/tests/systems/test_multi_part_sim_basic_run.py +++ b/tests/systems/test_multi_part_sim_basic_run.py @@ -18,10 +18,9 @@ def test_multi_part_sim_basic_run(system, system_config): os.chdir(PROJECT_ROOT) result = subprocess.run([ - "python", "multi-part-sim.py", + "python", "main.py", "run-multi-part", "--time", "1h", "-x", f"{system}/*", - #"--noui" ], capture_output=True, text=True, stdin=subprocess.DEVNULL) assert result.returncode == 0, f"Failed on {system}: {result.stderr}" del result diff --git a/tests/systems/test_multi_part_sim_network_run.py b/tests/systems/test_multi_part_sim_network_run.py index 3f53e99c656b28b15ed04fb0848f11e4be922587..ccbadaaac11b063886d553a8f04b08b2053b8827 100644 --- a/tests/systems/test_multi_part_sim_network_run.py +++ b/tests/systems/test_multi_part_sim_network_run.py @@ -21,16 +21,15 @@ def test_multi_part_sim_network_run(system, system_config, random_id): os.chdir(PROJECT_ROOT) result = subprocess.run([ - "python", "multi-part-sim.py", + "python", "main.py", "run-multi-part", "--time", "1h", "-x", f"{system}/*", "--net", - #"--noui" ], capture_output=True, text=True, stdin=subprocess.DEVNULL) assert result.returncode == 0, f"Failed on {system}: 
{result.stderr}" - #TODO: - #Cleanup files after test! + # TODO: + # Cleanup files after test! del result gc.collect() diff --git a/tests/systems/test_multi_part_sim_withdata_run.py b/tests/systems/test_multi_part_sim_withdata_run.py index f862acacd7cfd768aa853b813f034d132ca6ed48..2b183054ea418196642290da9e65a4866fbf4f3c 100644 --- a/tests/systems/test_multi_part_sim_withdata_run.py +++ b/tests/systems/test_multi_part_sim_withdata_run.py @@ -22,15 +22,15 @@ def test_multi_part_sim_withdata_run(system, system_config, system_file): else: file_list = [DATA_PATH / system / system_file] for file in file_list: - assert os.path.isfile(file) or os.path.isdir(file), f"File `{file}' does not exist. does ./data exist or is RAPS_DATA_DIR set?" + assert os.path.isfile(file) or os.path.isdir(file), \ + f"File `{file}' does not exist. does ./data exist or is RAPS_DATA_DIR set?" os.chdir(PROJECT_ROOT) result = subprocess.run([ - "python", "multi-part-sim.py", + "python", "main.py", "run-multi-part", "--time", "1h", "-x", f"{system}/*", "-f", *file_list, - #"--noui" ], capture_output=True, text=True, stdin=subprocess.DEVNULL) assert result.returncode == 0, f"Failed on {system}: {result.stderr}" del result diff --git a/tests/systems/test_telemetry_withdata_run.py b/tests/systems/test_telemetry_withdata_run.py index 415fbfe011562a46c8333edefcc1ad9b8894beef..e9685f7f3a01a21c8eadd989957511cdbf2294bd 100644 --- a/tests/systems/test_telemetry_withdata_run.py +++ b/tests/systems/test_telemetry_withdata_run.py @@ -22,10 +22,11 @@ def test_telemetry_main_withdata_run(system, system_config, system_file, random_ else: file_list = [DATA_PATH / system / system_file] for file in file_list: - assert os.path.isfile(file) or os.path.isdir(file), f"File `{file}' does not exist. does ./data exist or is RAPS_DATA_DIR set?" + assert os.path.isfile(file) or os.path.isdir(file), \ + f"File `{file}' does not exist. does ./data exist or is RAPS_DATA_DIR set?" 
os.chdir(PROJECT_ROOT) result = subprocess.run([ - "python", "raps/telemetry.py", + "python", "main.py", "telemetry", "--system", system, "-f", *file_list, "-o", random_id diff --git a/tests/systems/test_workload_synthetic.py b/tests/systems/test_workload_synthetic.py index dd5f8cf6c395f6aa8c7463101c57b11c4d270c56..107b01521e6205d1eb60945192fb73c79baf5234 100644 --- a/tests/systems/test_workload_synthetic.py +++ b/tests/systems/test_workload_synthetic.py @@ -13,29 +13,26 @@ def flatten(dist): name, args = dist return [name, *args] -def _build_args(dist_name, params): - return [dist_name, *params] - jobdist_case = [ ("weibull", ["--jobsize-weibull-shape", "0.75", "--jobsize-weibull-scale", "16"]), ("normal", ["--jobsize-normal-stddev", "100", "--jobsize-normal-mean", "16"]), - ("uniform",[]), + ("uniform", []), ] cpudist_case = [ ("weibull", ["--cpuutil-weibull-shape", "0.75", "--cpuutil-weibull-scale", "16"]), ("normal", ["--cpuutil-normal-stddev", "100", "--cpuutil-normal-mean", "16"]), - ("uniform",[]), + ("uniform", []), ] gpudist_case = [ ("weibull", ["--gpuutil-weibull-shape", "0.75", "--gpuutil-weibull-scale", "16"]), ("normal", ["--gpuutil-normal-stddev", "100", "--gpuutil-normal-mean", "16"]), - ("uniform",[]), + ("uniform", []), ] wtimedist_case = [ ("weibull", ["--walltime-weibull-shape", "0.75", "--walltime-weibull-scale", "16"]), ("normal", ["--walltime-normal-stddev", "100", "--walltime-normal-mean", "16"]), - ("uniform",[]), + ("uniform", []), ] additional_params_cases = [ "", # nothing @@ -47,16 +44,16 @@ additional_params_cases = [ @pytest.mark.parametrize( - "jobdist", jobdist_case, ids=lambda d:d[0] + "jobdist", jobdist_case, ids=lambda d: d[0] ) @pytest.mark.parametrize( - "cpudist", cpudist_case, ids=lambda d:d[0] + "cpudist", cpudist_case, ids=lambda d: d[0] ) @pytest.mark.parametrize( - "gpudist", gpudist_case, ids=lambda d:d[0] + "gpudist", gpudist_case, ids=lambda d: d[0] ) @pytest.mark.parametrize( - "wtimedist", wtimedist_case, ids=lambda 
d:d[0] + "wtimedist", wtimedist_case, ids=lambda d: d[0] ) @pytest.mark.parametrize( "additional_params", additional_params_cases, ids=lambda p: (p or "none") @@ -75,7 +72,7 @@ def test_workload_synthetic_run( # Build the command line. Each distribution tuple expands into: # dist_name, , , ... cmd = [ - "python", "raps/workload.py", + "python", "main.py", "workload", "--system", system, "-w", "synthetic", "--jobsize-distribution", *flatten(jobdist), @@ -90,7 +87,7 @@ def test_workload_synthetic_run( cmd.extend(additional_params) cmd1 = ["python", "-c \"exit()\""] - result = subprocess.run(cmd1,capture_output=True,text=True,stdin=subprocess.DEVNULL) + result = subprocess.run(cmd1, capture_output=True, text=True, stdin=subprocess.DEVNULL) try: result = subprocess.run( cmd, diff --git a/tests/test_main.py b/tests/test_main.py index 76f48a361c744e5c4f650f3ce0fed59e5bdbab86..5c08182e6300789d100a53306050b0ea8d8c5041 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -13,10 +13,10 @@ PROJECT_ROOT = Path(__file__).resolve().parent.parent # adjust if needed def test_main_withui(): os.chdir(PROJECT_ROOT) result = subprocess.run([ - "python", "main.py", + "python", "main.py", "run", "--time", "1h", ], capture_output=True, - text=True + text=True ) assert result.returncode == 0 @@ -25,11 +25,11 @@ def test_main_withui(): def test_main_noui(): os.chdir(PROJECT_ROOT) result = subprocess.run([ - "python", "main.py", + "python", "main.py", "run", "--time", "1h", "--noui" ], capture_output=True, - text=True + text=True ) assert result.returncode == 0 @@ -39,8 +39,8 @@ def test_main_noui(): def test_main_long(): os.chdir(PROJECT_ROOT) result = subprocess.run([ - "python", "main.py", + "python", "main.py", "run", ], capture_output=True, - text=True + text=True ) assert result.returncode == 0 diff --git a/tests/util.py b/tests/util.py index 96609c7723811da9201fbc4f305e2c5f5a8afec5..6ee1df764bdf971f4c54066068814fc823a35ef1 100644 --- a/tests/util.py +++ b/tests/util.py @@ 
-13,10 +13,13 @@ def find_project_root(): PROJECT_ROOT = find_project_root() CONFIG_PATH = PROJECT_ROOT / "config" -DATA_PATH = Path(os.getenv("RAPS_DATA_DIR",PROJECT_ROOT / "data")).resolve() +DATA_PATH = Path(os.getenv("RAPS_DATA_DIR", PROJECT_ROOT / "data")).resolve() -#Maybe usefull but now all systems are listed explicitly! -system_list = [entry for entry in os.listdir(CONFIG_PATH) if os.path.isfile(os.path.join(CONFIG_PATH,entry,'system.json'))] +# Maybe useful but now all systems are listed explicitly! +system_list = [ + entry for entry in os.listdir(CONFIG_PATH) + if os.path.isfile(os.path.join(CONFIG_PATH, entry, 'system.json')) +] def requires_all_markers(request, required_markers):