diff --git a/README.md b/README.md index eb39cc718c5ad7e4b86ea20d08cc2741af94f867..90ab831fbe21fe88ad180154a00ac2790780649c 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ For Adastra MI250 supercomputer, download 'AdastaJobsMI250_15days.parquet' from For Google cluster trace v2 - raps run --system gcloudv2 -f ~/data/gcloud/v2/google_cluster_data_2011_sample --ff 600 + raps run --system gcloudv2 -f ~/data/gcloud/v2/google_cluster_data_2011_sample --start '2011-05-02T00:10:00Z' # analyze dataset raps telemetry --system gcloudv2 -f ~/data/gcloud/v2/google_cluster_data_2011_sample -v @@ -87,7 +87,7 @@ For Lumi Lassen is one of the few datasets that has networking data. See `raps/dataloaders/lassen.py` for how to get the datasets. To run a network simulation, use the following command: - raps run -f ~/data/lassen/Lassen-Supercomputer-Job-Dataset --system lassen --policy fcfs --backfill firstfit --ff 365d -t 12h --arrival poisson --net + raps run -f ~/data/lassen/Lassen-Supercomputer-Job-Dataset --system lassen --policy fcfs --backfill firstfit --start '2019-08-22T00:00:00+00:00' -t 12h --arrival poisson --net ## Snapshot of extracted workload data diff --git a/experiments/gcloudv2.yaml b/experiments/gcloudv2.yaml index db8e21839ce05c788208fd66e839713f0e967e17..9c7a7005ca8a9f7d09da0b000227917ac488b917 100644 --- a/experiments/gcloudv2.yaml +++ b/experiments/gcloudv2.yaml @@ -1,4 +1,4 @@ system: gcloudv2 replay: - /opt/data/gcloud/v2/google_cluster_data_2011_sample -ff: 600 +start: 2011-05-02T00:10:00Z diff --git a/main.py b/main.py old mode 100644 new mode 100755 index b3c03f6ea7413d0385b4ff346fde0d42e3145733..b2eae9392b068eab22da2798631750808f7f0a32 --- a/main.py +++ b/main.py @@ -1,3 +1,4 @@ +#!/usr/bin/env python3 """ ExaDigiT Resource Allocator & Power Simulator (RAPS) """ diff --git a/pytest.ini b/pytest.ini index 08a528c6ede02c7b5fa836e0a61b193cc263f757..78b07aa905b9a827b3fcbf5ff202c540b5299fe7 100644 --- a/pytest.ini +++ b/pytest.ini @@ -22,6 +22,7 @@ 
markers = 40frontiers: System test adastraMI250: System test + bluewaters: System test frontier: System test fugaku: System test gcloudv2: System test diff --git a/raps/constants.py b/raps/constants.py index 53711e1c0bfc38df15662219864c81d7974b9fef..85b5e232012a0ac3e269895bef5e380cd87a6d3e 100644 --- a/raps/constants.py +++ b/raps/constants.py @@ -2,6 +2,7 @@ RAPS Constants """ from pathlib import Path +from datetime import datetime ELLIPSES = '\u2026' OUTPUT_PATH = Path('simulation_results') diff --git a/raps/cooling.py b/raps/cooling.py index ba96e4384f2e5946de511932ce793745f07480c3..066c12c3318a847f19a98f3a0031decc10be90b1 100644 --- a/raps/cooling.py +++ b/raps/cooling.py @@ -16,7 +16,7 @@ from fmpy import read_model_description, extract from fmpy.fmi2 import FMU2Slave from datetime import timedelta -from raps.policy import PolicyType +from raps.weather import Weather def get_matching_variables(variables, pattern): @@ -92,7 +92,7 @@ class ThermoFluidsModel: self.outputs = None self.unzipdir = None self.fmu = None - self.weather = None + self.weather: Weather | None = None def initialize(self): """ @@ -153,9 +153,7 @@ class ThermoFluidsModel: temperature = self.config['WET_BULB_TEMP'] # If replay mode is on and weather data is available - if engine.scheduler.policy == PolicyType.REPLAY and \ - self.weather and self.weather.start is not None and \ - self.weather.has_coords: + if self.weather and self.weather.has_coords: # Convert total seconds to timedelta object delta = timedelta(seconds=engine.current_timestep) target_datetime = self.weather.start + delta diff --git a/raps/dataloaders/adastraMI250.py b/raps/dataloaders/adastraMI250.py index 6eec26e8431f54ba08c03561c6c518f882b3546d..8cadbfbf2bcbeef8a32f519f8aede6390c22ed7b 100644 --- a/raps/dataloaders/adastraMI250.py +++ b/raps/dataloaders/adastraMI250.py @@ -11,8 +11,9 @@ # to replay with different scheduling policy python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastraMI250 --policy 
priority --backfill easy - # to fast-forward 60 days and replay for 1 day - python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastraMI250 --ff 60d -t 1d + # to run a specific time range + python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastraMI250 \ + --start 2024-11-01T00:00:00Z --end 2024-11-02T00:00:00Z # to analyze dataset python -m raps.telemetry -f /path/to/AdastaJobsMI250_15days.parquet --system adastraMI250 -v diff --git a/raps/dataloaders/bluewaters.py b/raps/dataloaders/bluewaters.py index 9d5dc4d0dd5162b568b403d3f49cde2b6233afec..728e2bb236bcb4bae520cab1b060eb4156bd309c 100644 --- a/raps/dataloaders/bluewaters.py +++ b/raps/dataloaders/bluewaters.py @@ -181,7 +181,11 @@ def _parse_line(line: str, debug=False): def load_data(local_dataset_path, **kwargs): config = kwargs.get("config") root = Path(local_dataset_path[0]) - day = kwargs.get("start") + # TODO: confirm bluewater dates are in UTC + start = datetime.fromisoformat(kwargs.get('start') or "2017-03-28T00:00:00Z") + start = start.astimezone(timezone.utc) + # TODO: support multiple day replay + day = start.strftime("%Y%m%d") fp = root / "torque_logs" / day filter_str = kwargs.get("filter") debug = kwargs.get("debug") diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py index 3849731d76ed041aa14672be7796aa6a60fbae28..a6ac45e70e94fbfc08ffb54496a74a8955f80552 100644 --- a/raps/dataloaders/frontier.py +++ b/raps/dataloaders/frontier.py @@ -275,12 +275,12 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar if end_time < telemetry_start: print("Job ends before first recorded telemetry entry:", job_id, "start:", - start_time, "end:", end_time, " Telemetry: ", len(gpu_trace), "entries.") + start_time, "end:", end_time, " Telemetry: ", len(gpu_trace), "entries.") continue # skip if start_time > telemetry_end: print("Job starts after last recorded telemetry entry:", job_id, "start:", - start_time, "end:", end_time, " 
Telemetry: ", len(gpu_trace), "entries.") + start_time, "end:", end_time, " Telemetry: ", len(gpu_trace), "entries.") continue # skip # Throw out jobs that are not valid! diff --git a/raps/dataloaders/kestrel.py b/raps/dataloaders/kestrel.py index c9efd704f47de86b77d679414da033725b166121..f15c80bc6ab5a168e7538554d3ed59e01880fe9e 100644 --- a/raps/dataloaders/kestrel.py +++ b/raps/dataloaders/kestrel.py @@ -49,14 +49,10 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): The list of parsed jobs. """ config = kwargs.get('config') - min_time = kwargs.get('min_time', None) reschedule = kwargs.get('reschedule') - fastforward = kwargs.get('fastforward') validate = kwargs.get('validate') jid = kwargs.get('jid', '*') - if fastforward: print(f"fast-forwarding {fastforward} seconds") - # Sort jobs dataframe based on values in time_start column, adjust indices after sorting jobs_df = jobs_df.sort_values(by='submit_time') jobs_df = jobs_df[(jobs_df.start_time.between(pd.to_datetime('2024-09-01T00:00:00'), @@ -73,10 +69,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): # Take earliest time as baseline reference # We can use the start time of the first job. 
- if min_time: - time_zero = min_time - else: - time_zero = jobs_df['submit_time'].min() + time_zero = jobs_df['submit_time'].min() num_jobs = len(jobs_df) print("time_zero:", time_zero, "num_jobs", num_jobs) @@ -125,8 +118,6 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): # When extracting out a single job, run one iteration past the end of the job time_offset = config['UI_UPDATE_FREQ'] - if fastforward: time_offset -= fastforward - if reschedule: # Let the scheduler reschedule the jobs scheduled_nodes = None time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME']) diff --git a/raps/dataloaders/lassen.py b/raps/dataloaders/lassen.py index bc57a7c0d8d959a3c2157444b253b586e9d56e1f..8bded75f6ebc9e2d17a6136a8b38c266e1f0cbb2 100644 --- a/raps/dataloaders/lassen.py +++ b/raps/dataloaders/lassen.py @@ -23,7 +23,7 @@ Usage Instructions: python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen --arrival poisson # to fast-forward 365 days and replay for 1 day. This region day has 2250 jobs with 1650 jobs executed. 
- python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen --ff 365d -t 1d + python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen --start '2019-08-22T00:00:00+00:00' -t 1d # For the network replay this command gives suiteable snapshots: python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen --policy fcfs --backfill firstfit -t 12h --arrival poisson # noqa @@ -35,10 +35,10 @@ import uuid import numpy as np import pandas as pd from tqdm import tqdm -from datetime import timedelta +from datetime import datetime, timedelta from ..job import job_dict, Job -from ..utils import power_to_utilization, parse_td, WorkloadData +from ..utils import power_to_utilization, WorkloadData def load_data(path, **kwargs): @@ -61,32 +61,26 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): jid = kwargs.get('jid', '*') validate = kwargs.get('validate') verbose = kwargs.get('verbose') - fastforward = kwargs.get('fastforward') # int in seconds + start = datetime.fromisoformat(kwargs['start']) if kwargs.get('start') else None allocation_df['job_submit_timestamp'] = pd.to_datetime( allocation_df['job_submit_time'], format='mixed', errors='coerce') allocation_df['begin_timestamp'] = pd.to_datetime(allocation_df['begin_time'], format='mixed', errors='coerce') allocation_df['end_timestamp'] = pd.to_datetime(allocation_df['end_time'], format='mixed', errors='coerce') - # Too large dataset! Cut by fastforward and time to simulate! - if fastforward is None: # This is in seconds / int? - fastforward = 0 - fastforward_timedelta = timedelta(seconds=fastforward) # timedelta - else: - fastforward_timedelta = timedelta(seconds=fastforward) # timedelta - time_to_simulate = kwargs.get('time') # int in seconds - if time_to_simulate is None: # This is a string! 
- time_to_simulate = 31536000 # a year - time_to_simulate_timedelta = timedelta(seconds=time_to_simulate) # timedelta - else: - time_to_simulate_timedelta = parse_td(time_to_simulate) # timedelta - telemetry_start_timestamp = allocation_df['begin_timestamp'].min() telemetry_start_time = 0 telemetry_end_timestamp = allocation_df['end_timestamp'].max() diff = telemetry_end_timestamp - telemetry_start_timestamp telemetry_end_time = int(math.ceil(diff.total_seconds())) + # Too large dataset! Cut by fastforward and time to simulate! + if start is None: + fastforward_timedelta = timedelta(seconds=0) + else: + fastforward_timedelta = start - telemetry_start_timestamp.tz_localize("UTC") + time_to_simulate_timedelta = timedelta(seconds=kwargs['time']) + simulation_start_timestamp = telemetry_start_timestamp + fastforward_timedelta simulation_end_timestamp = simulation_start_timestamp + time_to_simulate_timedelta diff --git a/raps/dataloaders/marconi100.py b/raps/dataloaders/marconi100.py index a10e1e8a28ba42886c093c5c456c449330618e92..6ff310b288579e6269643f0622c24387d84681c8 100644 --- a/raps/dataloaders/marconi100.py +++ b/raps/dataloaders/marconi100.py @@ -16,7 +16,7 @@ python main.py -f /path/to/job_table.parquet --system marconi100 --policy priority --backfill firstfit # to fast-forward 60 days and replay for 1 day - python main.py -f /path/to/job_table.parquet --system marconi100 --ff 60d -t 1d + python main.py -f /path/to/job_table.parquet --system marconi100 --start 2020-07-05T00:00:00+00:00 -t 1d # to analyze dataset python -m raps.telemetry -f /path/to/job_table.parquet --system marconi100 -v @@ -64,10 +64,6 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): jid = kwargs.get('jid', '*') debug = kwargs.get('debug') - # fastforward = kwargs.get('fastforward') - # if fastforward: - # print(f"fast-forwarding {fastforward} seconds") - # Sort jobs dataframe based on values in time_start column, adjust indices after sorting jobs_df = 
jobs_df.sort_values(by='start_time') jobs_df = jobs_df.reset_index(drop=True) diff --git a/raps/dataloaders/mit_supercloud/cli.py b/raps/dataloaders/mit_supercloud/cli.py index 0596e5ce8279072f67ab796f29a9cd9f63a5684f..e459209d73edd60025e3b53273edcd369bf3e342 100644 --- a/raps/dataloaders/mit_supercloud/cli.py +++ b/raps/dataloaders/mit_supercloud/cli.py @@ -32,8 +32,8 @@ def main(): pl = subs.add_parser("load", parents=[common], help="Load local data into RAPS") pl.add_argument("path", help="Local data root") pl.set_defaults(func=lambda args: load_data(args.path, - start_date=args.start, - end_date=args.end, + start=args.start, + end=args.end, partition=args.partition)) args = p.parse_args() diff --git a/raps/dataloaders/mit_supercloud/loader.py b/raps/dataloaders/mit_supercloud/loader.py index a6229657ea1aaaf23e43993a3e7697d09265ca3b..fc16ade2e3c29a1898a55ad138ec34e750168197 100644 --- a/raps/dataloaders/mit_supercloud/loader.py +++ b/raps/dataloaders/mit_supercloud/loader.py @@ -119,7 +119,7 @@ from collections import Counter from datetime import datetime, timezone from raps.job import job_dict, Job -from raps.utils import summarize_ranges, next_arrival, WorkloadData +from raps.utils import summarize_ranges, WorkloadData from .utils import proc_cpu_series, proc_gpu_series, to_epoch from .utils import DEFAULT_START, DEFAULT_END @@ -236,8 +236,8 @@ def load_data(local_dataset_path, **kwargs): sl["__line__"] = sl.index + 2 # date window - start_ts = to_epoch(kwargs.get("start", DEFAULT_START)) - end_ts = to_epoch(kwargs.get("end", DEFAULT_END)) + start_ts = to_epoch(kwargs.get("start") or DEFAULT_START) + end_ts = to_epoch(kwargs.get("end") or DEFAULT_END) mask = (sl.time_submit >= start_ts) & (sl.time_submit < end_ts) sl = sl[mask] @@ -283,8 +283,8 @@ def load_data(local_dataset_path, **kwargs): # —— ERROR CATCH: no jobs in this window? —— if sl.empty: raise ValueError( - f"No SLURM jobs found between {kwargs.get('start_date')} and " - f"{kwargs.get('end_date')}. 
Please pick a range covered by the dataset." + f"No SLURM jobs found between {kwargs.get('start')} and " + f"{kwargs.get('end')}. Please pick a range covered by the dataset." ) # detect GPU‐using jobs diff --git a/raps/engine.py b/raps/engine.py index 4fdadb2b8a45abc1cc73f54eee69d8e96f2924c5..9ae27b127963e35c5ade824cb7da50a0710d5194 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -173,9 +173,9 @@ class Engine: self.avg_slowdown_history = [] self.max_slowdown_history = [] self.node_occupancy_history = [] - self.downtime = Downtime(first_downtime=sim_config.downtime_first, - downtime_interval=sim_config.downtime_interval, - downtime_length=sim_config.downtime_length, + self.downtime = Downtime(first_downtime=sim_config.downtime_first_int, + downtime_interval=sim_config.downtime_interval_int, + downtime_length=sim_config.downtime_length_int, debug=sim_config.debug, ) @@ -233,33 +233,9 @@ class Engine: random.seed(sim_config.seed) np.random.seed(sim_config.seed + 1) - if sim_config.cooling: - cooling_model = ThermoFluidsModel(**system_config_dict) - cooling_model.initialize() - if sim_config.start: - cooling_model.weather = Weather(sim_config.start, config=system_config_dict) - else: - cooling_model = None - - if sim_config.power_scope == 'node': - if sim_config.uncertainties: - power_manager = PowerManager(compute_node_power_validate_uncertainties, **system_config_dict) - else: - power_manager = PowerManager(compute_node_power_validate, **system_config_dict) - else: - if sim_config.uncertainties: - power_manager = PowerManager(compute_node_power_uncertainties, **system_config_dict) - else: - power_manager = PowerManager(compute_node_power, **system_config_dict) - - flops_manager = FLOPSManager( - config=system_config_dict, - validate=(sim_config.power_scope == "node"), - ) - if sim_config.live and not sim_config.replay: td = Telemetry(**sim_config_dict) - workload_data = td.load_from_live_system() + wd = td.load_from_live_system() elif sim_config.replay: # TODO: 
this will have issues if running separate systems or custom systems partition_short = partition.split("/")[-1] if partition else None @@ -278,31 +254,62 @@ class Engine: else: replay_files = sim_config.replay - workload_data = td.load_from_files(replay_files) + wd = td.load_from_files(replay_files) else: # Synthetic jobs wl = Workload(sim_config_args, system_config_dict) - workload_data = wl.generate_jobs() + wd = wl.generate_jobs() td = Telemetry(**sim_config_dict) - jobs = workload_data.jobs - - # TODO refactor how stat/end/fastforward/time work - if sim_config.fastforward is not None: - workload_data.telemetry_start = workload_data.telemetry_start + sim_config.fastforward - - if sim_config.time is not None: - workload_data.telemetry_end = workload_data.telemetry_start + sim_config.time - - if sim_config.time_delta is not None: - time_delta = sim_config.time_delta + jobs = wd.jobs + if len(jobs) == 0: + print(f"Warning no jobs found for {partition or 'system'}") + + if sim_config.start: + start = sim_config.start + diff = start - wd.start_date + if diff.total_seconds() < 0: + raise Exception( + f"{start.isoformat()} is before data range in workload. 
" + + f"Workload data begins at {wd.start_date.isoformat()}" + ) + wd.telemetry_start += int(diff.total_seconds()) + wd.start_date = start else: - time_delta = 1 + start = wd.start_date + start = start + sim_config.fastforward + wd.telemetry_end = wd.telemetry_start + sim_config.time_int + + time_delta = sim_config.time_delta_int if sim_config.continuous_job_generation: continuous_workload = wl else: continuous_workload = None + if sim_config.cooling: + cooling_model = ThermoFluidsModel(**system_config_dict) + cooling_model.initialize() + if sim_config.weather: + cooling_model.weather = Weather(start, config=system_config_dict) + else: + cooling_model = None + + if sim_config.power_scope == 'node': + if sim_config.uncertainties: + power_manager = PowerManager(compute_node_power_validate_uncertainties, **system_config_dict) + else: + power_manager = PowerManager(compute_node_power_validate, **system_config_dict) + else: + if sim_config.uncertainties: + power_manager = PowerManager(compute_node_power_uncertainties, **system_config_dict) + else: + power_manager = PowerManager(compute_node_power, **system_config_dict) + + flops_manager = FLOPSManager( + config=system_config_dict, + validate=(sim_config.power_scope == "node"), + ) + accounts = None if sim_config.accounts: job_accounts = Accounts(jobs) @@ -324,7 +331,7 @@ class Engine: system_config=system_config, ) - return engine, workload_data, time_delta + return engine, wd, time_delta def add_running_jobs_to_queue(self, jobs_to_submit: List): """ diff --git a/raps/envs/raps_env.py b/raps/envs/raps_env.py index e27a6d254b7cd5035bda0fc490170e19bfcd08ad..ec9fcad464972f3d56e6d3c3bc80c5367e41c2b4 100644 --- a/raps/envs/raps_env.py +++ b/raps/envs/raps_env.py @@ -15,23 +15,23 @@ def print_stats(stats, step=0): """prints SB3-style stats output""" wanted_keys = { - "time simulated": "engine/Time Simulated", - "average power": "engine/Average Power", - "system power efficiency": "engine/System Power Efficiency", - "total 
energy consumed": "engine/Total Energy Consumed", - "carbon emissions": "engine/Carbon Footprint", - "jobs completed": "jobs/Jobs Completed", + "time_simulated": "engine/Time Simulated", + "average_power": "engine/Average Power", + "system_power_efficiency": "engine/System Power Efficiency", + "total_energy_consumed": "engine/Total Energy Consumed", + "carbon_emissions": "engine/Carbon Footprint", + "jobs_completed": "jobs/Jobs Completed", "throughput": "jobs/Throughput", - "jobs still running": "jobs/Jobs Still Running", + "jobs_still_running": "jobs/Jobs Still Running", } for section in ["engine_stats", "job_stats"]: if section in stats: for k, v in stats[section].items(): - if k.lower() in wanted_keys: - if k.lower() == "jobs still running" and isinstance(v, list): + if k in wanted_keys: + if k == "jobs_still_running" and isinstance(v, list): v = len(v) - logger.record(wanted_keys[k.lower()], v) + logger.record(wanted_keys[k], v) logger.dump(step=step) diff --git a/raps/run_sim.py b/raps/run_sim.py index 4a3f9b385ed65e57e2e52aa3f7718d8ed436bc0b..74ea87a0f05c24bf1d6cbea71a9fee1e506e0cc7 100644 --- a/raps/run_sim.py +++ b/raps/run_sim.py @@ -6,14 +6,12 @@ These functions just handle rendering the terminal UI and outputting results to import json import pandas as pd import sys -import yaml import warnings -from pathlib import Path from raps.ui import LayoutManager from raps.plotting import Plotter from raps.engine import Engine from raps.multi_part_engine import MultiPartEngine -from raps.utils import write_dict_to_file, pydantic_add_args, SubParsers, yaml_dump, read_yaml +from raps.utils import write_dict_to_file, pydantic_add_args, SubParsers, read_yaml from raps.stats import ( get_engine_stats, get_job_stats, @@ -61,8 +59,7 @@ def run_sim(sim_config: SingleSimConfig): result=workload_data, args=sim_config, ) - config_yaml = yaml_dump(sim_config.model_dump(mode="json", exclude_defaults=True)) - (out / 'sim_config.yaml').write_text(config_yaml) + (out / 
'sim_config.yaml').write_text(sim_config.dump_yaml()) jobs = workload_data.jobs timestep_start, timestep_end = workload_data.telemetry_start, workload_data.telemetry_end @@ -238,8 +235,7 @@ def run_parts_sim(sim_config: MultiPartSimConfig): result=workload_results[part], args=sim_config, ) - config_yaml = yaml_dump(sim_config.model_dump(mode="json", exclude_defaults=True)) - (out / 'sim_config.yaml').write_text(config_yaml) + (out / 'sim_config.yaml').write_text(sim_config.dump_yaml()) jobs = {p: w.jobs for p, w in workload_results.items()} @@ -310,5 +306,4 @@ def show_add_parser(subparsers: SubParsers): def show(sim_config: SingleSimConfig, show_defaults=False): - data = sim_config.model_dump(mode="json", exclude_defaults=not show_defaults) - print(yaml_dump(data), end="") + print(sim_config.dump_yaml(exclude_unset=not show_defaults), end='') diff --git a/raps/sim_config.py b/raps/sim_config.py index 05c078c257ebd5075121458f3b2b791218cc26ab..867dac0bd4429352df830de38ba609c5d472c412 100644 --- a/raps/sim_config.py +++ b/raps/sim_config.py @@ -3,15 +3,15 @@ import abc from pathlib import Path from functools import cached_property from datetime import timedelta -from typing import Literal +from typing import Literal, Annotated as A import importlib from raps.schedulers.default import PolicyType, BackfillType from raps.utils import ( - parse_time_unit, convert_to_time_unit, infer_time_unit, ExpandedPath, parse_td, create_casename, - RAPSBaseModel, + parse_time_unit, convert_to_time_unit, infer_time_unit, ExpandedPath, create_casename, + RAPSBaseModel, AutoAwareDatetime, SmartTimedelta, yaml_dump, ) from raps.system_config import SystemConfig, get_partition_configs, get_system_config -from pydantic import model_validator +from pydantic import model_validator, Field Distribution = Literal['uniform', 'weibull', 'normal'] @@ -21,37 +21,54 @@ class SimConfig(RAPSBaseModel, abc.ABC): """ Include the FMU cooling model """ simulate_network: bool = False """ Include network 
model """ + weather: bool | None = None + """ + Include weather information in the cooling model. + Defaults to True if replay, False otherwise. + """ # Simulation runtime options - fastforward: int | None = None + start: AutoAwareDatetime | None = None + """ Start of simulation """ + # Exclude end from serialization as it is redundant with time + end: A[AutoAwareDatetime | None, Field(exclude=True)] = None + """ End of simulation. Pass either `time` or `end`, not both. """ + time: SmartTimedelta = timedelta(hours=1) """ - Fast-forward by time amount (unit specified by `time_unit`, default seconds). - Can pass a string like 15s, 1m, 1h + Length of time to simulate (default seconds). + Can pass a string like 123, 27m, 3h, 7d + Pass either `time` or `end`, not both. """ - time: int | None = None + fastforward: SmartTimedelta = timedelta(seconds=0) """ - Length of time to simulate (unit specified by `time_unit`, default seconds). - Can pass a string like 123, 27m, 3h, 7d + "Fast-forward" the simulation by time amount before starting. This is just a convenience + shortcut for setting --start without having to recall the exact start date of the dataset. + Can pass a string like 15s, 1m, 1h """ - time_delta: int = 1 + time_delta: SmartTimedelta = timedelta(seconds=1) """ - Step size (unit specified by `time_unit`, default seconds). + Step size for the power simulation (default seconds). Can pass a string like 15s, 1m, 1h, 1ms """ time_unit: timedelta = timedelta(seconds=1) """ - Units all time delta ints are measured in (default seconds) + The base unit of the simulation, determining how often it will tick the job scheduler. 
""" + @cached_property + def time_int(self) -> int: + """ Return time as an int of time_unit """ + return int(self.time / self.time_unit) + + @cached_property + def time_delta_int(self) -> int: + """ Return time_delta as an int of time_unit """ + return int(self.time_delta / self.time_unit) + @cached_property def downscale(self) -> int: return int(timedelta(seconds=1) / self.time_unit) - start: str = "2021-05-21T13:00:00-04:00" - """ ISO8601 start of simulation """ - end: str = "2021-05-21T14:00:00-04:00" - """ ISO8601 end of simulation """ - numjobs: int = 100 """ Number of jobs to schedule """ @@ -200,22 +217,34 @@ class SimConfig(RAPSBaseModel, abc.ABC): """ Path to accounts JSON file from previous run """ # Downtime - downtime_first: int | None = None + downtime_first: SmartTimedelta | None = None """ First downtime (unit specified by `time_unit`, default seconds). Can pass a string like 27m, 3h, 7d """ - downtime_interval: str | None = None + downtime_interval: SmartTimedelta | None = None """ Interval between downtimes (unit specified by `time_unit`, default seconds). Can pass a string like 123, 27m, 3h, 7d """ - downtime_length: str | None = None + downtime_length: SmartTimedelta | None = None """ Downtime length (unit specified by `time_unit`, default seconds). 
Can pass a string like 123, 27m, 3h, 7d """ + @cached_property + def downtime_first_int(self) -> int | None: + return None if self.downtime_first is None else int(self.downtime_first / self.time_unit) + + @cached_property + def downtime_interval_int(self) -> int | None: + return None if self.downtime_interval is None else int(self.downtime_interval / self.time_unit) + + @cached_property + def downtime_length_int(self) -> int | None: + return None if self.downtime_length is None else int(self.downtime_length / self.time_unit) + # Continous Job Generation continuous_job_generation: bool = False """ Activate continuous job generation """ @@ -229,40 +258,60 @@ class SimConfig(RAPSBaseModel, abc.ABC): def _validate_before(cls, data): # This is called with the raw input, before Pydantic parses it, so data is just a dict and # contain any data types. + data = {**data} - time_fields = [ + # infer time_unit + td_fields = [ "time_delta", "time", "fastforward", "downtime_first", "downtime_interval", "downtime_length", ] - - if data.get('time_unit') is not None: - time_unit = parse_time_unit(data['time_unit']) - input_time_unit = time_unit - else: + if data.get('time_unit') is None: time_unit = min( - [infer_time_unit(data[f]) for f in time_fields if data.get(f)], + [infer_time_unit(data[f]) for f in td_fields if data.get(f)], default=timedelta(seconds=1) ) - # When "inferring" time unit interpret raw numbers as seconds. - # E.g. 
`-t 10 --time-delta 1ds` should be `-t 10s --time-delta 1ds` - input_time_unit = timedelta(seconds=1) - + else: + time_unit = parse_time_unit(data['time_unit']) data['time_unit'] = time_unit - for field in time_fields: - if data.get(field) is not None: - td = parse_td(data[field], input_time_unit) - data[field] = convert_to_time_unit(td, time_unit) return data @model_validator(mode="after") def _validate_after(self): + # Allow setting either start/end or start/time for backwards compatibility and convenience + if self.start and self.fastforward: + raise ValueError("start and fastforward are mutually exclusive") + + if self.end: + if not self.start: + raise ValueError("end requires start to be set") + if 'time' not in self.model_fields_set: # If time was not explicitly set + self.time = self.end - self.start + elif self.start: + self.end = self.start + self.time + + if self.start and self.start + self.time != self.end: + raise ValueError("time and end values don't match. You only need to specify one.") + + td_fields = [ + "time_delta", "time", "fastforward", + "downtime_first", "downtime_interval", "downtime_length", + ] + # Check time fields are divisible by time_unit. 
+ for field in td_fields: + td = getattr(self, field) + if td is not None: + convert_to_time_unit(td, self.time_unit) # will throw if invalid + if not self.replay and not self.workload: self.workload = "random" if self.cooling: self.layout = "layout2" + if self.weather is None: + self.weather = self.cooling and bool(self.replay) + if self.jobsize_is_power_of is not None and self.jobsize_is_of_degree is not None: raise ValueError("jobsize_is_power_of and jobsize_is_of_degree are mutually exclusive") @@ -341,20 +390,32 @@ class SimConfig(RAPSBaseModel, abc.ABC): args_dict = self.model_dump(mode="json") args_dict['system'] = self.system_name # validate has been renamed to power_scope - args_dict['validate'] = args_dict["power_scope"] == "node" + args_dict['validate'] = self.power_scope == "node" args_dict['downscale'] = self.downscale # Convert Path objects to str - if args_dict['output']: - args_dict['output'] = str(args_dict['output']) - if args_dict['replay']: - args_dict['replay'] = [str(p) for p in args_dict['replay']] - if args_dict['accounts_json']: - args_dict['accounts_json'] = str(args_dict['accounts_json']) + if self.output: + args_dict['output'] = str(self.output) + if self.replay: + args_dict['replay'] = [str(p) for p in self.replay] + if self.accounts_json: + args_dict['accounts_json'] = str(self.accounts_json) + + args_dict["time"] = self.time_int + args_dict["time_delta"] = self.time_delta_int + args_dict["downtime_first"] = self.downtime_first_int + args_dict["downtime_interval"] = self.downtime_interval_int + args_dict["downtime_length"] = self.downtime_length_int + args_dict['start'] = self.start.astimezone().isoformat() if self.start else None + args_dict['end'] = self.end.astimezone().isoformat() if self.end else None + args_dict.pop("fastforward") # Remove fastforward from this to avoid confusion later args_dict['sim_config'] = self return args_dict + def dump_yaml(self, exclude_unset=True): + return yaml_dump(self.model_dump(mode="json", 
exclude_unset=exclude_unset)) + class SingleSimConfig(SimConfig, abc.ABC): system: SystemConfig | str = "frontier" @@ -397,8 +458,8 @@ SIM_SHORTCUTS = { "partitions": "x", "cooling": "c", "simulate-network": "net", - "fastforward": "ff", "time": "t", + "fastforward": "ff", "debug": "d", "numjobs": "n", "verbose": "v", diff --git a/raps/stats.py b/raps/stats.py index aa8610ed2cbba7a9460a7cc7b5bd7980b2c1ddab..a42015155d156ccb1f7164c4bc4cb370a91428ac 100644 --- a/raps/stats.py +++ b/raps/stats.py @@ -30,26 +30,26 @@ def get_engine_stats(engine: Engine): total_cost = total_energy_consumed * 1000 * engine.config.get('POWER_COST', 0) # Total cost in dollars stats = { - 'time simulated': time_simulated, + 'time_simulated': time_simulated, 'num_samples': num_samples, - 'average power': f'{average_power_mw:.4f} MW', - 'min loss': f'{min_loss_mw:.4f} MW', - 'average loss': f'{average_loss_mw:.2f} MW', - 'max loss': f'{max_loss_mw:.2f} MW', - 'system power efficiency': f'{efficiency * 100:.2f}%', - 'total energy consumed': f'{total_energy_consumed:.2f} MW-hr', - 'carbon emissions': f'{emissions:.4f} metric tons CO2', - 'total cost': f'${total_cost:.2f}' + 'average_power': average_power_mw, + 'min_loss': min_loss_mw, + 'average_loss': average_loss_mw, + 'max_loss': max_loss_mw, + 'system_power_efficiency': efficiency * 100, + 'total_energy_consumed': total_energy_consumed, + 'carbon_emissions': emissions, + 'total_cost': total_cost, } if engine.config['multitenant']: # Multitenancy Stats total_jobs_loaded = engine.total_initial_jobs # Assuming this is passed to __init__ - stats['total jobs loaded'] = total_jobs_loaded + stats['total_jobs_loaded'] = total_jobs_loaded if total_jobs_loaded > 0: - stats['jobs completed percentage'] = f"{(engine.jobs_completed / total_jobs_loaded * 100):.2f}%" + stats['jobs_completed_percentage'] = engine.jobs_completed / total_jobs_loaded * 100 else: - stats['jobs completed percentage'] = "0%" + stats['jobs_completed_percentage'] = 0 if 
engine.node_occupancy_history: # Calculate average concurrent jobs per node (average density across all nodes and timesteps) @@ -76,11 +76,11 @@ def get_engine_stats(engine: Engine): avg_jobs_per_active_node = (sum_jobs_per_active_node / count_active_timesteps_for_avg_active) \ if count_active_timesteps_for_avg_active > 0 else 0 - stats['avg concurrent jobs per active node'] = f"{avg_jobs_per_active_node:.2f}" - stats['max concurrent jobs per node'] = max_concurrent_jobs_per_node + stats['avg_concurrent_jobs_per_active_node'] = avg_jobs_per_active_node + stats['max_concurrent_jobs_per_node'] = max_concurrent_jobs_per_node else: - stats['avg concurrent jobs per node'] = "N/A" - stats['max concurrent jobs per node'] = "N/A" + stats['avg_concurrent_jobs_per_node'] = None + stats['max_concurrent_jobs_per_node'] = None # network_stats = get_network_stats() # stats.update(network_stats) @@ -124,19 +124,19 @@ def get_network_stats(engine: Engine): else: mean_net_util = 0.0 - stats["avg network util"] = f"{mean_net_util * 100:.2f}%" + stats["avg_network_util"] = mean_net_util * 100 if engine.avg_slowdown_history: avg_job_slow = sum(engine.avg_slowdown_history) / len(engine.avg_slowdown_history) else: avg_job_slow = 1.0 - stats["avg per-job slowdown"] = f"{avg_job_slow:.2f}x" + stats["avg_per_job_slowdown"] = avg_job_slow if engine.max_slowdown_history: max_job_slow = max(engine.max_slowdown_history) else: max_job_slow = 1.0 - stats["max per-job slowdown"] = f"{max_job_slow:.2f}x" + stats["max_per_job_slowdown"] = max_job_slow return stats @@ -301,31 +301,32 @@ def get_job_stats(engine: Engine): min_nrx_u, max_nrx_u, avg_nrx_u = -1, -1, -1 job_stats = { - 'jobs completed': engine.jobs_completed, - 'throughput': f'{throughput:.2f} jobs/hour', - 'jobs still running': [job.id for job in engine.running], - 'jobs still in queue': [job.id for job in engine.queue], - 'Jobs <= 5 nodes': jobsSmall, - 'Jobs <= 50 nodes': jobsMedium, - 'Jobs <= 250 nodes': jobsLarge, - 'Jobs <= 4500 
nodes': jobsVLarge, - 'Jobs > 4500 nodes': jobsHuge, + 'jobs_total': engine.jobs_completed + len(engine.running) + len(engine.queue), + 'jobs_completed': engine.jobs_completed, + 'throughput': throughput, + 'jobs_still_running': [job.id for job in engine.running], + 'jobs_still_in_queue': [job.id for job in engine.queue], + 'jobs <= 5 nodes': jobsSmall, + 'jobs <= 50 nodes': jobsMedium, + 'jobs <= 250 nodes': jobsLarge, + 'jobs <= 4500 nodes': jobsVLarge, + 'jobs > 4500 nodes': jobsHuge, # Information on job-mix executed - 'min job size': min_job_size, - 'max job size': max_job_size, - 'average job size': avg_job_size, - 'min runtime': min_runtime, - 'max runtime': max_runtime, - 'average runtime': avg_runtime, - 'min energy': min_energy, - 'max energy': max_energy, - 'avg energy': avg_energy, - 'min edp': min_edp, - 'max edp': max_edp, - 'avg edp': avg_edp, - 'min edp^2': min_edp2, - 'max edp^2': max_edp2, - 'avg edp^2': avg_edp2, + 'min_job_size': min_job_size, + 'max_job_size': max_job_size, + 'average_job_size': avg_job_size, + 'min_runtime': min_runtime, + 'max_runtime': max_runtime, + 'average_runtime': avg_runtime, + 'min_energy': min_energy, + 'max_energy': max_energy, + 'avg_energy': avg_energy, + 'min_edp': min_edp, + 'max_edp': max_edp, + 'avg_edp': avg_edp, + 'min_edp^2': min_edp2, + 'max_edp^2': max_edp2, + 'avg_edp^2': avg_edp2, 'min_aggregate_node_hours': min_agg_node_hours, 'max_aggregate_node_hours': max_agg_node_hours, 'avg_aggregate_node_hours': avg_agg_node_hours, @@ -362,28 +363,44 @@ def print_formatted_report(engine_stats=None, scheduler_stats=None, network_stats=None ): + def print_report_section(name, data, templates): + if data: + rep_str = f"--- {name} ---" + print(rep_str) + for key, value in data.items(): + pretty_key = key.replace('_', ' ').title() + if key in templates: + pretty_value = templates[key].format(value) + elif isinstance(value, float): + pretty_value = f"{value:.2f}" + elif value is None: + pretty_value = "N/A" + else: + 
pretty_value = str(value) + print(f"{pretty_key}: {pretty_value}") + print(f"{'-' * len(rep_str)}\n") + print() + # Print a formatted report - if engine_stats: - rep_str = "--- Simulation Report ---" - print(f"\n{rep_str}") - for key, value in engine_stats.items(): - print(f"{key.replace('_', ' ').title()}: {value}") - print(f"{'-' * len(rep_str)}\n") - if job_stats: - rep_str = "--- Job Stat Report ---" - print(f"\n{rep_str}") - for key, value in job_stats.items(): - print(f"{key.replace('_', ' ').title()}: {value}") - print(f"{'-' * len(rep_str)}\n") - if scheduler_stats: - rep_str = "--- Scheduler Report ---" - print(f"\n{rep_str}") - for key, value in scheduler_stats.items(): - print(f"{key.replace('_', ' ').title()}: {value}") - print(f"{'-' * len(rep_str)}\n") - if network_stats: - rep_str = "--- Network Report ---" - print(f"\n{rep_str}") - for key, value in network_stats.items(): - print(f"{key.replace('_', ' ').title()}: {value}") - print(f"{'-' * len(rep_str)}\n") + print() + print_report_section("Simulation Report", engine_stats, { + 'average_power': '{:.4f} MW', + 'min_loss': '{:.4f} MW', + 'average_loss': '{:.2f} MW', + 'max_loss': '{:.2f} MW', + 'system_power_efficiency': '{:.2f}%', + 'total_energy_consumed': '{:.2f} MW-hr', + 'carbon_emissions': '{:.4f} metric tons CO2', + 'total_cost': '${:.2f}', + }) + print_report_section("Job Stat Report", job_stats, { + 'throughput': '{:.2f} jobs/hour', + 'jobs_completed_percentage': "{:.2f}%", + }) + print_report_section("Scheduler Report", scheduler_stats, { + }) + print_report_section("Network Report", network_stats, { + "avg_network_util": "{:.2f}%", + "avg_per_job_slowdown": "{:.2f}x", + "max_per_job_slowdown": "{:.2f}x", + }) diff --git a/raps/ui.py b/raps/ui.py index b4234bc9cf63179017ab9a73a7ba95b571ef7b76..3965935bf8672e6472bf965b5b13e4ae24a79967 100644 --- a/raps/ui.py +++ b/raps/ui.py @@ -26,6 +26,7 @@ from raps.engine import TickData, Engine MAX_ROWS = 30 + class LayoutManager: def __init__(self, 
layout_type, engine: Engine, total_timesteps=0, debug=None, args_dict=None, **config): self.debug = debug diff --git a/raps/utils.py b/raps/utils.py index c3c541f6f817f9480bc5ec0914800c8a548f3ace..4414fd9c3899087420a1baa5ecb371103d605fe3 100644 --- a/raps/utils.py +++ b/raps/utils.py @@ -6,7 +6,7 @@ generating random numbers, summarizing and expanding ranges, determining job sta """ -from datetime import timedelta +from datetime import datetime, timedelta, timezone from enum import Enum import os import hashlib @@ -21,7 +21,9 @@ import json import argparse from pathlib import Path from typing import Annotated as A, TypeVar, Callable, TypeAlias -from pydantic import BaseModel, TypeAdapter, AfterValidator, ConfigDict, AwareDatetime, ValidationError +from pydantic import ( + BaseModel, TypeAdapter, AfterValidator, BeforeValidator, ConfigDict, AwareDatetime, ValidationError +) from pydantic_settings import BaseSettings, SettingsConfigDict, CliApp, CliSettingsSource import yaml from raps.job import Job @@ -533,6 +535,9 @@ def parse_td(td, unit: str | timedelta = 's') -> timedelta: if TypeAdapter(timedelta).validator.isinstance_python(td): return TypeAdapter(timedelta).validate_python(td) if isinstance(td, str): + if not pd.isna(pd.to_timedelta(td, errors="coerce")): + return pd.to_timedelta(td) + # Special case parsing for ds and cs units which pandas doesn't support re_match = re.fullmatch(r"(\d+)\s*(\w+)", td.strip()) if re_match and re_match[2] in TIME_UNITS: num_str, unit_str = re_match.groups() @@ -670,15 +675,23 @@ class ValueComparableEnum(Enum): return hash(self.value) +def normalize_tz(d: datetime): + """ Convert datetime to UTC. 
If naive, assume local time, then convert to UTC """ + if not d.tzinfo: + return d.astimezone().astimezone(timezone.utc) + else: + return d.astimezone(timezone.utc) + + ExpandedPath = A[Path, AfterValidator(lambda v: Path(v).expanduser().resolve())] """ Type that that expands ~ and environment variables in a path string """ +AutoAwareDatetime = A[datetime, AfterValidator(normalize_tz)] +""" Datetime type wrapper, makes sure timezone is set """ -SmartTimedelta = A[timedelta, AfterValidator(parse_td)] +SmartTimedelta = A[timedelta, BeforeValidator(parse_td)] """ Can be passed as ISO 8601 format like PT5M, or a string like 9s, or a number of seconds """ -T = TypeVar("T", bound=BaseModel) - class RAPSBaseModel(BaseModel): """ Base Pydantic model with shared config """ @@ -687,6 +700,9 @@ class RAPSBaseModel(BaseModel): ) +T = TypeVar("T", bound=BaseModel) + + def pydantic_add_args( parser: argparse.ArgumentParser, model_cls: type[T], model_config: SettingsConfigDict | None = None, @@ -727,7 +743,8 @@ def pydantic_add_args( **(data or {}), ) # Recreate model so we don't return the SettingsModel subclass - return model_cls.model_validate(model.model_dump()) + # use exclude_unset so that model_field_set is preserved as well + return model_cls.model_validate(model.model_dump(exclude_unset=True)) except ValidationError as err: print(err) sys.exit(1) @@ -738,8 +755,11 @@ SubParsers: TypeAlias = "argparse._SubParsersAction[argparse.ArgumentParser]" """ Alias for the result of argparse parser.add_subparsers """ -def yaml_dump(data): +def yaml_dump(data, header_comment=''): """ Dumps yaml with pretty formatting """ + if header_comment: + header_comment = '\n'.join(f'# {ln}' for ln in header_comment.splitlines()) + "\n" + class IndentDumper(yaml.Dumper): def represent_data(self, data): # Quote all strings with special characters to avoid confusion @@ -755,7 +775,7 @@ def yaml_dump(data): # Indent lists return super(IndentDumper, self).increase_indent(flow, False) - return 
yaml.dump( + return header_comment + yaml.dump( data, Dumper=IndentDumper, sort_keys=False, @@ -766,10 +786,15 @@ def yaml_dump(data): def read_yaml(config_file: str): """ Parses yaml file. Pass "-" to read from stdin """ - if config_file == "-": - return yaml.safe_load(sys.stdin.read()) + # Assume stdin if not terminal + if config_file == "-" or (not config_file and not sys.stdin.isatty()): + data = sys.stdin.read() elif config_file: - return yaml.safe_load(Path(config_file).read_text()) + data = Path(config_file).read_text() + else: + data = "" + if data.strip(): + return yaml.safe_load(data) else: return {} @@ -860,7 +885,8 @@ class WorkloadData(RAPSBaseModel): telemetry_end: int # TODO: It might make more sense to make start_timestep/end_timestep always unix time, then we # wouldn't need this extra start_date field. - start_date: AwareDatetime + # Don't use AutoAwareDatetime here as we want to enforce dataloaders returning timezone info + start_date: A[AwareDatetime, AfterValidator(lambda d: d.astimezone(timezone.utc))] model_config = ConfigDict( arbitrary_types_allowed=True, diff --git a/raps/weather.py b/raps/weather.py index 8a4e138a99f20bf7e9c3d7a955b1c0fcbed5631a..655e8f35e2bcbceef954df01a33a3275ec8156f1 100644 --- a/raps/weather.py +++ b/raps/weather.py @@ -7,7 +7,7 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) class Weather: - def __init__(self, iso_string, config): + def __init__(self, start: datetime | None, config): """ Initialize the Weather class with configuration loaded from a JSON file. 
If zip_code and country_code are provided, the coordinates (lat, lon) @@ -20,13 +20,7 @@ class Weather: self.lon = None self.weather_cache = {} # Cache for storing weather data for the entire day self.has_coords = False - self.start = None - - try: - # Convert the ISO 8601 string to a datetime object - self.start = datetime.fromisoformat(iso_string.replace("Z", "+00:00")) - except ValueError: - print("Invalid ISO 8601 datetime string specified for --start. Using default temperature instead.") + self.start = start # Retrieve coordinates if zip_code and country_code are provided if self.zip_code and self.country_code: diff --git a/raps/workloads/__init__.py b/raps/workloads/__init__.py index 080eafc32c770da724e1f9d397d20926de1ffd19..a34261a9996082bedb6f8a5395ebb24d5d4661e3 100644 --- a/raps/workloads/__init__.py +++ b/raps/workloads/__init__.py @@ -2,15 +2,17 @@ import math import numpy as np +import pandas as pd from raps.utils import WorkloadData, SubParsers -from raps.utils import pydantic_add_args +from raps.utils import pydantic_add_args, create_file_indexed from raps.sim_config import SingleSimConfig +from raps.telemetry import Telemetry from .basic import BasicWorkload from .constants import JOB_NAMES, ACCT_NAMES, MAX_PRIORITY from .distribution import DistributionWorkload -from .live import continuous_job_generation, run_workload +from .live import continuous_job_generation from .multitenant import MultitenantWorkload from .utils import plot_job_hist @@ -26,11 +28,12 @@ class BaseWorkload: def generate_jobs(self): jobs = getattr(self, self.args.workload)(args=self.args) timestep_end = int(math.ceil(max([job.end_time for job in jobs]))) + now = pd.Timestamp.now('UTC').floor("min").to_pydatetime() return WorkloadData( jobs=jobs, telemetry_start=0, telemetry_end=timestep_end, - start_date=self.args.start, + start_date=self.args.start if self.args.start else now, ) def compute_traces(self, @@ -73,3 +76,29 @@ def run_workload_add_parser(subparsers: SubParsers): 
"cli_shortcuts": SIM_SHORTCUTS, }) parser.set_defaults(impl=lambda args: run_workload(model_validate(args, {}))) + + +def run_workload(sim_config: SingleSimConfig): + args = sim_config.get_legacy_args() + args_dict = sim_config.get_legacy_args() + config = sim_config.system_configs[0].get_legacy() + + if sim_config.replay: + td = Telemetry(**args_dict) + jobs = td.load_from_files(sim_config.replay).jobs + else: + workload = Workload(args, config) + jobs = getattr(workload, sim_config.workload)(args=sim_config.get_legacy_args()) + plot_job_hist(jobs, + config=config, + dist_split=sim_config.multimodal, + gantt_nodes=sim_config.gantt_nodes) + + out = sim_config.get_output() + if out: + timestep_start = min([x.submit_time for x in jobs]) + timestep_end = math.ceil(max([x.submit_time for x in jobs]) + max([x.expected_run_time for x in jobs])) + filename = create_file_indexed('wl', path=str(out), create=False, ending="npz").split(".npz")[0] + # savez_compressed add npz itself, but create_file_indexed needs to check for .npz to find existing files + np.savez_compressed(filename, jobs=jobs, timestep_start=timestep_start, timestep_end=timestep_end, args=args) + print(filename + ".npz") # To std-out to show which npz was created. 
diff --git a/raps/workloads/live.py b/raps/workloads/live.py index 974b369441505dd99548aacfd3a2b7b7a49e7516..b4f2733d138c48222ef08394da7649f717ac0786 100644 --- a/raps/workloads/live.py +++ b/raps/workloads/live.py @@ -1,39 +1,6 @@ -import math -import numpy as np -from raps.sim_config import SingleSimConfig -from raps.telemetry import Telemetry -from raps.utils import create_file_indexed -from .utils import plot_job_hist - def continuous_job_generation(self, *, engine, timestep, jobs): # print("if len(engine.queue) <= engine.continuous_workload.args.maxqueue:") # print(f"if {len(engine.queue)} <= {engine.continuous_workload.args.maxqueue}:") if len(engine.queue) <= engine.continuous_workload.args.maxqueue: new_jobs = engine.continuous_workload.generate_jobs().jobs jobs.extend(new_jobs) - - -def run_workload(sim_config: SingleSimConfig): - args = sim_config.get_legacy_args() - args_dict = sim_config.get_legacy_args() - config = sim_config.system_configs[0].get_legacy() - - if sim_config.replay: - td = Telemetry(**args_dict) - jobs = td.load_from_files(sim_config.replay).jobs - else: - workload = Workload(args, config) - jobs = getattr(workload, sim_config.workload)(args=sim_config.get_legacy_args()) - plot_job_hist(jobs, - config=config, - dist_split=sim_config.multimodal, - gantt_nodes=sim_config.gantt_nodes) - - out = sim_config.get_output() - if out: - timestep_start = min([x.submit_time for x in jobs]) - timestep_end = math.ceil(max([x.submit_time for x in jobs]) + max([x.expected_run_time for x in jobs])) - filename = create_file_indexed('wl', path=str(out), create=False, ending="npz").split(".npz")[0] - # savez_compressed add npz itself, but create_file_indexed needs to check for .npz to find existing files - np.savez_compressed(filename, jobs=jobs, timestep_start=timestep_start, timestep_end=timestep_end, args=args) - print(filename + ".npz") # To std-out to show which npz was created. 
diff --git a/tests/conftest.py b/tests/conftest.py index 855f969f04aefe6fd79c7ba8f63cc379f7070e60..0b7d8b3b9ef3fa4a76ef715f238f2279039eb26c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,6 +4,8 @@ import shutil from glob import glob from pathlib import Path import gc +import os +from .util import PROJECT_ROOT def pytest_addoption(parser): @@ -25,6 +27,7 @@ def sim_output(): Handles cleaning up output from the sim. Can also be used even if you aren't outputing anything to run garbage collection after the sim. """ + os.chdir(PROJECT_ROOT) out = f"test-output/test-{str(uuid.uuid4())[:8]}" yield out for file in glob(f"{out}*"): diff --git a/tests/systems/conftest.py b/tests/systems/conftest.py index 2703755135de9a9ad6028267d51839df0651e3bc..31fa936244e72532a13efa85702bf763dafbc1ce 100644 --- a/tests/systems/conftest.py +++ b/tests/systems/conftest.py @@ -2,18 +2,194 @@ import pytest from tests.util import DATA_PATH +SYSTEM_CONFIGS = { + "40frontiers": { + "marks": [pytest.mark.long], # All these tests are long running as the system is large. 
+ "main": True, + "telemetry": False, + "workload": False, + "multi-part-sim": False, + "withdata": False, + "start": None, + "files": [], + "cooling": False, + "uncertainty": True, + "time": True, + "time_delta": True, + "net": False, + }, + "adastraMI250": { + "marks": [], + "main": True, + "telemetry": True, + "workload": True, + "multi-part-sim": False, + "withdata": True, + "start": "2024-09-01T02:00:00Z", + "files": ["adastraMI250/AdastaJobsMI250_15days.parquet"], + "cooling": False, + "uncertainty": True, + "time": True, + "time_delta": True, + "net": False, + }, + "bluewaters": { + "marks": [], + "main": True, + "telemetry": True, + "workload": True, + "multi-part-sim": False, + "withdata": True, + "start": "2017-03-28T02:00:00Z", + "files": ["bluewaters"], + "cooling": False, + "uncertainty": False, + "time": True, + "time_delta": True, + "net": False, + }, + "frontier": { + "marks": [], + "main": True, + "telemetry": True, + "workload": True, + "multi-part-sim": False, + "withdata": True, + "start": "2024-01-18T03:00:00Z", + "files": ["frontier/slurm/joblive/date=2024-01-18/", "frontier/jobprofile/date=2024-01-18/"], + "cooling": True, + "uncertainty": True, + "time": True, + "time_delta": True, + "net": False, + }, + "fugaku": { + "marks": [], + "main": True, + "telemetry": True, + "workload": True, + "multi-part-sim": False, + "withdata": True, + "start": "2021-04-03T02:00:00Z", + "files": ["fugaku/21_04.parquet"], + "cooling": False, + "uncertainty": False, + "time": True, + "time_delta": True, + "net": False, + }, + "gcloudv2": { + "marks": [], + "main": True, + "telemetry": True, + "workload": True, + "multi-part-sim": False, + "withdata": True, + "start": "2011-05-02T05:00:00Z", + "files": ["gcloud/v2/google_cluster_data_2011_sample"], + "cooling": False, + "uncertainty": False, + "time": True, + "time_delta": True, + "net": False, + }, + "lassen": { + "marks": [], + "main": True, + "telemetry": False, # Takes very long! 
+ "workload": False, + "multi-part-sim": False, + "withdata": True, + "start": "2019-08-22T00:00:00Z", + "files": ["lassen/Lassen-Supercomputer-Job-Dataset"], + "cooling": True, + "uncertainty": False, + "time": True, + "time_delta": True, + "net": True, + }, + "marconi100": { + "marks": [], + "main": True, + "telemetry": True, + "workload": True, + "multi-part-sim": False, + "withdata": True, + "start": "2020-05-06T07:30:00Z", + "files": ["marconi100/job_table.parquet"], + "cooling": True, + "uncertainty": False, + "time": True, + "time_delta": True, + "net": False, + }, + "mit_supercloud": { + "marks": [], + "main": False, + "telemetry": False, + "workload": False, + "multi-part-sim": True, + "withdata": True, + "start": "2021-05-22T00:00:00Z", + "files": ["mit_supercloud/202201"], + "cooling": False, + "uncertainty": False, + "time": False, + "time_delta": False, + "net": False, + "net-multi-sim": True, + }, + "setonix": { + "marks": [], + "main": False, + "telemetry": True, + "workload": False, + "multi-part-sim": True, + "withdata": False, + "files": [], + "start": None, + "cooling": False, + "uncertainty": False, + "time": False, + "time_delta": False, + "net": False, + }, + "summit": { + "marks": [], + "main": True, + "telemetry": False, + "workload": False, + "multi-part-sim": False, + "withdata": False, + "files": [], + "start": None, + "cooling": True, + "uncertainty": False, + "time": True, + "time_delta": True, + "net": False, + }, + "lumi": { + "marks": [], + "main": False, + "telemetry": False, + "workload": False, + "multi-part-sim": True, + "withdata": False, + "files": [], + "start": None, + "cooling": False, + "uncertainty": False, + "time": False, + "time_delta": False, + "net": False, + "net-multi-sim": False + }, +} + + @pytest.fixture(params=[ - pytest.param("40frontiers", marks=pytest.mark.long), # All these tests are long running as the system is large. 
- "adastraMI250", - "frontier", - "fugaku", - "gcloudv2", - "lassen", - "marconi100", - "mit_supercloud", - "setonix", - "summit", - "lumi" + pytest.param(k, marks=v.get('marks', [])) for k, v in SYSTEM_CONFIGS.items() ]) def system(request): return request.param @@ -33,183 +209,12 @@ def pytest_collection_modifyitems(config, items): # #Define tests to run here! @pytest.fixture def system_config(system): - # Defaults for systems not listed explicitly - default_config = {} # No defaults! - - configs = { - "40frontiers": { - "main": True, - "telemetry": False, - "workload": False, - "multi-part-sim": False, - "withdata": False, - "cooling": False, - "uncertainty": True, - "time": True, - "fastforward": True, - "time_delta": True, - "net": False, - }, - "adastraMI250": { - "main": True, - "telemetry": True, - "workload": True, - "multi-part-sim": False, - "withdata": True, - "cooling": False, - "uncertainty": True, - "time": True, - "fastforward": True, - "time_delta": True, - "net": False, - }, - "bluewaters": { - "main": True, - "telemetry": True, - "multi-part-sim": False, - "withdata": True, - "cooling": False, - "uncertainty": False, - "time": True, - "fastforward": True, - "time_delta": True, - "net": False, - }, - "frontier": { - "main": True, - "telemetry": True, - "workload": True, - "multi-part-sim": False, - "withdata": True, - "cooling": True, - "uncertainty": True, - "time": True, - "fastforward": True, - "time_delta": True, - "net": False, - }, - "fugaku": { - "main": True, - "telemetry": True, - "multi-part-sim": False, - "withdata": True, - "cooling": False, - "uncertainty": False, - "time": True, - "fastforward": True, - "time_delta": True, - "net": False, - }, - "gcloudv2": { - "main": True, - "telemetry": True, - "multi-part-sim": False, - "withdata": True, - "cooling": False, - "uncertainty": False, - "time": True, - "fastforward": True, - "time_delta": True, - "net": False, - }, - "lassen": { - "main": True, - "telemetry": False, # Takes very 
long! - "multi-part-sim": False, - "withdata": True, - "cooling": True, - "uncertainty": False, - "time": True, - "fastforward": True, - "time_delta": True, - "net": True, - }, - "marconi100": { - "main": True, - "telemetry": True, - "multi-part-sim": False, - "withdata": True, - "cooling": True, - "uncertainty": False, - "time": True, - "fastforward": True, - "time_delta": True, - "net": False, - }, - "mit_supercloud": { - "main": False, - "telemetry": False, - "multi-part-sim": True, - "withdata": True, - "cooling": False, - "uncertainty": False, - "time": False, - "fastforward": False, - "time_delta": False, - "net": False, - "net-multi-sim": True, - }, - "setonix": { - "main": False, - "telemetry": True, - "multi-part-sim": True, - "withdata": False, - "cooling": False, - "uncertainty": False, - "time": False, - "fastforward": False, - "time_delta": False, - "net": False, - }, - "summit": { - "main": True, - "telemetry": False, - "multi-part-sim": False, - "withdata": False, - "cooling": True, - "uncertainty": False, - "time": True, - "fastforward": True, - "time_delta": True, - "net": False, - }, - "lumi": { - "main": False, - "telemetry": False, - "multi-part-sim": True, - "withdata": False, - "cooling": False, - "uncertainty": False, - "time": False, - "fastforward": False, - "time_delta": False, - "net": False, - "net-multi-sim": False - }, - - - } - return configs.get(system, default_config) + return SYSTEM_CONFIGS[system] @pytest.fixture def system_files(system): - files = { - "40frontiers": [], - "adastraMI250": ["adastraMI250/AdastaJobsMI250_15days.parquet"], - "bluewaters": ["bluewaters"], - "frontier": ["frontier/slurm/joblive/date=2024-01-18/", "frontier/jobprofile/date=2024-01-18/"], - "fugaku": ["fugaku/21_04.parquet"], - "gcloudv2": ["gcloud/v2/google_cluster_data_2011_sample"], - "lassen": ["lassen/Lassen-Supercomputer-Job-Dataset"], - "marconi100": ["marconi100/job_table.parquet"], - "mit_supercloud": ["mit_supercloud/202201"], - "setonix": [], 
- "summit": [], - "lumi": [] - } - - file_list = [DATA_PATH / f for f in files.get(system, [])] + file_list = [DATA_PATH / f for f in SYSTEM_CONFIGS[system].get('files', [])] for file in file_list: assert file.exists(), \ f"File `{file}' does not exist. does ./data exist or is RAPS_DATA_DIR set?" diff --git a/tests/systems/test_engine.py b/tests/systems/test_engine.py deleted file mode 100644 index 0404e89c00c24fbffffa06447c9eb8d93e0adaa1..0000000000000000000000000000000000000000 --- a/tests/systems/test_engine.py +++ /dev/null @@ -1,39 +0,0 @@ -import pytest -from raps.engine import Engine -from raps.sim_config import SingleSimConfig -from raps.stats import ( - get_engine_stats, - # get_job_stats, - # get_scheduler_stats, - # get_network_stats, -) - -pytestmark = [ - pytest.mark.system, - pytest.mark.nodata -] - - -def test_engine(system, system_config, sim_output): - if not system_config.get("main", False): - pytest.skip(f"{system} does not support basic main run.") - - sim_config = SingleSimConfig.model_validate({ - "system": system, - "time": "2m", - }) - engine, workload_data, time_delta = Engine.from_sim_config(sim_config) - jobs = workload_data.jobs - timestep_start = workload_data.telemetry_start - timestep_end = workload_data.telemetry_end - ticks = list(engine.run_simulation(jobs, timestep_start, timestep_end, time_delta)) - - assert len(ticks) == 120 - - engine_stats = get_engine_stats(engine) - # job_stats = get_job_stats(engine) - # scheduler_stats = get_scheduler_stats(engine) - # network_stats = get_network_stats(engine) - - assert engine_stats['time simulated'] == '0:02:00' - # TODO: More specific tests of values diff --git a/tests/systems/test_engine_basic.py b/tests/systems/test_engine_basic.py new file mode 100644 index 0000000000000000000000000000000000000000..96c62530228084b2c55464c4e7af9b821e1dbcce --- /dev/null +++ b/tests/systems/test_engine_basic.py @@ -0,0 +1,20 @@ +import pytest +from ..util import run_engine + +pytestmark = [ + 
pytest.mark.system, + pytest.mark.nodata +] + + +def test_engine_basic(system, system_config, sim_output): + if not system_config.get("main", False): + pytest.skip(f"{system} does not support basic main run.") + + engine, stats = run_engine({ + "system": system, + "time": "2m", + }) + + assert stats['tick_count'] == 120 + assert stats['engine']['time_simulated'] == '0:02:00' diff --git a/tests/systems/test_main_fastforward_run.py b/tests/systems/test_main_fastforward_run.py index ab19b2403b179221e2c65798dd524d81e4d0d17f..9fe3216da154c74f730feea3b6d4faf5c84a3dbb 100644 --- a/tests/systems/test_main_fastforward_run.py +++ b/tests/systems/test_main_fastforward_run.py @@ -1,7 +1,7 @@ import os import subprocess import pytest -from tests.util import PROJECT_ROOT +from ..util import run_engine pytestmark = [ @@ -11,23 +11,14 @@ pytestmark = [ ] -@pytest.mark.parametrize("ff_arg", [ - "0", "1", "3600", "7200", "43200", - "0s", "1s", "3600s", "7200s", "43200s", - "0m", "1m", "60m", - "0h", "1h", "6h", -]) +@pytest.mark.parametrize("ff_arg", ["0s", "1s", "3600s", "60m"]) def test_main_fastforward_run(system, system_config, ff_arg, sim_output): - if not system_config.get("fastforward", False): - pytest.skip(f"{system} does not support basic main run.") + if not system_config.get("main", False): + pytest.skip(f"{system} does not support basic main even without data.") - os.chdir(PROJECT_ROOT) - result = subprocess.run([ - "python", "main.py", "run", - "-t 1", - "--fastforward", ff_arg, - "--system", system, - "--noui", - "-o", sim_output - ], capture_output=True, text=True, stdin=subprocess.DEVNULL) - assert result.returncode == 0, f"Failed on {system}: {result.stderr}" + engine, stats = run_engine({ + "system": system, + "fastforward": ff_arg, + "time": "10s", + }) + assert stats['engine']['time_simulated'] == '0:00:10' diff --git a/tests/systems/test_main_start_run.py b/tests/systems/test_main_start_run.py new file mode 100644 index 
0000000000000000000000000000000000000000..19cc163ee7d4ffccc2377922222736ce5224d71c --- /dev/null +++ b/tests/systems/test_main_start_run.py @@ -0,0 +1,22 @@ +import pytest +from ..util import run_engine + + +pytestmark = [ + pytest.mark.system, + pytest.mark.nodata, +] + + +@pytest.mark.parametrize("start", [ + "2025-01-01", "2024-01-04T00:00Z", "1970-01-01T00:00:00+00:00", +]) +def test_main_start_run(system, system_config, sim_output, start): + if not system_config.get("main", False): + pytest.skip(f"{system} does not support basic main even without data.") + + engine, stats = run_engine({ + "system": system, + "time": "10s", + "start": start + }) diff --git a/tests/systems/test_main_time_ff_delta_run.py b/tests/systems/test_main_time_ff_delta_run.py deleted file mode 100644 index 74247583e31067f328560aed438cdf91e0234243..0000000000000000000000000000000000000000 --- a/tests/systems/test_main_time_ff_delta_run.py +++ /dev/null @@ -1,38 +0,0 @@ -import os -import subprocess -import pytest -from tests.util import PROJECT_ROOT - - -pytestmark = [ - pytest.mark.system, - pytest.mark.nodata, - pytest.mark.time_delta -] - - -@pytest.mark.parametrize("time_arg, tdelta_arg, ff_arg", [ - ("100", "1", "103"), - ("100", "1s", "2s"), - ("100", "10s", "10s"), - ("10m", "1m", "1m"), - ("10h", "1h", "2h"), - ("10h", "3h", "1h"), - pytest.param("3d", "1d", "1d", marks=pytest.mark.long, id="1d (long)"), -], ids=["1", "1s", "10s", "1m", "1h", "3h", "1d"]) -def test_main_time_ff_delta_run(system, system_config, time_arg, tdelta_arg, - ff_arg, sim_output): - if not system_config.get("time_delta", False): - pytest.skip(f"{system} does not support time_delta run.") - - os.chdir(PROJECT_ROOT) - result = subprocess.run([ - "python", "main.py", "run", - "-t", time_arg, - "--ff", ff_arg, - "--time-delta", tdelta_arg, - "--system", system, - "--noui", - "-o", sim_output - ], capture_output=True, text=True, stdin=subprocess.DEVNULL) - assert result.returncode == 0, f"Failed on {system}: 
{result.stderr}" diff --git a/tests/systems/test_main_withdata_range_run.py b/tests/systems/test_main_withdata_range_run.py new file mode 100644 index 0000000000000000000000000000000000000000..63c3e8657d3d523b3ed32abf61c118987f4973bb --- /dev/null +++ b/tests/systems/test_main_withdata_range_run.py @@ -0,0 +1,27 @@ +import pytest +from ..util import run_engine + +pytestmark = [ + pytest.mark.system, + pytest.mark.withdata, + pytest.mark.long +] + + +def test_main_withdata_range_run(system, system_config, system_files, sim_output): + if not system_config.get("main", False): + pytest.skip(f"{system} does not support basic main even without data.") + if not system_config.get("withdata", False): + pytest.skip(f"{system} does not support basic main with data.") + + engine, stats = run_engine({ + "system": system, + "start": system_config['start'], + "time": "10m", + "replay": system_files, + }) + + # Check that it at least loaded some data + assert stats['tick_count'] == 10 * 60 + assert stats['job']['jobs_total'] > 0 + assert len(stats['job']['jobs_still_running']) + stats['job']['jobs_completed'] > 0 diff --git a/tests/systems/test_main_withdata_run.py b/tests/systems/test_main_withdata_run.py index 3539db9acd77add3f651b79240bcf62c81c944ef..ed1a9442e7d8386d01a4e8607a07218b14286914 100644 --- a/tests/systems/test_main_withdata_run.py +++ b/tests/systems/test_main_withdata_run.py @@ -1,8 +1,5 @@ -import os -import subprocess import pytest -from tests.util import PROJECT_ROOT - +from ..util import run_engine pytestmark = [ pytest.mark.system, @@ -16,12 +13,14 @@ def test_main_withdata_run(system, system_config, system_files, sim_output): pytest.skip(f"{system} does not support basic main even without data.") if not system_config.get("withdata", False): pytest.skip(f"{system} does not support basic main with data.") - os.chdir(PROJECT_ROOT) - result = subprocess.run([ - "python", "main.py", "run", - "--time", "1m", - "--system", system, - "-f", ','.join(system_files), - 
"-o", sim_output - ], capture_output=True, text=True, stdin=subprocess.DEVNULL) - assert result.returncode == 0, f"Failed on {system}: {result.stderr}" + + engine, stats = run_engine({ + "system": system, + "time": "10m", + "replay": system_files, + }) + + # Check that it at least loaded some data + assert stats['tick_count'] == 10 * 60 + assert stats['job']['jobs_total'] > 0 + assert len(stats['job']['jobs_still_running']) + stats['job']['jobs_completed'] > 0 diff --git a/tests/systems/test_multi_part_sim_withdata_run.py b/tests/systems/test_multi_part_sim_withdata_run.py index 538726c4ece73c68179e7291a5b81b3063d25b16..b35e8fe194cf31cc640c1698078b076d719a7211 100644 --- a/tests/systems/test_multi_part_sim_withdata_run.py +++ b/tests/systems/test_multi_part_sim_withdata_run.py @@ -1,7 +1,5 @@ -import os -import subprocess import pytest -from tests.util import PROJECT_ROOT +from tests.util import run_multi_part_engine pytestmark = [ @@ -17,12 +15,12 @@ def test_multi_part_sim_withdata_run(system, system_config, system_files, sim_ou if not system_config.get("withdata", False): pytest.skip(f"{system} does not support multi-part-sim run with data.") - os.chdir(PROJECT_ROOT) - result = subprocess.run([ - "python", "main.py", "run-parts", - "--time", "1h", - "-x", f"{system}/*", - "-f", ','.join(system_files), - "-o", sim_output, - ], capture_output=True, text=True, stdin=subprocess.DEVNULL) - assert result.returncode == 0, f"Failed on {system}: {result.stderr}" + engine, stats = run_multi_part_engine({ + "start": system_config['start'], + "time": "1h", + "partitions": [system], + "replay": system_files, + }) + + times = [s['engine']['time_simulated'] for s in stats['partitions'].values()] + assert len(set(times)) == 1 # All run the same time diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index edf06fe12f479ec0d8ecdeb9f6c45beff94380ec..5eb7edb4a15f2a75adf87ded18b6bf8049c45878 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -1,6 
+1,6 @@ import pytest from datetime import timedelta -from raps.utils import parse_td, convert_to_time_unit, infer_time_unit, TIME_UNITS +from raps.utils import parse_td, convert_to_time_unit, infer_time_unit, TIME_UNITS, parse_time_unit @pytest.mark.parametrize("input,expected", [ @@ -9,11 +9,25 @@ from raps.utils import parse_td, convert_to_time_unit, infer_time_unit, TIME_UNI (timedelta(minutes=1), timedelta(minutes=1)), (2, timedelta(seconds=2)), ("PT2S", timedelta(seconds=2)), + ("+1 day", timedelta(days=1)), + ("2ds", timedelta(milliseconds=200)), + ("2cs", timedelta(milliseconds=20)), + ("2ms", timedelta(milliseconds=2)), ]) def test_parse_td(input, expected): assert parse_td(input) == expected +@pytest.mark.parametrize("input,expected", [ + ("s", timedelta(seconds=1)), + ("ms", timedelta(milliseconds=1)), + ("ds", timedelta(milliseconds=100)), + ("cs", timedelta(milliseconds=10)), +]) +def test_parse_time_unit(input, expected): + assert parse_time_unit(input) == expected + + def test_parse_td_error(): with pytest.raises(ValueError): parse_td("1x") diff --git a/tests/util.py b/tests/util.py index 6ee1df764bdf971f4c54066068814fc823a35ef1..b5ba49578377d978eff05b019abf0c0f9e75f895 100644 --- a/tests/util.py +++ b/tests/util.py @@ -1,5 +1,14 @@ import os +from typing import Any from pathlib import Path +import shlex +import json +from raps.engine import Engine +from raps.multi_part_engine import MultiPartEngine +from raps.sim_config import SingleSimConfig, MultiPartSimConfig +from raps.stats import ( + get_engine_stats, get_job_stats, get_scheduler_stats, get_network_stats, +) def find_project_root(): @@ -26,3 +35,79 @@ def requires_all_markers(request, required_markers): markexpr = getattr(request.config.option, "markexpr", "") selected = set(part.strip() for part in markexpr.split("and")) return required_markers.issubset(selected) + + +def _get_cmd(config, sub_cmd): + return f"echo {shlex.quote(json.dumps(config))} | python main.py {sub_cmd} - -o none" + + 
+def _get_stats(engine: Engine): + return { + 'engine': get_engine_stats(engine), + 'job': get_job_stats(engine), + 'scheduler': get_scheduler_stats(engine), + 'network': get_network_stats(engine) if engine.simulate_network else None, + } + + +def run_engine(sim_config, include_ticks=False) -> tuple[Engine, dict[str, Any]]: + """ + Run a simulation to completion. Returns the completed Engine and a dict containing the engine + stats. If include_ticks is True, the dict will also include a list of all the TickDatas (this + can be very large, especially if cooling is enabled!) + """ + # Log command to rerun the test manually for debugging convenience + print(f"Command to reproduce run:\n {_get_cmd(sim_config, 'run')}") + + sim_config = SingleSimConfig.model_validate(sim_config) + engine, workload_data, time_delta = Engine.from_sim_config(sim_config) + jobs = workload_data.jobs + timestep_start = workload_data.telemetry_start + timestep_end = workload_data.telemetry_end + gen = engine.run_simulation(jobs, timestep_start, timestep_end, time_delta) + + stats = { + "tick_count": 0, + "tick_datas": [] if include_ticks else None, + } + + for tick in gen: + stats['tick_count'] += 1 + if include_ticks: + stats['tick_datas'].append(tick) + + stats.update(_get_stats(engine)) + + return engine, stats + + +def run_multi_part_engine(sim_config, include_ticks=False) -> tuple[MultiPartEngine, dict[str, dict[str, Any]]]: + """ + Run a multi-part simulation to completion. Returns the completed Engine and a dict containing the engine + stats for each partition. If include_ticks is True, the dicts will also include a list of all the + TickDatas (this can be very large, especially if cooling is enabled!) 
+ """ + # Log command to rerun the test manually for debugging convenience + print(f"Command to reproduce run:\n {_get_cmd(sim_config, "run-parts")}") + + sim_config = MultiPartSimConfig.model_validate(sim_config) + multi_engine, workload_results, timestep_start, timestep_end, time_delta = \ + MultiPartEngine.from_sim_config(sim_config) + jobs = {p: w.jobs for p, w in workload_results.items()} + gen = multi_engine.run_simulation(jobs, timestep_start, timestep_end, time_delta) + + stats = { + "tick_count": 0, + "tick_datas": [] if include_ticks else None, + "partitions": {}, + } + + for tick in gen: + stats['tick_count'] += 1 + if include_ticks: + stats['tick_datas'].append(tick) + + for partition, engine in multi_engine.engines.items(): + stats['partitions'][partition] = _get_stats(engine) + + return multi_engine, stats