diff --git a/.flake8 b/.flake8 index bd4851160ef5224bd298304c7ca2aa0c84e99a30..ce4ab0ac6d82d27416769484a719a259f75e05a4 100644 --- a/.flake8 +++ b/.flake8 @@ -1,3 +1,3 @@ [flake8] -exclude = .git, __pycache__, venv*, simulation_results, third_party +exclude = .git, __pycache__, venv*, simulation_results, third_party, models max-line-length = 120 diff --git a/README.md b/README.md index f46e23afce060be2fa12a562073837746bca4fce..665a64ac8a5fd58ac9425026cba0749c51e8b38b 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ Instructions for setup and usage are given below. An online documentation of Exa ## Setup environment -Note: Requires python3.11 or greater. +Note: Requires python3.12 or greater. pip install -e . @@ -30,7 +30,7 @@ Note: Requires python3.11 or greater. # Frontier DATEDIR="date=2024-01-18" DPATH=~/data/frontier-sample-2024-01-18 - python main.py -f $DPATH/slurm/joblive/$DATEDIR $DPATH/jobprofile/$DATEDIR + python main.py -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR ## Open Telemetry dataset @@ -46,7 +46,7 @@ For Adastra MI250 supercomputer, download 'AdastaJobsMI250_15days.parquet' from For Google cluster trace v2 - python main.py --system gcloudv2 -f ~/data/gcloud/v2/google_cluster_data_2011_sample -ff 600 + python main.py --system gcloudv2 -f ~/data/gcloud/v2/google_cluster_data_2011_sample --ff 600 # analyze dataset python -m raps.telemetry --system gcloudv2 -f ~/data/gcloud/v2/google_cluster_data_2011_sample -v @@ -83,7 +83,7 @@ For Lumi Lassen is one of the few datasets that has networking data. See `raps/dataloaders/lassen.py` for how to get the datasets. To run a network simulation, use the following command: - python main.py -f ~/data/lassen/Lassen-Supercomputer-Job-Dataset --system lassen --policy fcfs --backfill firstfit -ff 365d -t 12h --arrival poisson -net + python main.py -f ~/data/lassen/Lassen-Supercomputer-Job-Dataset --system lassen --policy fcfs --backfill firstfit --ff 365d -t 12h --arrival poisson --net ## Snapshot of extracted workload data @@ -140,10 +140,10 @@ This will dump a .npz file with a randomized name, e.g. ac23db.npz. Let's rename There are three ways to modify replaying of telemetry data: 1. `--arrival`. Changing the arrival time distribution - replay cases will default to `--arrival prescribed`, where the jobs will be submitted exactly as they were submitted on the physical machine. This can be changed to `--arrival poisson` to change when the jobs arrive, which is especially useful in cases where there may be gaps in time, e.g., when the system goes down for several days, or the system is underutilized. -python main.py -f $DPATH/slurm/joblive/$DATEDIR $DPATH/jobprofile/$DATEDIR --arrival poisson +python main.py -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR --arrival poisson 2. `--policy`. Changing the way the jobs are scheduled. The `--policy` flag will be set by default to `replay` in cases where a telemetry file is provided, in which case the jobs will be scheduled according to the start times provided. Changing the `--policy` to `fcfs` or `backfill` will use the internal scheduler. -python main.py -f $DPATH/slurm/joblive/$DATEDIR $DPATH/jobprofile/$DATEDIR --policy fcfs --backfill firstfit -t 12h +python main.py -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR --policy fcfs --backfill firstfit -t 12h 3. `--scale`. Changing the scale of each job in the telemetry data.
The `--scale` flag specifies the maximum number of nodes for each job (generally set this to the number of nodes in the smallest partition); the number of nodes for each job is then drawn randomly between one and this maximum. This flag is useful when replaying telemetry from a larger system onto a smaller system. @@ -151,11 +151,11 @@ python main.py -f $DPATH/slurm/joblive/$DATEDIR $DPATH/jobprofile/$DATEDIR --pol ## Job-level power output example for replay of single job - python main.py -f $DPATH/slurm/joblive/$DATEDIR $DPATH/jobprofile/$DATEDIR --jid 1234567 -o + python main.py -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR --jid 1234567 -o ## Compute stats on telemetry data, e.g., average job arrival time - python -m raps.telemetry -f $DPATH/slurm/joblive/$DATEDIR $DPATH/jobprofile/$DATEDIR + python -m raps.telemetry -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR ## Build and run Docker container diff --git a/main.py b/main.py index 9655b1bd2b04f6edd5d53b00e7817534125eb879..c3ba946ad64bd66572b25e4d6487bd58718e23a4 100644 --- a/main.py +++ b/main.py @@ -16,7 +16,7 @@ import math # from raps.helpers import check_python_version # -from raps.config import get_system_config +from raps.system_config import get_system_config from raps.constants import OUTPUT_PATH, SEED from raps.cooling import ThermoFluidsModel from raps.ui import LayoutManager @@ -45,7 +45,7 @@ from raps.stats import ( print_formatted_report ) -from raps.args import args, args_dict +from raps.sim_config import args, args_dict check_python_version() diff --git a/multi-part-sim-mpi.py b/multi-part-sim-mpi.py index fed038e2f329074c53898cfd185c8fce9328ba52..eabb19b002f28f97329ed13eed7261bcf811912e 100644 --- a/multi-part-sim-mpi.py +++ b/multi-part-sim-mpi.py @@ -8,18 +8,16 @@ stats for heterogeneous systems (e.g., LUMI, Setonix, Adastra).
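The MPI driver below assigns partitions to ranks round-robin (step 4 in its comments). As a minimal sketch of that ownership scheme — the partition names are hypothetical and only `mpi4py` is assumed:

    # Round-robin partition ownership across MPI ranks (sketch).
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    partitions = ["setonix-cpu", "setonix-gpu", "lumi-c", "lumi-g"]  # hypothetical names
    mine = [p for i, p in enumerate(partitions) if i % size == rank]
    print(f"rank {rank}/{size} simulates partitions: {mine}")

Run with e.g. `mpirun -n 2 python sketch.py`; in the real driver each rank then builds an Engine per owned partition.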
from tqdm import tqdm from mpi4py import MPI -from raps.utils import convert_to_seconds, next_arrival +from raps.utils import next_arrival from raps.workload import Workload from raps.telemetry import Telemetry from raps.power import PowerManager, compute_node_power from raps.flops import FLOPSManager from raps.engine import Engine from raps.ui import LayoutManager -from raps.config import get_system_config, CONFIG_PATH -from args import args +from raps.system_config import get_partition_configs +from raps.sim_config import args import random -import os -import glob from raps.helpers import check_python_version check_python_version() @@ -29,20 +27,10 @@ def main(): rank = comm.Get_rank() size = comm.Get_size() - # 1) Expand “partitions” (on rank 0) if the user used a glob: - if rank == 0: - partition_names = args.partitions - if '*' in partition_names[0]: - paths = glob.glob(os.path.join(CONFIG_PATH, partition_names[0])) - partition_names = [os.path.join(*p.split(os.sep)[-2:]) for p in paths] - else: - partition_names = None - - # 2) Broadcast the final list of partition_names to everyone - partition_names = comm.bcast(partition_names, root=0) - # 3) Load configs for every partition (all ranks do this) - configs = [get_system_config(p).get_legacy() for p in partition_names] + multi_config = get_partition_configs(args.partitions) + partition_names = multi_config.partition_names + configs = [c.get_legacy() for c in multi_config.partitions] args_dicts = [{**vars(args), 'config': cfg} for cfg in configs] # 4) Each rank decides which partition‐indices it owns (round-robin): @@ -122,12 +110,12 @@ def main(): # 9) Compute timestep_start / timestep_end (all ranks agree): if args.fastforward: - fastforward = convert_to_seconds(args.fastforward) + fastforward = args.fastforward else: fastforward = 0 if args.time: - timesteps = convert_to_seconds(args.time) + timesteps = args.time else: timesteps = 88200 # default 24 hours diff --git a/multi-part-sim.py b/multi-part-sim.py index 350462e48538a380e16a0f5ca632ae5aedf3dfd9..587dffb7d93f42dd0880f6931e548066b5d3e7d8 100644 --- a/multi-part-sim.py +++ b/multi-part-sim.py @@ -9,33 +9,27 @@ statistics for systems such as MIT Supercloud, Setonix, Adastra, and LUMI. 
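Both drivers now resolve partitions through `get_partition_configs`, roughly as sketched below. The `"setonix/*"` pattern is hypothetical; per the function's docstring it accepts pre-configured system names, globs, or directories of yaml files.

    # Sketch of the new multi-partition config resolution.
    from raps.system_config import get_partition_configs

    multi_config = get_partition_configs(["setonix/*"])  # names, globs, or directories
    print(multi_config.system_name)                      # combined name, e.g. "setonix"
    for cfg in multi_config.partitions:
        legacy = cfg.get_legacy()  # flattened dict with uppercased keys
        print(cfg.system_name, len(legacy))

Duplicate partition names raise a validation error, so each partition's legacy dict stays unambiguous.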
from tqdm import tqdm from raps.stats import get_engine_stats, get_job_stats, get_scheduler_stats, get_network_stats -from raps.utils import convert_to_seconds, next_arrival +from raps.utils import next_arrival from raps.workload import Workload from raps.telemetry import Telemetry from raps.power import PowerManager, compute_node_power from raps.flops import FLOPSManager from raps.engine import Engine from raps.ui import LayoutManager -from raps.config import get_system_config, CONFIG_PATH -from raps.args import args +from raps.system_config import get_partition_configs +from raps.sim_config import args import random import os -import glob from raps.helpers import check_python_version check_python_version() # Load configurations for each partition -partition_names = args.partitions +multi_config = get_partition_configs(args.partitions) +partition_names = multi_config.partition_names +configs = [c.get_legacy() for c in multi_config.partitions] +args.system = multi_config.system_name -print(args.partitions) -if '*' in args.partitions[0]: - paths = glob.glob(os.path.join(CONFIG_PATH, args.partitions[0].replace("'", ""))) - partition_names = [os.path.join(*p.split(os.sep)[-2:]) for p in paths] - - args.system = partition_names[0].split(os.sep)[0] - -configs = [get_system_config(partition).get_legacy() for partition in partition_names] args_dicts = [ {**vars(args), 'config': config, 'partition': partition_names[i]} for i, config in enumerate(configs) @@ -123,11 +117,11 @@ for i, (config, ad) in enumerate(zip(configs, args_dicts)): # Set simulation timesteps if args.fastforward: - fastfoward = convert_to_seconds(args.fastforward) + fastforward = args.fastforward else: fastforward = 0 if args.time: - timesteps = convert_to_seconds(args.time) + timesteps = args.time else: timesteps = 88200 # Default to 24 hours @@ -135,7 +129,7 @@ timestep_start = fastforward timestep_end = timestep_start + timesteps if args.time_delta: - time_delta = convert_to_seconds(args.time_delta) + time_delta = args.time_delta else: time_delta = config['TRACE_QUANTA'] diff --git a/pyproject.toml b/pyproject.toml index d46a62103e1409ad0ff8dc6806b112d9588a292b..b7fbb99fdf5ee3848be4f37d06c637b36d81da80 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "hatchling.build" [project] name = "raps" version = "0.0.1" -requires-python = ">=3.11" +requires-python = ">=3.12" description = "RAPS" readme = "README.md" # license = {file = "LICENSE.txt"} diff --git a/raps/args.py b/raps/args.py deleted file mode 100644 index 9c4142b0a9fc077c27b55bd9baa25d79849f4886..0000000000000000000000000000000000000000 --- a/raps/args.py +++ /dev/null @@ -1,310 +0,0 @@ -import argparse -import os -import sys -import yaml -from raps.schedulers.default import PolicyType, BackfillType - -from raps.workload import add_workload_to_parser, check_workload_args -from raps.utils import convert_to_seconds - - -def load_config(path): - if path and os.path.exists(path): - with open(path, "r") as f: - return yaml.safe_load(f) or {} - return {} - - -def _expand_path(p): - if isinstance(p, str): - # expand ~ and $VARS - return os.path.expanduser(os.path.expandvars(p)) - return p - - -def apply_config_to_args(cfg, args): - # Merge supported sections or top-level keys - merged = {} - for k, v in (cfg or {}).items(): - if isinstance(v, dict) and k in { - "shared", "simulate", "telemetry", "scheduler", "output" - }: - merged.update(v) - else: - # Enter the commandline argument, but _underscores as the -dashes - # are replaced when reading
from the commandline, but not in the yaml. - merged[k.replace('-', '_')] = v - - # Apply to argparse namespace - for k, v in merged.items(): - setattr(args, k, v) - - # Coerce certain keys to lists if YAML provided strings - list_keys = { - "cluster_var", "output_vars", "input_vars", "partitions", "plot" - } - for key in list_keys: - if hasattr(args, key): - val = getattr(args, key) - if isinstance(val, str): - setattr(args, key, [val]) - - # Expand paths (tilde + env vars) - for key in ("path", "output_dir", "plot_dir", "config_file"): - if hasattr(args, key): - setattr(args, key, _expand_path(getattr(args, key))) - - # Normalize enums if provided as strings in YAML - if getattr(args, "policy", None): - try: - # Accept exact values or case-insensitive - val = str(args.policy) - opts = {p.value.lower(): p.value for p in PolicyType} - if val.lower() in opts: - args.policy = opts[val.lower()] - except Exception: - pass - - if getattr(args, "backfill", None): - try: - val = str(args.backfill) - opts = {b.value.lower(): b.value for b in BackfillType} - if val.lower() in opts: - args.backfill = opts[val.lower()] - except Exception: - pass - - -parser = argparse.ArgumentParser( - description="Resource Allocator & Power Simulator (RAPS)", - formatter_class=argparse.ArgumentDefaultsHelpFormatter, -) -parser.add_argument( - "config_file", nargs="?", default=None, - help="YAML config file; overrides defaults/flags." -) - -# System configurations -parser.add_argument("--system", type=str, default="frontier", - help="System config to use") -parser.add_argument( - "-x", "--partitions", nargs="+", default=None, - help="List of machine configurations, e.g., -x setonix-cpu setonix-gpu" -) -parser.add_argument("-c", "--cooling", action="store_true", - help="Include FMU cooling model") -parser.add_argument("-net", "--simulate-network", default=False, - action="store_true", help="Include Network model") -parser.add_argument("--noui", default=False, action="store_true", - help="Run without UI") - -# Simulation runtime options -parser.add_argument("-ff", "--fastforward", type=str, default=None, - help="Fast-forward by time amount (uses same units as -t)") -parser.add_argument("-t", "--time", type=str, default=None, - help="Length of time to simulate, e.g., 123, 27m, 3h, 7d") -parser.add_argument("--time-delta", type=str, default="1s", - help="Step size, e.g., 15s, 1m, 1h, 1ms (default: 1s)") -parser.add_argument("-d", "--debug", action="store_true", - help="Enable debug mode and disable rich layout") -parser.add_argument("-n", "--numjobs", type=int, default=100, - help="Number of jobs to schedule") -parser.add_argument("-v", "--verbose", action="store_true", - help="Enable verbose output") -parser.add_argument("--start", type=str, default="2021-05-21T13:00", - help="ISO8601 start of simulation") -parser.add_argument("--end", type=str, default="2021-05-21T14:00", - help="ISO8601 end of simulation") -parser.add_argument("--seed", action="store_true", - help="Set RNG seed for deterministic simulation") -parser.add_argument( - "-u", "--uncertainties", action="store_true", - help=("Use float-with-uncertainties (much slower).") -) - -# UI -ui_layout_choices = ["layout1", "layout2"] -parser.add_argument("--layout", type=str, choices=ui_layout_choices, - default=ui_layout_choices[0], help="UI layout") - -# Output -parser.add_argument('-o', '--output', type=str, nargs="?", - const="", # Used if -o is given without a value - default=None, # Used if -o is not provided at all - help=("Output power, cooling, and loss 
models for later " - "analysis. Argumment specifies name.") - ) -plot_choices = ["power", "loss", "pue", "temp", "util"] -parser.add_argument("-p", "--plot", nargs="+", choices=plot_choices, - help="Plots to generate") -img_choices = ["png", "svg", "jpg", "pdf", "eps"] -parser.add_argument("--imtype", type=str, choices=img_choices, - default=img_choices[0], help="Plot image type") - -# Telemetry -parser.add_argument( - "-f", "--replay", nargs="+", type=str, - help=("Either: path/to/joblive path/to/jobprofile OR filename.npz " - "(overrides --workload)") -) -parser.add_argument("-e", "--encrypt", action="store_true", - help="Encrypt sensitive data in telemetry") -parser.add_argument("--validate", action="store_true", - help="Use node power instead of CPU/GPU utilizations") -parser.add_argument("--jid", type=str, default="*", - help="Replay job id") -parser.add_argument("--scale", type=int, default=0, - help=("Scale telemetry to a smaller target system, " - "e.g., --scale 192")) -parser.add_argument("--live", action="store_true", - help="Grab data from live system.") - - -# Synthetic workloads -parser = add_workload_to_parser(parser) - -# Scheduling -sched_choices = ["default", "scheduleflow", "nrel", "anl", "flux", - "experimental", "multitenant"] -parser.add_argument("--scheduler", type=str, choices=sched_choices, - default=sched_choices[0], help="Scheduler name") -parser.add_argument("--policy", type=str, default=None, - help=f"Schedule policy: {[p.value for p in PolicyType]}") -parser.add_argument("--backfill", type=str, default=None, - help=f"Backfill policy: {[b.value for b in BackfillType]}") - -# Arrival -arr_choices = ["prescribed", "poisson"] -parser.add_argument("--arrival", default=arr_choices[0], type=str, - choices=arr_choices, - help=("Modify arrival distribution (poisson) or use " - "original submit times (prescribed)")) -parser.add_argument("--job-arrival-time", type=int, - help=("Poisson arrival (seconds). 
Overrides " - "config/*/scheduler.json")) -parser.add_argument("--job-arrival-rate", type=float, - help="Modify Poisson rate (default 1)") - -# Accounts -parser.add_argument("--accounts", action="store_true", - help="Track accounts") -parser.add_argument("--accounts-json", type=str, - help="Accounts JSON from previous run") - -# Downtime -parser.add_argument("--downtime-first", type=str, default=None, - help="First downtime, e.g., after 123, 27m, 3h, 7d") -parser.add_argument("--downtime-interval", type=str, default=None, - help="Interval between downtimes, e.g., every 123, 27m, 3h, 7d") -parser.add_argument("--downtime-length", type=str, default=None, - help="Downtime length, e.g., 123, 27m, 3h, 7d") - -# Continous Job Generation -parser.add_argument("--continuous-job-generation", action="store_true", - help="Activate continuous job generation.") -parser.add_argument("--maxqueue", type=int, default=50, - help="Specify the max queue length for continuous job generation.") - - -def post_process_args(args): - if args.time_delta: - tdelta_raw, tdelta_down = convert_to_seconds(args.time_delta) - else: - tdelta_raw, tdelta_down = None, 1 - - if args.time: - time_raw, time_down = convert_to_seconds(args.time) - else: - time_raw, time_down = None, 1 - - if args.fastforward: - ff_raw, ff_down = convert_to_seconds(args.fastforward) - else: - ff_raw, ff_down = None, 1 - - if args.downtime_first: - dtf_raw, dtf_down = convert_to_seconds(args.downtime_first) - if args.downtime_interval: - dti_raw, dti_down = convert_to_seconds(args.downtime_interval) - if args.downtime_length: - dtl_raw, dtl_down = convert_to_seconds(args.downtime_length) - - max_down = max(tdelta_down, time_down, ff_down) - args.downscale = max_down - - if args.time_delta: - args.time_delta = int((tdelta_raw / tdelta_down) * max_down) - if args.time: - args.time = int((time_raw / time_down) * max_down) - if args.fastforward: - args.fastforward = int((ff_raw / ff_down) * max_down) - - if args.downtime_first: - args.downtime_first = int((dtf_raw / dtf_down) * max_down) - if args.downtime_interval: - args.downtime_interval = int((dti_raw / dti_down) * max_down) - if args.downtime_length: - args.downtime_length = int((dtl_raw / dtl_down) * max_down) - - return args - - -# ---- Parse + YAML merge ---- -args = parser.parse_args() - -# Config file existence check -if args.config_file and not os.path.isfile(args.config_file): - print(f"Error: '{args.config_file}' not found.", file=sys.stderr) - sys.exit(1) - -cfg = load_config(args.config_file) - -apply_config_to_args(cfg, args) - -# Optional: format fileprefix after config merge (if provided by workload parser) -if hasattr(args, "fileprefix") and isinstance(args.fileprefix, str): - try: - args.fileprefix = args.fileprefix.format(**vars(args)) - except KeyError as e: - print(f"Warning: missing placeholder {e} in fileprefix; skipping.") - -# Expand paths inside list fields (e.g., replay) -if getattr(args, "replay", None): - if isinstance(args.replay, str): - args.replay = [args.replay] - args.replay = [_expand_path(p) for p in args.replay] - -# Prefer replay if both replay and workload got set -if getattr(args, "replay", None) and getattr(args, "workload", None): - print("Info: --replay provided; ignoring --workload.", file=sys.stderr) - print("Info: --replay provided; ignoring --workload.", file=sys.stderr) - args.workload = None - -# Enforce valid policy/backfill values (after normalization in apply_config_to_args) -if getattr(args, "policy", None): - _valid_policies = {p.value for p in 
PolicyType} - if args.policy not in _valid_policies: - sys.exit(f"Error: Unknown policy '{args.policy}'. " - f"Valid: {sorted(_valid_policies)}") -if getattr(args, "backfill", None): - _valid_backfills = {b.value for b in BackfillType} - if args.backfill not in _valid_backfills: - sys.exit(f"Error: Unknown backfill '{args.backfill}'. " - f"Valid: {sorted(_valid_backfills)}") - -# Multi-partition guard for single-part driver (check merged args incl. CLI) -if os.path.basename(sys.argv[0]) == "main.py": - _parts = args.partitions or [] - if isinstance(_parts, str): - _parts = [_parts] - if len(_parts) > 1: - sys.exit("Error: Use multi-part-sim.py for multi-partition runs.") - -# Validate workload args before time conversions -check_workload_args(args) - -# Convert time-like args and compute downscale -args = post_process_args(args) - -# Expose dict form -args_dict = vars(args) diff --git a/raps/dataloaders/adastraMI250.py b/raps/dataloaders/adastraMI250.py index f0f576b0fe667ccfb5937b0e97ae63c8a9b3ab56..90201c84caba695e419092f1f1661a6df7687760 100644 --- a/raps/dataloaders/adastraMI250.py +++ b/raps/dataloaders/adastraMI250.py @@ -12,7 +12,7 @@ python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastraMI250 --policy priority --backfill easy # to fast-forward 60 days and replay for 1 day - python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastraMI250 -ff 60d -t 1d + python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastraMI250 --ff 60d -t 1d # to analyze dataset python -m raps.telemetry -f /path/to/AdastaJobsMI250_15days.parquet --system adastraMI250 -v diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py index 506dab9bb83bcffd3ffc3ff5f7616583d85e4ee8..8491617e8886afbfda35930ddcd49cf0e24a0e9a 100644 --- a/raps/dataloaders/frontier.py +++ b/raps/dataloaders/frontier.py @@ -4,12 +4,11 @@ # To simulate DATEDIR="date=2024-01-18" DPATH=/path/to/data - python main.py -f $DPATH/slurm/joblive/$DATEDIR $DPATH/jobprofile/$DATEDIR + python main.py -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR # To analyze the data - python -m raps.telemetry -f $DPATH/slurm/joblive/$DATEDIR $DPATH/jobprofile/$DATEDIR + python -m raps.telemetry -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR """ -import ast import time import numpy as np import pandas as pd diff --git a/raps/dataloaders/lassen.py b/raps/dataloaders/lassen.py index f777f7957f13c463ddd5c915688c31996fe39438..c9aae0d4411e0c786ef11a12c25cb18d51b056af 100644 --- a/raps/dataloaders/lassen.py +++ b/raps/dataloaders/lassen.py @@ -23,7 +23,7 @@ Usage Instructions: python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen --arrival poisson # to fast-forward 365 days and replay for 1 day. This region day has 2250 jobs with 1650 jobs executed. 
- python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen -ff 365d -t 1d + python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen --ff 365d -t 1d # For the network replay this command gives suitable snapshots: python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen --policy fcfs --backfill firstfit -t 12h --arrival poisson # noqa @@ -38,7 +38,7 @@ from tqdm import tqdm from datetime import timedelta from ..job import job_dict, Job -from ..utils import power_to_utilization, next_arrival_byconfkwargs, convert_to_seconds +from ..utils import power_to_utilization, next_arrival_byconfkwargs, parse_td def load_data(path, **kwargs): @@ -80,7 +80,7 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): time_to_simulate = 31536000 # a year time_to_simulate_timedelta = timedelta(seconds=time_to_simulate) # timedelta else: - time_to_simulate_timedelta = timedelta(seconds=convert_to_seconds(time_to_simulate)) # timedelta + time_to_simulate_timedelta = parse_td(time_to_simulate) # timedelta telemetry_start_timestamp = allocation_df['begin_timestamp'].min() telemetry_start_time = 0 @@ -190,7 +190,7 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): ib_tx_per_node = total_ib_tx / n # average bytes per node ib_rx_per_node = total_ib_rx / n # average bytes per node - # net_tx, net_rx = [],[] # generate_network_sequences generates errors (e.g. -ff 800d -t 1d ) + # net_tx, net_rx = [],[] # generate_network_sequences generates errors (e.g. --ff 800d -t 1d ) # net_tx, net_rx = generate_network_sequences(ib_tx, ib_rx, samples, lambda_poisson=0.3) net_tx, net_rx = throughput_traces(ib_tx_per_node, ib_rx_per_node, samples) diff --git a/raps/dataloaders/marconi100.py b/raps/dataloaders/marconi100.py index 6222c7c9845ab14b3e4151ce31008bcd66028747..fef8ec0357547f2f2e05519e42c998cedd9c154a 100644 --- a/raps/dataloaders/marconi100.py +++ b/raps/dataloaders/marconi100.py @@ -16,7 +16,7 @@ python main.py -f /path/to/job_table.parquet --system marconi100 --policy priority --backfill firstfit # to fast-forward 60 days and replay for 1 day - python main.py -f /path/to/job_table.parquet --system marconi100 -ff 60d -t 1d + python main.py -f /path/to/job_table.parquet --system marconi100 --ff 60d -t 1d # to analyze dataset python -m raps.telemetry -f /path/to/job_table.parquet --system marconi100 -v diff --git a/raps/downtime.py b/raps/downtime.py index 7c5bf1fea70722889070ff35cfb4ea002e9dd715..97c9139114a004ed7c275852c4eddbd30b57b69c 100644 --- a/raps/downtime.py +++ b/raps/downtime.py @@ -1,7 +1,7 @@ from __future__ import annotations from typing import TYPE_CHECKING from raps.job import JobState -from raps.args import args +from raps.sim_config import args, sim_config import numpy as np diff --git a/raps/raps_config.py b/raps/raps_config.py new file mode 100644 index 0000000000000000000000000000000000000000..6eddca80a7507f3fdc782fedee4d522d5d582319 --- /dev/null +++ b/raps/raps_config.py @@ -0,0 +1,35 @@ +from pathlib import Path +from raps.utils import ExpandedPath +from pydantic_settings import BaseSettings, SettingsConfigDict, YamlConfigSettingsSource +ROOT_DIR = Path(__file__).parent.parent + + +class RapsConfig(BaseSettings): + """ + General settings for raps. Pydantic will automatically populate this model from env vars or a + raps_config.yaml config file. + """ + # TODO I think we should move more of general/ui related settings from SimConfig into here.
+ # We'll be using SimConfig in the simulation server and those settings aren't applicable there, + # so it makes sense to keep SimConfig scoped to the logical operation of the sim. + + system_config_dir: ExpandedPath = ROOT_DIR / 'config' + """ Directory containing system configuration files """ + + model_config = SettingsConfigDict( + yaml_file="raps_config.yaml", + env_prefix='raps_', + env_nested_delimiter='__', + nested_model_default_partial_update=True, + ) + + # Customize settings sources: use the yaml config file instead of the default .env + @classmethod + def settings_customise_sources( + cls, settings_cls, + init_settings, env_settings, dotenv_settings, file_secret_settings, + ): + return (init_settings, env_settings, YamlConfigSettingsSource(settings_cls),) + + +raps_config = RapsConfig() diff --git a/raps/sim_config.py b/raps/sim_config.py new file mode 100644 index 0000000000000000000000000000000000000000..127cec302babc4833ce93df48dd1201cddf03c38 --- /dev/null +++ b/raps/sim_config.py @@ -0,0 +1,331 @@ +import argparse +import sys +import yaml +from datetime import timedelta +from pathlib import Path +from typing import Literal +from raps.schedulers.default import PolicyType, BackfillType + +from raps.utils import ( + parse_time_unit, convert_to_time_unit, infer_time_unit, ExpandedPath, + pydantic_add_args, yaml_dump, parse_td, +) + +from pydantic import BaseModel, model_validator, computed_field +from pydantic_settings import SettingsConfigDict + +Distribution = Literal['uniform', 'weibull', 'normal'] + + +class SimConfig(BaseModel): + system: str | None = None + """ System config to use """ + partitions: list[str] = [] + """ List of multiple system configurations for a multi-partition run. Can contain wildcards """ + + cooling: bool = False + """ Include the FMU cooling model """ + simulate_network: bool = False + """ Include network model """ + + # Simulation runtime options + fastforward: int | None = None + """ + Fast-forward by time amount (unit specified by `time_unit`, default seconds). + Can pass a string like 15s, 1m, 1h + """ + time: int | None = None + """ + Length of time to simulate (unit specified by `time_unit`, default seconds). + Can pass a string like 123, 27m, 3h, 7d + """ + time_delta: int = 1 + """ + Step size (unit specified by `time_unit`, default seconds). + Can pass a string like 15s, 1m, 1h, 1ms + """ + time_unit: timedelta + """ + Unit that all time-delta ints are measured in (default seconds) + """ + + @computed_field + @property + def downscale(self) -> int: + return int(timedelta(seconds=1) / self.time_unit) + + start: str = "2021-05-21T13:00" + """ ISO8601 start of simulation """ + end: str = "2021-05-21T14:00" + """ ISO8601 end of simulation """ + + numjobs: int = 100 + """ Number of jobs to schedule """ + + uncertainties: bool = False + """ Use float-with-uncertainties (much slower) """ + + seed: bool = False + """ Set RNG seed for deterministic simulation """ + output: ExpandedPath | None = None + """ Output power, cooling, and loss models for later analysis. Argument specifies name.
""" + + debug: bool = False + """ Enable debug mode and disable rich layout """ + noui: bool = False + """ Run without UI """ + verbose: bool = False + """ Enable verbose output """ + layout: Literal["layout1", "layout2"] = "layout1" + """ UI layout """ + plot: list[Literal["power", "loss", "pue", "temp", "util"]] | None = None + """ Plots to generate """ + + imtype: Literal["png", "svg", "jpg", "pdf", "eps"] = "png" + """ Plot image type """ + + replay: list[ExpandedPath] | None = None + """ Either: path/to/joblive path/to/jobprofile OR filename.npz """ + + encrypt: bool = False + """ Encrypt sensitive data in telemetry """ + + power_scope: Literal['node', 'chip'] = "chip" + """ node mode will use node power instead of CPU/GPU utilizations """ + + jid: str = "*" + """ Replay job id """ + + scale: int = 0 + """ Scale telemetry to a smaller target system, --scale 192 """ + + live: bool = False + """ Grab data from live system. """ + + # Workload arguments (TODO split into separate model) + workload: Literal['random', 'benchmark', 'peak', 'idle', 'synthetic', 'multitenant'] | None = None + + """ Type of synthetic workload """ + multimodal: list[float] = [1.0] + """ + Percentage to draw from each distribution (list of floats). e.g. '0.2 0.8' percentages apply + in order to the list of the --distribution argument list. + """ + # Jobsize + jobsize_distribution: list[Distribution] | None = None + """ Distribution type """ + jobsize_normal_mean: float | None = None + """ Mean (mu) for Normal distribution """ + jobsize_normal_stddev: float | None = None + """ Standard deviation (sigma) for Normal distribution """ + jobsize_weibull_shape: float | None = None + """ Jobsize shape of weibull """ + jobsize_weibull_scale: float | None = None + """ Jobsize scale of weibull """ + jobsize_is_of_degree: int | None = None + """ Draw jobsizes from distribution of degree N (squared,cubed). """ + jobsize_is_power_of: int | None = None + """ Draw jobsizes from distribution of power of N (2->2^x,3->3^x). 
""" + + # Walltime + walltime_distribution: list[Distribution] | None = None + """ Distribution type """ + walltime_normal_mean: float | None = None + """ Walltime mean (mu) for Normal distribution """ + walltime_normal_stddev: float | None = None + """ Walltime standard deviation (sigma) for Normal distribution """ + walltime_weibull_shape: float | None = None + """ Walltime shape of weibull """ + walltime_weibull_scale: float | None = None + """ Walltime scale of weibull """ + # Utilizations (TODO should probably make a reusable "Distribution" submodel) + cpuutil_distribution: list[Distribution] = ['uniform'] + """ Distribution type """ + cpuutil_normal_mean: float | None = None + """ Walltime mean (mu) for Normal distribution """ + cpuutil_normal_stddev: float | None = None + """ Walltime standard deviation (sigma) for Normal distribution """ + cpuutil_weibull_shape: float | None = None + """ Walltime shape of weibull """ + cpuutil_weibull_scale: float | None = None + """ Walltime scale of weibull """ + gpuutil_distribution: list[Distribution] = ['uniform'] + """ Distribution type """ + gpuutil_normal_mean: float | None = None + """ Walltime mean (mu) for Normal distribution """ + gpuutil_normal_stddev: float | None = None + """ Walltime standard deviation (sigma) for Normal distribution """ + gpuutil_weibull_shape: float | None = None + """ Walltime shape of weibull """ + gpuutil_weibull_scale: float | None = None + """ Walltime scale of weibull """ + gantt_nodes: bool = False + """ Print Gannt with nodes required as line thickness (default false) """ + + # Synthetic workloads + scheduler: Literal[ + "default", "scheduleflow", "nrel", "anl", "flux", "experimental", "multitenant", + ] = "default" + """ Scheduler name """ + policy: PolicyType | None = None + """ Schedule policy """ + backfill: BackfillType | None = None + """ Backfill policy """ + + # Arrival + arrival: Literal["prescribed", "poisson"] = "prescribed" + """ Modify arrival distribution (poisson) or use original submit times (prescribed) """ + job_arrival_time: int | None = None + """ Poisson arrival (seconds). Overrides system config scheduler.job_arrival_time """ + job_arrival_rate: float | None = None # TODO define default here + """ Modify Poisson rate (default 1) """ + + # Accounts + accounts: bool = False + accounts_json: ExpandedPath | None = None + """ Path to accounts JSON file from previous run """ + + # Downtime + downtime_first: int | None = None + """ + First downtime (unit specified by `time_unit`, default seconds). + Can pass a string like 27m, 3h, 7d + """ + downtime_interval: str | None = None + """ + Interval between downtimes (unit specified by `time_unit`, default seconds). + Can pass a string like 123, 27m, 3h, 7d + """ + downtime_length: str | None = None + """ + Downtime length (unit specified by `time_unit`, default seconds). 
+ Can pass a string like 123, 27m, 3h, 7d + """ + + # Continuous Job Generation + continuous_job_generation: bool = False + """ Activate continuous job generation """ + maxqueue: int = 50 + """ Specify the max queue length for continuous job generation """ + + @model_validator(mode="before") + def _parse_times(cls, data): + time_fields = [ + "time_delta", "time", "fastforward", + "downtime_first", "downtime_interval", "downtime_length", + ] + + if data.get('time_unit') is not None: + time_unit = parse_time_unit(data['time_unit']) + input_time_unit = time_unit + else: + time_unit = min( + [infer_time_unit(data[f]) for f in time_fields if data.get(f)], + default=timedelta(seconds=1) + ) + # When "inferring" time unit interpret raw numbers as seconds. + # E.g. `-t 10 --time-delta 1ds` is interpreted as `-t 10s --time-delta 1ds` + input_time_unit = timedelta(seconds=1) + + data['time_unit'] = time_unit + for field in time_fields: + if data.get(field) is not None: + td = parse_td(data[field], input_time_unit) + data[field] = convert_to_time_unit(td, time_unit) + + return data + + @model_validator(mode="after") + def _validate(self): + if self.system and self.partitions: + raise ValueError("system and partitions are mutually exclusive") + elif not self.system and not self.partitions: + self.system = "frontier" + + if not self.replay and not self.workload: + self.workload = "random" + + if self.jobsize_is_power_of is not None and self.jobsize_is_of_degree is not None: + raise ValueError("jobsize_is_power_of and jobsize_is_of_degree are mutually exclusive") + + return self + + def get_legacy_args(self): + """ + Return as an argparse.Namespace object for backwards compatibility + """ + return argparse.Namespace(**self.get_legacy_args_dict()) + + def get_legacy_args_dict(self): + """ + Return as a dict object. This is for backwards compatibility with the rest of RAPS code so + we can migrate to the new config gradually. The dict also has a "sim_config" key that + contains the SimConfig object itself. + """ + args_dict = self.model_dump(mode="json") + # validate has been renamed to power_scope + args_dict['validate'] = args_dict["power_scope"] == "node" + + # Convert Path objects to str + if args_dict['output']: + args_dict['output'] = str(args_dict['output']) + if args_dict['replay']: + args_dict['replay'] = [str(p) for p in args_dict['replay']] + if args_dict['accounts_json']: + args_dict['accounts_json'] = str(args_dict['accounts_json']) + + args_dict['sim_config'] = self + return args_dict + + +def parse_args(cli_args=None) -> SimConfig: + parser = argparse.ArgumentParser( + description="Resource Allocator & Power Simulator (RAPS)", + allow_abbrev=False, + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument( + "config_file", nargs="?", default=None, + help=( + 'YAML sim config file; can be used to configure an experiment instead of using CLI ' + + 'flags. Pass "-" to read from stdin.'
+ ) + ) + + model_validate_args = pydantic_add_args(parser, SimConfig, model_config=SettingsConfigDict( + cli_implicit_flags=True, + cli_kebab_case=True, + cli_shortcuts={ + "partitions": "x", + "cooling": "c", + "simulate-network": "net", + "fastforward": "ff", + "time": "t", + "debug": "d", + "numjobs": "n", + "verbose": "v", + "output": "o", + "uncertainties": "u", + "plot": "p", + "replay": "f", + "workload": "w", + }, + )) + + args = parser.parse_args(cli_args) + if args.config_file == "-": + config_file_data = yaml.safe_load(sys.stdin.read()) + elif args.config_file: + config_file_data = yaml.safe_load(Path(args.config_file).read_text()) + else: + config_file_data = {} + + return model_validate_args(args, config_file_data) + + +sim_config = parse_args() +args = sim_config.get_legacy_args() +args_dict = sim_config.get_legacy_args_dict() + +if __name__ == "__main__": + print(yaml_dump(sim_config.model_dump(mode="json"))) diff --git a/raps/config.py b/raps/system_config.py similarity index 54% rename from raps/config.py rename to raps/system_config.py index 4f9f7097c8874edba1406432c1eb63c80414fb93..e458c68fb01af2b46855a50ff9d1569b2475b62e 100644 --- a/raps/config.py +++ b/raps/system_config.py @@ -1,14 +1,15 @@ -import os, functools +import functools +import glob +import fnmatch from typing import Any, Literal from pathlib import Path import yaml -from pydantic import BaseModel, computed_field, model_validator - -ROOT_DIR = Path(__file__).parent.parent -CONFIG_PATH = Path(os.environ.get("RAPS_CONFIG", ROOT_DIR / 'config')).resolve() +from pydantic import BaseModel, computed_field, model_validator, field_validator +from raps.raps_config import raps_config # Define Pydantic models for the config to handle parsing and validation + class SystemSystemConfig(BaseModel): num_cdus: int racks_per_cdu: int @@ -27,8 +28,8 @@ class SystemSystemConfig(BaseModel): gpu_peak_flops: float cpu_fp_ratio: float gpu_fp_ratio: float - threads_per_core: int|None = None - cores_per_cpu: int|None = None + threads_per_core: int | None = None + cores_per_cpu: int | None = None @model_validator(mode='after') def _update_down_nodes(self): @@ -76,15 +77,16 @@ class SystemSystemConfig(BaseModel): def available_nodes(self) -> int: return self.total_nodes - len(self.down_nodes) + class SystemPowerConfig(BaseModel): power_gpu_idle: float power_gpu_max: float power_cpu_idle: float power_cpu_max: float power_mem: float - power_nic: float|None = None - power_nic_idle: float|None = None - power_nic_max: float|None = None + power_nic: float | None = None + power_nic_idle: float | None = None + power_nic_max: float | None = None power_nvme: float power_switch: float power_cdu: float @@ -96,6 +98,7 @@ class SystemPowerConfig(BaseModel): rectifier_efficiency: float power_cost: float + class SystemUqConfig(BaseModel): power_gpu_uncertainty: float power_cpu_uncertainty: float @@ -107,8 +110,10 @@ class SystemUqConfig(BaseModel): power_switch_uncertainty: float rectifier_power_uncertainty: float + JobEndStates = Literal["COMPLETED", "FAILED", "CANCELLED", "TIMEOUT", "NODE_FAIL"] + class SystemSchedulerConfig(BaseModel): job_arrival_time: int mtbf: int @@ -120,11 +125,12 @@ class SystemSchedulerConfig(BaseModel): job_end_probs: dict[JobEndStates, float] multitenant: bool = False + class SystemCoolingConfig(BaseModel): cooling_efficiency: float wet_bulb_temp: float - zip_code: str|None = None - country_code: str|None = None + zip_code: str | None = None + country_code: str | None = None fmu_path: str fmu_column_mapping: dict[str, 
str] w_htwps_key: str @@ -132,70 +138,91 @@ class SystemCoolingConfig(BaseModel): w_ctwps_key: str w_cts_key: str temperature_keys: list[str] + class SystemNetworkConfig(BaseModel): topology: Literal["fat-tree", "dragonfly", "torus3d"] network_max_bw: float - latency: float|None = None + latency: float | None = None + + fattree_k: int | None = None - fattree_k: int|None = None + dragonfly_d: int | None = None + dragonfly_a: int | None = None + dragonfly_p: int | None = None - dragonfly_d: int|None = None - dragonfly_a: int|None = None - dragonfly_p: int|None = None + torus_x: int | None = None + torus_y: int | None = None + torus_z: int | None = None + torus_wrap: bool | None = None + torus_link_bw: float | None = None + torus_routing: str | None = None - torus_x: int|None = None - torus_y: int|None = None - torus_z: int|None = None - torus_wrap: bool|None = None - torus_link_bw: float|None = None - torus_routing: str|None = None + hosts_per_router: int | None = None + latency_per_hop: float | None = None + node_coords_csv: str | None = None - hosts_per_router: int|None = None - latency_per_hop: float|None = None - node_coords_csv: str|None = None class SystemConfig(BaseModel): system_name: str + """ Name of the system, defaults to the yaml file name """ + system: SystemSystemConfig power: SystemPowerConfig scheduler: SystemSchedulerConfig - uq: SystemUqConfig|None = None - cooling: SystemCoolingConfig|None = None - network: SystemNetworkConfig|None = None + uq: SystemUqConfig | None = None + cooling: SystemCoolingConfig | None = None + network: SystemNetworkConfig | None = None def get_legacy(self) -> dict[str, Any]: """ Return the system config as a flattened, uppercased dict. This is for backwards compatibility with the rest of RAPS code so we can migrate to the new config format - gradually. The dict also as a "config" key that contains the SystemConfig object itself. + gradually. The dict also has a "system_config" key that contains the SystemConfig object + itself.
""" - renames = { # fields that need to be renamed to something other than just .upper() + renames = { # fields that need to be renamed to something other than just .upper() "system_name": "system_name", "w_htwps_key": "W_HTWPs_KEY", "w_ctwps_key": "W_CTWPs_KEY", "w_cts_key": "W_CTs_KEY", "multitenant": "multitenant", } - dump = self.model_dump(mode = "json", exclude_none = True) + dump = self.model_dump(mode="json", exclude_none=True) config_dict: dict[str, Any] = {} - for k, v in dump.items(): # flatten + for k, v in dump.items(): # flatten if isinstance(v, dict): config_dict.update(v) else: config_dict[k] = v # rename keys config_dict = {renames.get(k, k.upper()): v for k, v in config_dict.items()} - config_dict['config'] = self + config_dict['system_config'] = self return config_dict +class MultiPartitionSystemConfig(BaseModel): + system_name: str + partitions: list[SystemConfig] + + @field_validator("partitions") + def _validate_partitions(cls, partitions: list[SystemConfig]): + partition_names = [c.system_name for c in partitions] + if len(set(partition_names)) != len(partition_names): + raise ValueError(f"Duplicate system names: {','.join(partition_names)}") + return partitions + + @property + def partition_names(self): + return [c.system_name for c in self.partitions] + + @functools.cache def list_systems() -> list[str]: """ Lists all available systems """ return sorted([ - str(p.relative_to(CONFIG_PATH)).removesuffix(".yaml") - for p in CONFIG_PATH.rglob("*.yaml") + str(p.relative_to(raps_config.system_config_dir)).removesuffix(".yaml") + for p in raps_config.system_config_dir.rglob("*.yaml") ]) @@ -204,20 +231,57 @@ def get_system_config(system: str) -> SystemConfig: """ Returns the system config as a Pydantic object. system can either be a path to a custom .yaml file, or the name of one of the pre-configured - systems defined in RAPS_CONFIG. + systems defined in RAPS_SYSTEM_CONFIG_DIR. """ - config_path = Path(system.removesuffix(".yaml") + ".yaml") - if config_path.exists() or config_path.is_absolute(): - system_name = config_path.resolve() - else: # assume it's a pre-configured system - system_name = system.removesuffix(".yaml") - config_path = CONFIG_PATH / config_path + if system in list_systems(): + config_path = raps_config.system_config_dir / f"{system}.yaml" + system_name = system + else: + config_path = Path(system).resolve() + system_name = config_path.stem + if not config_path.is_file(): - raise FileNotFoundError( - f'"{system}" not found. Known systems are: {list_systems()}' - ) + raise FileNotFoundError(f'"{system}" not found. Valid systems are: {list_systems()}') config = { - "system_name": system_name, + "system_name": system_name, # You can override system_name in the yaml as well **yaml.safe_load(config_path.read_text()), } return SystemConfig.model_validate(config) + + +def get_partition_configs(partitions: list[str]) -> MultiPartitionSystemConfig: + """ + Resolves multiple partition config files. Can pass globs, or directories to include all yaml + files under the directory. 
+ """ + systems = list_systems() + multi_partition_systems = set(s.split("/")[0] for s in systems if "/" in s) + combined_system_name = [] + + parsed_configs: list[SystemConfig] = [] + for pat in partitions: + if pat in multi_partition_systems: + matched_systems = fnmatch.filter(systems, f"{pat}/*") + combined_system_name.append(pat) + elif fnmatch.filter(systems, pat): + matched_systems = fnmatch.filter(systems, pat) + combined_system_name.extend(s.split("/")[0] for s in matched_systems) + elif Path(pat).is_dir(): + matched_systems = sorted(Path(pat).glob("*.yaml")) + combined_system_name.append(Path(pat).name) + else: + matched_systems = sorted(glob.glob(pat)) + combined_system_name.extend(Path(s).stem for s in matched_systems) + + if not matched_systems: + raise FileNotFoundError(f'No config files match "{pat}"') + parsed_configs.extend(get_system_config(s) for s in sorted(matched_systems)) + + if len(parsed_configs) == 1: + combined_system_name = parsed_configs[0].system_name + else: + combined_system_name = "+".join(dict.fromkeys(combined_system_name)) # dedup, keep order + return MultiPartitionSystemConfig( + system_name=combined_system_name, + partitions=parsed_configs, + ) diff --git a/raps/telemetry.py b/raps/telemetry.py index e78a40a9a70f3eca3184f5860f32da052275d9ad..c6815cdd92d064862e264f303ff2344d2624e2b1 100644 --- a/raps/telemetry.py +++ b/raps/telemetry.py @@ -18,7 +18,7 @@ from types import ModuleType if __name__ == "__main__": - # from raps.args import args,args_dict + # from raps.sim_config import args, args_dict parser = argparse.ArgumentParser(description='Telemetry data validator') parser.add_argument('--jid', type=str, default='*', help='Replay job id') parser.add_argument('-f', '--replay', nargs='+', type=str, @@ -49,7 +49,7 @@ import pandas as pd from tqdm import tqdm # from rich.progress import track -from raps.config import get_system_config +from raps.system_config import get_system_config from raps.job import Job, job_dict import matplotlib.pyplot as plt from raps.plotting import ( @@ -57,8 +57,8 @@ from raps.plotting import ( plot_nodes_gantt, plot_network_histogram ) -from raps.utils import next_arrival_byconfargs, create_casename, convert_to_seconds -# from raps.args import args, args_dict +from raps.utils import next_arrival_byconfargs, create_casename, convert_to_time_unit +# from raps.sim_config import args, args_dict class Telemetry: @@ -259,7 +259,7 @@ class Telemetry: print(f"File was generated with:" f"\n--system {args_from_file.system} ") if hasattr(args_from_file, 'fastforward'): - print(f"-ff {args_from_file.fastforward} ") + print(f"--ff {args_from_file.fastforward} ") if hasattr(args_from_file, 'time'): print(f"-t {args_from_file.time}") print(f"All Args:\n{args_from_file}" @@ -308,7 +308,7 @@ class Telemetry: timestep_end=timestep_end, args=args, filename=self.dirname) if args.time: - timestep_end = timestep_start + convert_to_seconds(args.time) + timestep_end = timestep_start + convert_to_time_unit(args.time) elif not timestep_end: timestep_end = int(max(job.wall_time + job.start_time for job in jobs)) + 1 diff --git a/raps/utils.py b/raps/utils.py index bbac74d8b1c8354be57d02e1c0bf3dd09af34d17..f54cc7144441e71f82016ab3dfade5f6957ff9ab 100644 --- a/raps/utils.py +++ b/raps/utils.py @@ -8,17 +8,22 @@ generating random numbers, summarizing and expanding ranges, determining job sta from datetime import timedelta from enum import Enum - import os import hashlib import math +import re import numpy as np import pandas as pd import random import sys 
import uuid import json - +import argparse +from pathlib import Path +from typing import Annotated as A, TypeVar, Callable +from pydantic import BaseModel, TypeAdapter, AfterValidator +from pydantic_settings import BaseSettings, SettingsConfigDict, CliApp, CliSettingsSource +import yaml from raps.job import Job @@ -455,52 +460,70 @@ def next_arrival(lambda_rate, reset=False, start_time=0): return next_arrival.next_time -def convert_to_seconds(time_str): - if isinstance(time_str, (int, float)): - return time_str # this happens.... - # Define the conversion factors - time_factors = { - 'd': 86400, # 1 day = 86400 seconds - 'h': 3600, # 1 hour = 3600 seconds - 'm': 60, # 1 minute = 60 seconds - 's': 1, # 1 second = 1 second - '': 1 # empty string = 1 second - } - downscale_factors = { - 'ms': 1000, - 'cs': 100, - 'ds': 10 - } - - # Check if the input string ends with a unit or is purely numeric - # and extract the numeric part and the time unit - if time_str[-1].isdigit(): - unit = '' - num_str = time_str[:] - else: - if time_str[-2].isdigit(): - unit = time_str[-1] - num_str = time_str[:-1] - else: - unit = time_str[-2:] - num_str = time_str[:-2] - - index = num_str.find(".") # convert int or float string - if index != -1: - num = float(num_str) - raise ValueError(f"Float not supported at this time: {num}{unit}") - - else: - num = int(num_str) - - # Convert to seconds using the conversion factors - if unit in time_factors: - return num * time_factors[unit], 1 - elif unit in downscale_factors: - downscale = downscale_factors[unit] - return num, downscale - else: - raise ValueError(f"Unknown time unit: {unit}") +TIME_UNITS = { + 'd': timedelta(days=1), + 'h': timedelta(hours=1), + 'm': timedelta(minutes=1), + 's': timedelta(seconds=1), + 'ds': timedelta(milliseconds=100), + 'cs': timedelta(milliseconds=10), + 'ms': timedelta(milliseconds=1), +} + + +def parse_time_unit(unit) -> timedelta: + parsed_unit = unit + if TypeAdapter(timedelta).validator.isinstance_python(unit): + parsed_unit = TypeAdapter(timedelta).validate_python(unit) + elif isinstance(unit, str): + parsed_unit = TIME_UNITS.get(unit) + if not isinstance(parsed_unit, timedelta): + raise ValueError(f"Invalid time unit {unit}") + if parsed_unit not in TIME_UNITS.values() or parsed_unit > TIME_UNITS['s']: + raise ValueError("Only time units of s, ds, cs, and ms are supported") + return parsed_unit + + +def parse_td(td, unit: str | timedelta = 's') -> timedelta: + """ Parse into a timedelta. 
Pass `unit` to control how raw numbers are interpreted (default seconds) """ + unit = parse_time_unit(unit) + if TypeAdapter(int).validator.isinstance_python(td): + return unit * TypeAdapter(int).validate_python(td) + if TypeAdapter(timedelta).validator.isinstance_python(td): + return TypeAdapter(timedelta).validate_python(td) + if isinstance(td, str): + re_match = re.fullmatch(r"(\d+)\s*(\w+)", td.strip()) + if re_match and re_match[2] in TIME_UNITS: + num_str, unit_str = re_match.groups() + return int(num_str) * TIME_UNITS[unit_str] + raise ValueError(f"Invalid timedelta: {td}") + + +def convert_to_time_unit(td, unit: str | timedelta = 's'): + """ + Converts to an integer number of the given time unit. + Throws if the given time is not a whole multiple of the unit. + """ + num = parse_td(td, unit) / parse_time_unit(unit) + if (num != 0 and num < 1) or not num.is_integer(): + raise ValueError(f"{td} is not divisible by time unit {unit}") + return int(num) + + +def infer_time_unit(td) -> timedelta: + """ Infers the time unit the user meant for the input string """ + parsed_td = parse_td(td) + time_unit = None + if isinstance(td, str): # infer unit from string, e.g. 1s or 200ms + re_match = re.fullmatch(r"(\d+)\s*(\w+)", td.strip()) + if re_match and re_match[2] in TIME_UNITS: + time_unit = TIME_UNITS[re_match[2]] + if not time_unit: + for unit in sorted(TIME_UNITS.values(), reverse=True): + if (parsed_td % unit).total_seconds() == 0: + time_unit = unit + break + return min(TIME_UNITS['s'], time_unit or TIME_UNITS['s']) def encrypt(name): @@ -604,3 +627,76 @@ class ValueComparableEnum(Enum): def __hash__(self): # required if you override __eq__ return hash(self.value) + + +ExpandedPath = A[Path, AfterValidator(lambda v: Path(v).expanduser().resolve())] +""" Type that expands ~ in a path string and resolves it to an absolute Path """ + + +T = TypeVar("T", bound=BaseModel) + + +def pydantic_add_args( + parser: argparse.ArgumentParser, model_cls: type[T], + model_config: SettingsConfigDict | None = None, +) -> Callable[[argparse.Namespace, dict | None], T]: + """ + Add arguments to the parser from the model. Returns a function that can be used to parse the + model from the argparse args. + + Normally you'd configure Pydantic to automatically create a BaseSettings object from + sys.argv and/or env variables. But we want a bit more control over the cli parser, and to use + the SimConfig model as a regular non-settings model in the simulation server. So here we do + some hacks to apply the args manually.
+ """ + model_config_dict = SettingsConfigDict({ + **(model_config or {}), + "cli_parse_args": False, # Don't automatically parse args + }) + + class SettingsModel(model_cls, BaseSettings): + @classmethod + def settings_customise_sources(cls, settings_cls, + init_settings, env_settings, dotenv_settings, file_secret_settings, + ): + return (init_settings,) # Don't load from env vars or anything else + + model_config = model_config_dict + + cli_settings_source = CliSettingsSource(SettingsModel, root_parser=parser) + + def model_validate_args(args: argparse.Namespace, data: dict | None = None): + model = CliApp.run(SettingsModel, + cli_args=args, + cli_settings_source=cli_settings_source, + **(data or {}), + ) + # Recreate model so we don't return the SettingsModel subclass + return model_cls.model_validate(model.model_dump()) + return model_validate_args + + +def yaml_dump(data): + """ Dumps yaml with pretty formatting """ + class IndentDumper(yaml.Dumper): + def represent_data(self, data): + # Quote all strings with special characters to avoid confusion + if ( + isinstance(data, str) and + (not re.fullmatch(r"[\w-]+", data) or data.isdigit()) and + "\n" not in data + ): + return self.represent_scalar('tag:yaml.org,2002:str', data, style='"') + return super(IndentDumper, self).represent_data(data) + + def increase_indent(self, flow=False, indentless=False): + # Indent lists + return super(IndentDumper, self).increase_indent(flow, False) + + return yaml.dump( + data, + Dumper=IndentDumper, + sort_keys=False, + indent=2, + allow_unicode=True, + ) diff --git a/raps/workload.py b/raps/workload.py index 1e6b5624d4bb174c445d9256dea016a8b28348ab..151e2c3590498b38c1c32accadc889c5e886ca01 100644 --- a/raps/workload.py +++ b/raps/workload.py @@ -711,7 +711,6 @@ def plot_job_hist(jobs, config=None, dist_split=None, gantt_nodes=False): axs[0][1].yaxis.tick_right() else: axs[0][1].set_yticks([]) - pass axs[0][1].hist(cpu_util, bins=100, orientation='vertical', zorder=1, density=True, color='tab:cyan') axs[0][1].axvline(np.mean(cpu_util), color='r', linewidth=1, zorder=3) axs[0][1].set(xlim=[0, config['CPUS_PER_NODE']]) @@ -800,83 +799,9 @@ def plot_job_hist(jobs, config=None, dist_split=None, gantt_nodes=False): plt.show() -def add_workload_to_parser(parser): - - choices = ['random', 'benchmark', 'peak', 'idle', 'synthetic', 'multitenant'] - parser.add_argument('-w', '--workload', type=str, choices=choices, - default=choices[0], help='Type of synthetic workload') - - parser.add_argument("--multimodal", default=[1.0], type=float, nargs="+", - help="Percentage to draw from each distribution " - "(list of floats)e.g. 
diff --git a/raps/workload.py b/raps/workload.py
index 1e6b5624d4bb174c445d9256dea016a8b28348ab..151e2c3590498b38c1c32accadc889c5e886ca01 100644
--- a/raps/workload.py
+++ b/raps/workload.py
@@ -711,7 +711,6 @@ def plot_job_hist(jobs, config=None, dist_split=None, gantt_nodes=False):
         axs[0][1].yaxis.tick_right()
     else:
         axs[0][1].set_yticks([])
-        pass
     axs[0][1].hist(cpu_util, bins=100, orientation='vertical', zorder=1, density=True, color='tab:cyan')
     axs[0][1].axvline(np.mean(cpu_util), color='r', linewidth=1, zorder=3)
     axs[0][1].set(xlim=[0, config['CPUS_PER_NODE']])
@@ -800,83 +799,9 @@ def plot_job_hist(jobs, config=None, dist_split=None, gantt_nodes=False):
     plt.show()
 
 
-def add_workload_to_parser(parser):
-
-    choices = ['random', 'benchmark', 'peak', 'idle', 'synthetic', 'multitenant']
-    parser.add_argument('-w', '--workload', type=str, choices=choices,
-                        default=choices[0], help='Type of synthetic workload')
-
-    parser.add_argument("--multimodal", default=[1.0], type=float, nargs="+",
-                        help="Percentage to draw from each distribution "
-                             "(list of floats)e.g. '0.2 0.8' percentages apply"
-                        " in order to the list of the --distribution argument list.")
-
-    # Jobsize:
-    parser.add_argument("--jobsize-distribution", type=str, nargs="+",
-                        choices=['uniform', 'weibull', 'normal'], default=None, help='Distribution type')
-
-    parser.add_argument("--jobsize-normal-mean", type=float, required=False, help="Mean (mu) for Normal distribution")
-    parser.add_argument("--jobsize-normal-stddev", type=float, required=False,
-                        help="Standard deviation (sigma) for Normal distribution")
-
-    parser.add_argument("--jobsize-weibull-shape", type=float, required=False, help="Jobsize shape of weibull")
-    parser.add_argument("--jobsize-weibull-scale", type=float, required=False, help="Jobsize scale of weibull")
-
-    parser.add_argument("--jobsize-is-of-degree", default=None, type=int, required=False,
-                        help="Draw jobsizes from distribution of degree N (squared,cubed).")
-    parser.add_argument("--jobsize-is-power-of", default=None, type=int, required=False,
-                        help="Draw jobsizes from distribution of power of N (2->2^x,3->3^x).")
-
-    # Walltime:
-    parser.add_argument("--walltime-distribution", type=str, nargs="+",
-                        choices=['uniform', 'weibull', 'normal'], default=None, help='Distribution type')
-
-    parser.add_argument("--walltime-normal-mean", type=float, required=False,
-                        help="Walltime mean (mu) for Normal distribution")
-    parser.add_argument("--walltime-normal-stddev", type=float, required=False,
-                        help="Walltime standard deviation (sigma) for Normal distribution")
-
-    parser.add_argument("--walltime-weibull-shape", type=float, required=False, help="Walltime shape of weibull")
-    parser.add_argument("--walltime-weibull-scale", type=float, required=False, help="Walltime scale of weibull")
-
-    # Utilizations
-    parser.add_argument("--cpuutil-distribution", type=str, nargs="+",
-                        choices=['uniform', 'weibull', 'normal'], default=['uniform'], help='Distribution type')
-
-    parser.add_argument("--cpuutil-normal-mean", type=float, required=False,
-                        help="Walltime mean (mu) for Normal distribution")
-    parser.add_argument("--cpuutil-normal-stddev", type=float, required=False,
-                        help="Walltime standard deviation (sigma) for Normal distribution")
-
-    parser.add_argument("--cpuutil-weibull-shape", type=float, required=False, help="Walltime shape of weibull")
-    parser.add_argument("--cpuutil-weibull-scale", type=float, required=False, help="Walltime scale of weibull")
-
-    parser.add_argument("--gpuutil-distribution", type=str, nargs="+",
-                        choices=['uniform', 'weibull', 'normal'], default=['uniform'], help='Distribution type')
-
-    parser.add_argument("--gpuutil-normal-mean", type=float, required=False,
-                        help="Walltime mean (mu) for Normal distribution")
-    parser.add_argument("--gpuutil-normal-stddev", type=float, required=False,
-                        help="Walltime standard deviation (sigma) for Normal distribution")
-
-    parser.add_argument("--gpuutil-weibull-shape", type=float, required=False, help="Walltime shape of weibull")
-    parser.add_argument("--gpuutil-weibull-scale", type=float, required=False, help="Walltime scale of weibull")
-
-    parser.add_argument("--gantt-nodes", default=False, action='store_true', required=False,
-                        help="Print Gannt with nodes required as line thickness (default false)")
-
-    return parser
-
-
-def check_workload_args(args):
-    if (args.jobsize_is_power_of is not None and args.jobsize_is_of_degree is not None):
-        print("Choose either --jobsize-is-power-of or --jobsize-is-of-degree! Not both.")
-        exit(1)
-
-
 def run_workload():
-    from raps.args import args, args_dict
-    from raps.config import get_system_config
+    from raps.sim_config import args, args_dict
+    from raps.system_config import get_system_config
     config = get_system_config(args.system).get_legacy()
     if args.replay:
         td = Telemetry(**args_dict)
@@ -1045,7 +970,6 @@ def continuous_job_generation(*, engine, timestep, jobs):
     if len(engine.queue) <= engine.continuous_workload.args.maxqueue:
         new_jobs = engine.continuous_workload.generate_jobs()
         jobs.extend(new_jobs)
-        pass
 
 
 if __name__ == "__main__":
Not both.") - exit(1) - - def run_workload(): - from raps.args import args, args_dict - from raps.config import get_system_config + from raps.sim_config import args, args_dict + from raps.system_config import get_system_config config = get_system_config(args.system).get_legacy() if args.replay: td = Telemetry(**args_dict) @@ -1045,7 +970,6 @@ def continuous_job_generation(*, engine, timestep, jobs): if len(engine.queue) <= engine.continuous_workload.args.maxqueue: new_jobs = engine.continuous_workload.generate_jobs() jobs.extend(new_jobs) - pass if __name__ == "__main__": diff --git a/scripts/marconi100-day51.sh b/scripts/marconi100-day51.sh index ae801a5736759d73473924bccd857a8c5fec4c3b..01da9a2bfc9a1e6930f3a52e22718982864f9cd2 100644 --- a/scripts/marconi100-day51.sh +++ b/scripts/marconi100-day51.sh @@ -1,4 +1,4 @@ -python main.py -f ~/data/marconi100/job_table.parquet --system marconi100 -ff 4381000 -t 61000 -o --policy replay -python main.py -f ~/data/marconi100/job_table.parquet --system marconi100 -ff 4381000 -t 61000 -o --policy fcfs -python main.py -f ~/data/marconi100/job_table.parquet --system marconi100 -ff 4381000 -t 61000 -o --policy fcfs --backfill easy -python main.py -f ~/data/marconi100/job_table.parquet --system marconi100 -ff 4381000 -t 61000 -o --policy priority --backfill firstfit +python main.py -f ~/data/marconi100/job_table.parquet --system marconi100 --ff 4381000 -t 61000 -o --policy replay +python main.py -f ~/data/marconi100/job_table.parquet --system marconi100 --ff 4381000 -t 61000 -o --policy fcfs +python main.py -f ~/data/marconi100/job_table.parquet --system marconi100 --ff 4381000 -t 61000 -o --policy fcfs --backfill easy +python main.py -f ~/data/marconi100/job_table.parquet --system marconi100 --ff 4381000 -t 61000 -o --policy priority --backfill firstfit diff --git a/scripts/meta_run.sh b/scripts/meta_run.sh index 41f4831e043f64a1f1ca822874fcd8f6537f3d4e..0c60596090d60d0ab3be306b1136ff1d23b5e7ea 100755 --- a/scripts/meta_run.sh +++ b/scripts/meta_run.sh @@ -17,7 +17,7 @@ while [ $current_sec -le $end_sec ]; do DATEDIRS="date=$DATEDIR" # Construct the command with the formatted date - command="python main.py -d -o --plot power loss -f $DPATH/slurm/joblive/$DATEDIRS $DPATH/jobprofile/jobprofile/$DATEDIRS >& $DATEDIRS.out &" + command="python main.py -d -o --plot power loss -f $DPATH/slurm/joblive/$DATEDIRS,$DPATH/jobprofile/jobprofile/$DATEDIRS >& $DATEDIRS.out &" sleep 10 # Execute the command diff --git a/tests/systems/test_main_network_run.py b/tests/systems/test_main_network_run.py index f40cc8f4b4ad973fc889bf8f3743f4eaeb40343d..8b80d5dbdc1a9c19d32408facb67fc671cd44642 100644 --- a/tests/systems/test_main_network_run.py +++ b/tests/systems/test_main_network_run.py @@ -24,7 +24,7 @@ def test_main_network_run(system, system_config, random_id): "python", "main.py", "--time", "1m", "--system", system, - "-net", + "--net", "-o", random_id ], capture_output=True, text=True, stdin=subprocess.DEVNULL) assert result.returncode == 0, f"Failed on {system}: {result.stderr}" diff --git a/tests/systems/test_main_network_withdata_run.py b/tests/systems/test_main_network_withdata_run.py index c478e21be085c2389b932964dec7b5e23d433645..82c30de7493c93f356ceab2302cf111b658dd3c2 100644 --- a/tests/systems/test_main_network_withdata_run.py +++ b/tests/systems/test_main_network_withdata_run.py @@ -31,7 +31,7 @@ def test_main_run(system, system_config, system_file, random_id): "--time", "1m", "--system", system, "-f", *file_list, - "-net", + "--net", "-o", random_id ], 
     assert result.returncode == 0, f"Failed on {system}: {result.stderr}"
diff --git a/tests/systems/test_main_time_delta_run.py b/tests/systems/test_main_time_delta_run.py
index f86caa7184a3943ee4c9cf98e4420e5e6f3d11fd..9cb87a229af27a3f5094a23e14d30d09a0057d31 100644
--- a/tests/systems/test_main_time_delta_run.py
+++ b/tests/systems/test_main_time_delta_run.py
@@ -3,8 +3,7 @@ import subprocess
 import gc
 import pytest
 from tests.util import PROJECT_ROOT
-from raps.utils import convert_seconds_to_hhmmss
-from raps.utils import convert_to_seconds
+from raps.utils import convert_to_time_unit, convert_seconds_to_hhmmss
 
 
 pytestmark = [
@@ -38,8 +37,8 @@ def test_main_time_delta_run(system, system_config, time_arg, tdelta_arg, random
         "-o", random_id
     ], capture_output=True, text=True, stdin=subprocess.DEVNULL)
     assert result.returncode == 0, f"Failed on {system}: {result.stderr}"
-    time, downscale = convert_to_seconds(time_arg)
-    assert f"Time Simulated: {convert_seconds_to_hhmmss(time // downscale)}" in result.stdout
+    time = convert_to_time_unit(time_arg)
+    assert f"Time Simulated: {convert_seconds_to_hhmmss(time)}" in result.stdout
 
     subprocess.run(
         f"rm {random_id}.npz && rm -fr simulation_results/{random_id}",
diff --git a/tests/systems/test_main_time_delta_sub_second_run.py b/tests/systems/test_main_time_delta_sub_second_run.py
index 55425492756cbbcb11b24a79dedf5cc9976e9a3f..459d295318cb55e994f57fa546c088098c60acc8 100644
--- a/tests/systems/test_main_time_delta_sub_second_run.py
+++ b/tests/systems/test_main_time_delta_sub_second_run.py
@@ -3,8 +3,7 @@ import subprocess
 import gc
 import pytest
 from tests.util import PROJECT_ROOT
-from raps.utils import convert_seconds_to_hhmmss
-from raps.utils import convert_to_seconds
+from raps.utils import convert_seconds_to_hhmmss, parse_td
 
 
 pytestmark = [
@@ -39,10 +38,8 @@ def test_main_time_delta_run(system, system_config, time_arg, tdelta_arg, random
         "-o", random_id
     ], capture_output=True, text=True, stdin=subprocess.DEVNULL)
     assert result.returncode == 0, f"Failed on {system}: {result.stderr}"
-    time, downscale = convert_to_seconds(time_arg)
-    td, td_ds = convert_to_seconds(tdelta_arg)
-    #assert f"Time Simulated: {convert_seconds_to_hhmmss(int((time / td_ds) * downscale))}" in result.stdout
-    assert f"Time Simulated: {convert_seconds_to_hhmmss(time / downscale)}" in result.stdout
+    time = int(parse_td(time_arg).total_seconds())
+    assert f"Time Simulated: {convert_seconds_to_hhmmss(time)}" in result.stdout
 
     subprocess.run(
         f"rm {random_id}.npz && rm -fr simulation_results/{random_id}",
diff --git a/tests/systems/test_main_time_ff_delta_run.py b/tests/systems/test_main_time_ff_delta_run.py
index 3e46ddad3db6e7e461e02907d0042ab8f33a4ef6..d3ef9632eb40e260af9bdc5dcb44d1789eb1950d 100644
--- a/tests/systems/test_main_time_ff_delta_run.py
+++ b/tests/systems/test_main_time_ff_delta_run.py
@@ -30,7 +30,7 @@ def test_main_time_delta_run(system, system_config, time_arg, tdelta_arg,
     result = subprocess.run([
         "python", "main.py",
         "-t", time_arg,
-        "-ff", ff_arg,
+        "--ff", ff_arg,
         "--time-delta", tdelta_arg,
         "--system", system,
         #--"-f", system_file,
diff --git a/tests/systems/test_main_withdata_run.py b/tests/systems/test_main_withdata_run.py
index f2f5f7c9d3323890d4ee070c7fa87a7358feda04..928d1496aa1f476390a744f3495f6da219d96e21 100644
--- a/tests/systems/test_main_withdata_run.py
+++ b/tests/systems/test_main_withdata_run.py
@@ -28,7 +28,7 @@ def test_main_withdata_run(system, system_config, system_file, random_id):
"python", "main.py", "--time", "1m", "--system", system, - "-f", *file_list, + "-f", ','.join(file_list), "-o", random_id ], capture_output=True, text=True, stdin=subprocess.DEVNULL) assert result.returncode == 0, f"Failed on {system}: {result.stderr}" diff --git a/tests/systems/test_multi_part_sim_basic_run.py b/tests/systems/test_multi_part_sim_basic_run.py index 24a671e4960815bced5667c1a6a683d07c56b663..9b9db130fea6b1610f09b73921fbdef1f9564eb3 100644 --- a/tests/systems/test_multi_part_sim_basic_run.py +++ b/tests/systems/test_multi_part_sim_basic_run.py @@ -20,7 +20,6 @@ def test_multi_part_sim_run(system, system_config): result = subprocess.run([ "python", "multi-part-sim.py", "--time", "1h", - "--system", system, "-x", f"{system}/*", #"--noui" ], capture_output=True, text=True, stdin=subprocess.DEVNULL) diff --git a/tests/systems/test_multi_part_sim_network_run.py b/tests/systems/test_multi_part_sim_network_run.py index 643b97a6387ff5d2f7f9c32022c107080a3d21b8..1871725120b1d46b261d94c534abc7e1e87f241d 100644 --- a/tests/systems/test_multi_part_sim_network_run.py +++ b/tests/systems/test_multi_part_sim_network_run.py @@ -23,9 +23,8 @@ def test_multi_part_sim_run(system, system_config, random_id): result = subprocess.run([ "python", "multi-part-sim.py", "--time", "1h", - "--system", system, "-x", f"{system}/*", - "-net", + "--net", #"--noui" ], capture_output=True, text=True, stdin=subprocess.DEVNULL) assert result.returncode == 0, f"Failed on {system}: {result.stderr}" diff --git a/tests/systems/test_workload_synthetic.py b/tests/systems/test_workload_synthetic.py index 2e4c4a1812101071aa7e1679435daa0b576b0c91..dd5f8cf6c395f6aa8c7463101c57b11c4d270c56 100644 --- a/tests/systems/test_workload_synthetic.py +++ b/tests/systems/test_workload_synthetic.py @@ -1,7 +1,6 @@ import subprocess import gc import pytest -import shlex pytestmark = [ diff --git a/tests/test_main.py b/tests/test_main.py index 4b98263ba4dd4ddb6ed68de095507a0c2fa1ec40..76f48a361c744e5c4f650f3ce0fed59e5bdbab86 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -1,4 +1,3 @@ -from tests.smoke import main import subprocess import os diff --git a/tests/test_system_config.py b/tests/test_system_config.py deleted file mode 100644 index 74280dffa4b34d64853054211064836fcc9134bf..0000000000000000000000000000000000000000 --- a/tests/test_system_config.py +++ /dev/null @@ -1,10 +0,0 @@ -import pytest -from raps.config import list_systems, get_system_config - -@pytest.mark.parametrize("system_name", list_systems()) -def test_configs(system_name): - # Very basic test that all system configs are valid - config = get_system_config(system_name) - assert config.system_name == system_name - assert config.get_legacy()['system_name'] == system_name - assert config.get_legacy()['config'] == config diff --git a/tests/unit/test_system_config.py b/tests/unit/test_system_config.py new file mode 100644 index 0000000000000000000000000000000000000000..e32b99f4b5aaddd71752f77f4aaf1d6b619a9901 --- /dev/null +++ b/tests/unit/test_system_config.py @@ -0,0 +1,26 @@ +import pytest +from raps.raps_config import raps_config +from raps.system_config import list_systems, get_system_config, get_partition_configs + + +@pytest.mark.parametrize("system_name", list_systems()) +def test_configs(system_name): + # Very basic test that all system configs are valid + config = get_system_config(system_name) + assert config.system_name == system_name + assert config.get_legacy()['system_name'] == system_name + assert config.get_legacy()['system_config'] == 
config + + +@pytest.mark.parametrize("input,expected_name,expected_configs", [ + (["lumi"], "lumi", ["lumi/lumi-c", "lumi/lumi-g"]), + (["lumi/*"], "lumi", ["lumi/lumi-c", "lumi/lumi-g"]), + (["frontier", "summit"], "frontier+summit", ["frontier", "summit"]), + # test passing arbitrary paths + ([str(raps_config.system_config_dir / "lumi")], "lumi", ["lumi-c", "lumi-g"]), + ([str(raps_config.system_config_dir / "lumi/lumi-*")], "lumi-c+lumi-g", ["lumi-c", "lumi-g"]), +]) +def test_get_partition_configs(input, expected_name, expected_configs): + result = get_partition_configs(input) + assert result.system_name == expected_name + assert result.partition_names == expected_configs diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..edf06fe12f479ec0d8ecdeb9f6c45beff94380ec --- /dev/null +++ b/tests/unit/test_utils.py @@ -0,0 +1,42 @@ +import pytest +from datetime import timedelta +from raps.utils import parse_td, convert_to_time_unit, infer_time_unit, TIME_UNITS + + +@pytest.mark.parametrize("input,expected", [ + ("1", timedelta(seconds=1)), + ("1m", timedelta(minutes=1)), + (timedelta(minutes=1), timedelta(minutes=1)), + (2, timedelta(seconds=2)), + ("PT2S", timedelta(seconds=2)), +]) +def test_parse_td(input, expected): + assert parse_td(input) == expected + + +def test_parse_td_error(): + with pytest.raises(ValueError): + parse_td("1x") + + +@pytest.mark.parametrize("input,unit,expected", [ + ("1s", 's', 1), + ("1m", 's', 60), + (0, 'ms', 0), + (timedelta(seconds=6), 'ms', 6000), +]) +def test_convert_to_time_unit(input, unit, expected): + assert convert_to_time_unit(input, unit) == expected + + +@pytest.mark.parametrize("input,expected", [ + ("1s", 's'), + ("1000ms", 'ms'), + (0, 's'), + (timedelta(seconds=6), 's'), + (timedelta(days=6), 's'), + (timedelta(milliseconds=6), 'ms'), + (timedelta(milliseconds=60), 'cs'), +]) +def test_infer_time_unit(input, expected): + assert infer_time_unit(input) == TIME_UNITS[expected]
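+
+
+def test_convert_to_time_unit_error():
+    # Sketch of the error contract from raps/utils.py above: values that are
+    # not a whole multiple of the unit are rejected (cases assumed, not taken
+    # from the original suite).
+    with pytest.raises(ValueError):
+        convert_to_time_unit("1500ms", 's')  # 1.5 s is not an integer count of seconds
+    with pytest.raises(ValueError):
+        convert_to_time_unit("1ms", 's')  # nonzero but smaller than the unit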