From 88bd91650233a0f623a24687fd83be863385b42a Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Thu, 28 Aug 2025 15:21:44 -0400 Subject: [PATCH 01/14] Add missing tests --- tests/systems/conftest.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/tests/systems/conftest.py b/tests/systems/conftest.py index 269d101..2703755 100644 --- a/tests/systems/conftest.py +++ b/tests/systems/conftest.py @@ -63,6 +63,18 @@ def system_config(system): "time_delta": True, "net": False, }, + "bluewaters": { + "main": True, + "telemetry": True, + "multi-part-sim": False, + "withdata": True, + "cooling": False, + "uncertainty": False, + "time": True, + "fastforward": True, + "time_delta": True, + "net": False, + }, "frontier": { "main": True, "telemetry": True, @@ -89,10 +101,10 @@ def system_config(system): "net": False, }, "gcloudv2": { - "main": False, - "telemetry": False, + "main": True, + "telemetry": True, "multi-part-sim": False, - "withdata": False, + "withdata": True, "cooling": False, "uncertainty": False, "time": True, @@ -185,6 +197,7 @@ def system_files(system): files = { "40frontiers": [], "adastraMI250": ["adastraMI250/AdastaJobsMI250_15days.parquet"], + "bluewaters": ["bluewaters"], "frontier": ["frontier/slurm/joblive/date=2024-01-18/", "frontier/jobprofile/date=2024-01-18/"], "fugaku": ["fugaku/21_04.parquet"], "gcloudv2": ["gcloud/v2/google_cluster_data_2011_sample"], -- GitLab From 47f27d768fb3b2792b50db2a2744ab27d5fffae6 Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Wed, 3 Sep 2025 12:04:29 -0400 Subject: [PATCH 02/14] Return start_date in data loaders --- raps/dataloaders/adastraMI250.py | 8 +- raps/dataloaders/bluewaters.py | 14 +++- raps/dataloaders/frontier.py | 17 +++- raps/dataloaders/fugaku.py | 7 +- raps/dataloaders/gcloudv2.py | 18 +++-- raps/dataloaders/kestrel.py | 8 +- raps/dataloaders/lassen.py | 8 +- raps/dataloaders/marconi100.py | 8 +- raps/dataloaders/mit_supercloud/loader.py | 22 ++--- raps/utils.py | 97 
++++++++++++++++++++++- 10 files changed, 169 insertions(+), 38 deletions(-) diff --git a/raps/dataloaders/adastraMI250.py b/raps/dataloaders/adastraMI250.py index 90201c8..9cb53d5 100644 --- a/raps/dataloaders/adastraMI250.py +++ b/raps/dataloaders/adastraMI250.py @@ -24,7 +24,7 @@ import pandas as pd from tqdm import tqdm from ..job import job_dict, Job -from ..utils import next_arrival_byconfkwargs +from ..utils import next_arrival_byconfkwargs, DataLoaderResult def load_data(jobs_path, **kwargs): @@ -205,7 +205,11 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): count_jobs_notOK += 1 print("jobs not added: ", count_jobs_notOK) - return jobs, telemetry_start_time, telemetry_end_time + return DataLoaderResult( + jobs = jobs, + telemetry_start=telemetry_start_time, telemetry_end=telemetry_end_time, + start_date=telemetry_start_timestamp, + ) def xname_to_index(xname: str, config: dict): diff --git a/raps/dataloaders/bluewaters.py b/raps/dataloaders/bluewaters.py index 46fa462..b7f1c10 100644 --- a/raps/dataloaders/bluewaters.py +++ b/raps/dataloaders/bluewaters.py @@ -32,7 +32,9 @@ import math import re import pandas as pd from pathlib import Path +from datetime import datetime, timezone from raps.telemetry import Job, job_dict +from raps.utils import DataLoaderResult def throughput_traces(total_tx, total_rx, intervals): @@ -325,7 +327,11 @@ def load_data(local_dataset_path, **kwargs): j.trace_end_time -= t0 # pprint(jobs) - simulation_start = 0 - simulation_end = max((j.end_time for j in jobs), default=0) - - return jobs, simulation_start, simulation_end + telemetry_start = 0 + telemetry_end = max((j.end_time for j in jobs), default=0) + + return DataLoaderResult( + jobs=jobs, + telemetry_start=telemetry_start, telemetry_end=telemetry_end, + start_date=datetime.fromtimestamp(t0, timezone.utc), + ) diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py index 8491617..15621d1 100644 --- a/raps/dataloaders/frontier.py +++ 
b/raps/dataloaders/frontier.py @@ -10,12 +10,13 @@ python -m raps.telemetry -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR """ import time +from datetime import datetime, timezone import numpy as np import pandas as pd from tqdm import tqdm from ..job import job_dict, Job -from ..utils import power_to_utilization, next_arrival_byconfkwargs, encrypt +from ..utils import power_to_utilization, next_arrival_byconfkwargs, encrypt, DataLoaderResult def aging_boost(nnodes): @@ -325,7 +326,12 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar job = Job(job_info) jobs.append(job) - return jobs, telemetry_start, telemetry_end + return DataLoaderResult( + jobs = jobs, + telemetry_start = telemetry_start, + telemetry_end = telemetry_end, + start_date = telemetry_start_timestamp, + ) def load_live_data(**kwargs): @@ -537,7 +543,12 @@ def load_live_data(**kwargs): job = Job(job_info) jobs.append(job) - return jobs, telemetry_start, telemetry_end + return DataLoaderResult( + jobs = jobs, + telemetry_start = telemetry_start, + telemetry_end = telemetry_end, + start_date = datetime.fromtimestamp(telemetry_start, timezone.utc), + ) def xname_to_index(xname: str, config: dict): diff --git a/raps/dataloaders/fugaku.py b/raps/dataloaders/fugaku.py index 734fa61..0dd2c3b 100644 --- a/raps/dataloaders/fugaku.py +++ b/raps/dataloaders/fugaku.py @@ -17,6 +17,7 @@ import pandas as pd from tqdm import tqdm from ..job import job_dict, Job +from ..utils import DataLoaderResult def load_data(path, **kwargs): @@ -167,7 +168,11 @@ def load_data_from_df(df, **kwargs): job = Job(job_info) job_list.append(job) - return job_list, telemetry_start, telemetry_end + return DataLoaderResult( + jobs=job_list, + telemetry_start = telemetry_start, telemetry_end = telemetry_end, + start_date = telemetry_start_timestamp, + ) def node_index_to_name(index: int, config: dict): diff --git a/raps/dataloaders/gcloudv2.py b/raps/dataloaders/gcloudv2.py index 
e19b0e8..73f918b 100644 --- a/raps/dataloaders/gcloudv2.py +++ b/raps/dataloaders/gcloudv2.py @@ -1,13 +1,14 @@ import os import re +from datetime import datetime from tqdm import tqdm from typing import List, Optional, Generator, Tuple, Any, Union import numpy as np import pandas as pd -from raps.job import job_dict -from raps.job import Job +from raps.job import job_dict, Job +from raps.utils import DataLoaderResult """ Official instructions are here: @@ -200,7 +201,7 @@ class GoogleClusterV2DataLoader: yield pd.concat(dfs, ignore_index=True) -def load_data(data_path: Union[str, List[str]], **kwargs: Any) -> Tuple[List[Any], float, float]: +def load_data(data_path: Union[str, List[str]], **kwargs: Any): config = kwargs.get('config') # Unpack list if isinstance(data_path, list): @@ -331,6 +332,11 @@ def load_data(data_path: Union[str, List[str]], **kwargs: Any) -> Tuple[List[Any jobs.append(Job(job_d)) # Compute simulation span: start at t=0, end at the latest job finish - simulation_start = 0 - simulation_end = int(max(usage_map_end.values()) - t0) - return jobs, simulation_start, simulation_end + telemetry_start = 0 + telemetry_end = int(max(usage_map_end.values()) - t0) + return DataLoaderResult( + jobs = jobs, + telemetry_start=telemetry_start, telemetry_end=telemetry_end, + # gcloud dataset timestamps are already relative, and it doesn't list a start exact date. 
+ start_date=datetime.fromisoformat("2011-05-02T00:00:00Z"), + ) diff --git a/raps/dataloaders/kestrel.py b/raps/dataloaders/kestrel.py index 8b8470a..c82b957 100644 --- a/raps/dataloaders/kestrel.py +++ b/raps/dataloaders/kestrel.py @@ -6,7 +6,7 @@ import pandas as pd from tqdm import tqdm from ..job import job_dict, Job -from ..utils import power_to_utilization, next_arrival +from ..utils import power_to_utilization, next_arrival, DataLoaderResult def load_data(jobs_path, **kwargs): @@ -153,7 +153,11 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): trace_quanta=trace_quanta) jobs.append(Job(job_info)) - return jobs, telemetry_start, telemetry_end + return DataLoaderResult( + jobs=jobs, + telemetry_start=telemetry_start, telemetry_end=telemetry_end, + start_date=telemetry_start_timestamp, + ) def node_index_to_name(index: int, config: dict): diff --git a/raps/dataloaders/lassen.py b/raps/dataloaders/lassen.py index c9aae0d..51292fc 100644 --- a/raps/dataloaders/lassen.py +++ b/raps/dataloaders/lassen.py @@ -38,7 +38,7 @@ from tqdm import tqdm from datetime import timedelta from ..job import job_dict, Job -from ..utils import power_to_utilization, next_arrival_byconfkwargs, parse_td +from ..utils import power_to_utilization, next_arrival_byconfkwargs, parse_td, DataLoaderResult def load_data(path, **kwargs): @@ -249,7 +249,11 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): job = Job(job_info) job_list.append(job) - return job_list, telemetry_start_time, telemetry_end_time + return DataLoaderResult( + jobs=job_list, + telemetry_start=telemetry_start_time, telemetry_end=telemetry_end_time, + start_date=telemetry_start_timestamp, + ) def get_scheduled_nodes(allocation_id, node_df): diff --git a/raps/dataloaders/marconi100.py b/raps/dataloaders/marconi100.py index fef8ec0..9a236f2 100644 --- a/raps/dataloaders/marconi100.py +++ b/raps/dataloaders/marconi100.py @@ -28,7 +28,7 @@ import pandas as pd from tqdm import tqdm from ..job import 
job_dict, Job -from ..utils import power_to_utilization, next_arrival_byconfkwargs +from ..utils import power_to_utilization, next_arrival_byconfkwargs, DataLoaderResult def load_data(jobs_path, **kwargs): @@ -233,7 +233,11 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): job = Job(job_info) jobs.append(job) - return jobs, telemetry_start, telemetry_end + return DataLoaderResult( + jobs = jobs, + telemetry_start=telemetry_start, telemetry_end=telemetry_end, + start_date=telemetry_start_timestamp, + ) def node_index_to_name(index: int, config: dict): diff --git a/raps/dataloaders/mit_supercloud/loader.py b/raps/dataloaders/mit_supercloud/loader.py index 6057210..7c29f31 100644 --- a/raps/dataloaders/mit_supercloud/loader.py +++ b/raps/dataloaders/mit_supercloud/loader.py @@ -116,9 +116,9 @@ import re from tqdm import tqdm from typing import Dict, Union, Optional from collections import Counter - +from datetime import datetime, timezone from raps.job import job_dict, Job -from raps.utils import summarize_ranges +from raps.utils import summarize_ranges, DataLoaderResult from .utils import proc_cpu_series, proc_gpu_series, to_epoch from .utils import DEFAULT_START, DEFAULT_END @@ -602,20 +602,12 @@ def load_data(local_dataset_path, **kwargs): job = Job(current_job_dict) jobs_list.append(job) - # Calculate min_overall_utime and max_overall_utime - telemetry_start = int(sl.time_start.min()) - telemetry_end = int(sl.time_end.max()) - # min_overall_utime = int(sl.time_submit.min()) - # max_overall_utime = int(sl.time_submit.max()) - - # args_namespace = SimpleNamespace( - # fastforward=min_overall_utime, - # system='mit_supercloud', - # time=max_overall_utime - # ) - print("\nSkipped jobs summary:") for reason, count in skip_counts.items(): print(f"- {reason}: {count}") - return jobs_list, telemetry_start, telemetry_end # min_overall_utime, max_overall_utime, args_namespace + return DataLoaderResult( + jobs = jobs_list, + telemetry_start=0, 
telemetry_end=int(end_ts - start_ts), + start_date=datetime.fromtimestamp(start_ts, timezone.utc), + ) diff --git a/raps/utils.py b/raps/utils.py index fe7af8f..e0c9acf 100644 --- a/raps/utils.py +++ b/raps/utils.py @@ -21,7 +21,7 @@ import json import argparse from pathlib import Path from typing import Annotated as A, TypeVar, Callable, TypeAlias -from pydantic import BaseModel, TypeAdapter, AfterValidator +from pydantic import BaseModel, TypeAdapter, AfterValidator, ConfigDict, AwareDatetime from pydantic_settings import BaseSettings, SettingsConfigDict, CliApp, CliSettingsSource import yaml from raps.job import Job @@ -633,6 +633,9 @@ ExpandedPath = A[Path, AfterValidator(lambda v: Path(v).expanduser().resolve())] """ Type that that expands ~ and environment variables in a path string """ +SmartTimedelta = A[timedelta, AfterValidator(parse_td)] +""" Can be passed as ISO 8601 format like PT5M, or a string like 9s, or a number of seconds """ + T = TypeVar("T", bound=BaseModel) @@ -706,3 +709,95 @@ def yaml_dump(data): indent=2, allow_unicode=True, ) + + +class DataLoaderResult(BaseModel): + """ + Result of a dataloader load_data() function. + + jobs: + The list of parsed jobs. + + telemetry_start + the first timestep in which the simulation be executed. + + telemetry_end + the last timestep in which the simulation can be executed. + + start_date + The actual date that telemetry_start represents. + ---- + Explanation regarding times: + + The loaded dataframe contains + a first timestamp with associated data + and a last timestamp with associated data + + These form the maximum extent of the simuluation time. + telemetry_start and telemetry_end. + + [ ] + ^ ^ + telemetry_start telemetry_end + + These values form the maximum extent of the simulation. + telemetry_start is typically 0, but any int can be used as long as all the times in the + jobs are relative to the telemetry_start. 
+ + Next is the actual extent of the simulation: + + [ ] + ^ ^ + simulation_start simulation_end + + The simulation will start at telemetry_start by default, but the user can specify an explicit + simulation start time. + + Additionally, jobs can have started before telemetry_start, + And can have a recorded ending after simulation_end, + [ ] + ^ ^ + first_start_timestamp last_end_timestamp + + This means that the time between first_start_timestamp and telemetry_start + has no associated values in the traces! + The missing values after simulation_end can be ignored, as the simulatuion + will have stoped before. + + However, the times before telemetry_start have to be padded to generate + correct offsets within their data! + Within the simulation a job's current time is specified as the difference + between its start_time and the current timestep of the simulation. + + With this each job's + - submit_time + - time_limit + - start_time # Maybe Null + - end_time # Maybe Null + - expected_run_time (end_time - start_time) # Maybe Null + - current_run_time (How long did the job run already, when loading) # Maybe zero + - trace_time (lenght of each trace in seconds) # Maybe Null + - trace_start_time (time offset in seconds after which the trace starts) # Maybe Null + - trace_end_time (time offset in seconds after which the trace ends) # Maybe Null + - trace_quanta (job's associated trace quanta, to correctly replay with different trace quanta) # Maybe Null + has to be set for use within the simulation + + The values trace_start_time are similar to the telemetry_start and + telemetry_stop but may different due to missing data, for each job. + + The returned values are these: + - The list of parsed jobs. 
(as a Job object) + - telemetry_start: int (in seconds) + - telemetry_end: int (in seconds) + - start_date: datetime + """ + jobs: list[Job] + telemetry_start: int + telemetry_end: int + # TODO: It might make more sense to make start_timestep/end_timestep always unix time, then we + # wouldn't need this extra start_date field. + start_date: AwareDatetime + + model_config = ConfigDict( + arbitrary_types_allowed = True, + ) -- GitLab From 485dd6ad4c6d890db7698afcc297375845606784 Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Thu, 4 Sep 2025 10:22:07 -0400 Subject: [PATCH 03/14] Refactor snapshots to save new dataloader result --- raps/engine.py | 9 +- raps/telemetry.py | 331 ++++++++++++++-------------------------------- raps/workload.py | 2 +- 3 files changed, 109 insertions(+), 233 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index 64bf218..36e2559 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -266,8 +266,9 @@ class Engine: if sim_config.live and not sim_config.replay: td = Telemetry(**sim_config_dict) - jobs, timestep_start, timestep_end = \ - td.load_jobs_times_args_from_live_system() + result = td.load_from_live_system() + jobs = result.jobs + timestep_start, timestep_end = result.telemetry_start, result.telemetry_end elif sim_config.replay: # TODO: this will have issues if running separate systems or custom systems partition_short = partition.split("/")[-1] if partition else None @@ -286,10 +287,12 @@ class Engine: else: replay_files = sim_config.replay - jobs, timestep_start, timestep_end, args_from_file = td.load_jobs_times_args_from_files( + result = td.load_from_files( files=replay_files, args=sim_config_args, config=system_config_dict, ) + jobs = result.jobs + timestep_start, timestep_end = result.telemetry_start, result.telemetry_end else: # Synthetic jobs wl = Workload(sim_config_args, system_config_dict) jobs = wl.generate_jobs() diff --git a/raps/telemetry.py b/raps/telemetry.py index 3f883de..4bececc 100644 --- a/raps/telemetry.py 
+++ b/raps/telemetry.py @@ -13,14 +13,14 @@ from pathlib import Path # import json from typing import Optional from types import ModuleType - import importlib import numpy as np import pandas as pd from tqdm import tqdm -from pydantic import BaseModel +from pydantic import BaseModel, model_validator # from rich.progress import track +from raps.sim_config import SimConfig from raps.system_config import get_system_config from raps.job import Job, job_dict import matplotlib.pyplot as plt @@ -31,9 +31,48 @@ from raps.plotting import ( ) from raps.utils import ( next_arrival_byconfargs, convert_to_time_unit, pydantic_add_args, SubParsers, ExpandedPath, + DataLoaderResult, yaml_dump, ) +# TODO: should reuse this model in SimConfig +class TelemetryArgs(BaseModel): + jid: str = '*' + """ Replay job id """ + replay: list[ExpandedPath] | None = None + """ path/to/joblive path/to/jobprofile -or- filename.npz (overrides --workload option) """ + plot: list[Literal["jobs", "nodes"]] | None = None + """ Output plots """ + gantt_nodes: bool = False + """ Print Gannt with nodes required as line thickness (default false) """ + time: str | None = None + """ Length of time to simulate, e.g., 123, 123s, 27m, 3h, 7d """ + system: str = 'frontier' + """ System config to use """ + arrival: Literal['prescribed', 'poisson'] = "prescribed" + """ Modify arrival distribution ({choices[1]}) or use the original submit times """ + verbose: bool = False + output: str | None = None + """ Store output in --output file. """ + live: bool = False + """ Grab data from live system. 
""" + + @model_validator(mode="after") + def _validate_after(self): + if not self.live and not self.replay: + raise ValueError("Either --live or --replay is required") + return self + + +shortcuts = { + "replay": "f", + "plot": "p", + "time": "t", + "verbose": "v", + "output": "o", +} + + class Telemetry: """A class for handling telemetry data, including reading/parsing job data, and loading/saving snapshots.""" dataloader: Optional[ModuleType] @@ -49,16 +88,18 @@ class Telemetry: print(f"WARNING: Failed to load dataloader: {e}") self.dataloader = None - def save_snapshot(self, *, jobs: list, timestep_start: int, timestep_end: int, args: dict, filename: str): + def save_snapshot(self, *, dest: str, result: DataLoaderResult, args: SimConfig|TelemetryArgs): """Saves a snapshot of the jobs to a compressed file. """ - list_of_job_dicts = [] - for job in jobs: - list_of_job_dicts.append(job.__dict__) - np.savez_compressed(filename, jobs=list_of_job_dicts, timestep_start=timestep_start, - timestep_end=timestep_end, args=args) + np.savez_compressed(dest, + jobs=[vars(j) for j in result.jobs], + telemetry_start=result.telemetry_start, + telemetry_end=result.telemetry_end, + start_date=result.start_date, + args=args, + ) - def load_snapshot(self, snapshot: str, downscale=1) -> list: - """Reads a snapshot from a compressed file and return 4 values: joblist, timestep_start, timestep_end and args. 
+ def load_snapshot(self, snapshot: str | Path) -> tuple[DataLoaderResult, SimConfig|TelemetryArgs]: + """Reads a snapshot from a compressed file :param str snapshot: Filename :returns: @@ -68,75 +109,19 @@ class Telemetry: - args, which were used to generate the loaded snapshot """ data = np.load(snapshot, allow_pickle=True, mmap_mode='r') - jobs = [] - list_of_job_dicts = data['jobs'].tolist() - for job_info in list_of_job_dicts: - jobs.append(Job(job_info)) - if 'timestep_start' in data: - timestep_start = int(data['timestep_start']) - else: - timestep_start = 0 - if 'timestep_end' in data: - timestep_end = int(data['timestep_end']) - else: - timestep_end = np.inf - raise ValueError("Invalid timestep_end in snapshot") - if 'args' in data: - args_from_file = data['args'].tolist() - else: - args_from_file = None - - return jobs, \ - timestep_start, \ - timestep_end, \ - args_from_file - - def load_csv_results(self, file): - jobs = [] - time_start = 0 - time_end = 0 - for line in pd.read_csv(file, chunksize=1): - job_info = job_dict(nodes_required=line.get('num_nodes').item(), - name=line.get('name').item(), - account=line.get('account').item(), - current_state=line.get('current_state').item(), - end_state=line.get('end_state').item(), - scheduled_nodes=line.get('scheduled_nodes').item(), - id=line.get('id').item(), - priority=line.get('priority').item(), - partition=line.get('partition').item(), - cpu_cores_required=line.get('cpu_cores_required').item(), - gpu_units_required=line.get('gpu_units_required').item(), - allocated_cpu_cores=line.get('allocated_cpu_cores').item(), - allocated_gpu_units=line.get('allocated_gpu_units').item(), - - cpu_trace=line.get('cpu_trace'), - gpu_trace=line.get('cpu_trace'), - ntx_trace=line.get('cpu_trace'), - nrx_trace=line.get('cpu_trace'), - submit_time=line.get('submit_time').item(), - time_limit=line.get('time_limit').item(), - start_time=line.get('start_time').item(), - end_time=line.get('end_time').item(), - 
expected_run_time=line.get('expected_run_time').item(), - current_run_time=line.get('current_run_time').item(), - trace_time=line.get('trace_time'), - # trace_start_time=line.get('trace_start_time').item(), - trace_start_time=line.get('trace_start_time'), - # trace_end_time=line.get('trace_end_time').item(), - trace_end_time=line.get('trace_end_time'), - trace_quanta=line.get('trace_quanta').item(), - trace_missing_values=line.get('trace_missing_values'), - downscale=line.get('downscale'), - ) - job = Job(job_info) - jobs.append(job) - # if hasattr(data,'args'): - # args_from_file = data["args"].item() # This should be empty as csv contains no args. - # else: - # args_from_file = None - - return jobs, time_start, time_end, None + jobs = [Job(j) for j in data['jobs']] + telemetry_start = data['telemetry_start'].item() + telemetry_end = data['telemetry_end'].item() + start_date = data['start_date'].item() + args = data['args'].item() + + result = DataLoaderResult( + jobs=jobs, + telemetry_start=telemetry_start, telemetry_end=telemetry_end, + start_date=start_date, + ) + + return result, args def load_data(self, files): """Load telemetry data using custom data loaders.""" @@ -148,43 +133,6 @@ class Telemetry: assert self.dataloader return self.dataloader.load_live_data(**self.kwargs) - def load_data_from_df(self, *args, **kwargs): - """Load telemetry data using custom data loaders.""" - assert self.dataloader - return self.dataloader.load_data_from_df(*args, **kwargs) - - def load_data_from_csv(self, file, *args, **kwargs): - jobs = [] - df = pd.read_csv(file, chunksize=1, header='infer') - for d in df: - # print(d['name'].astype(str)) - job_info = job_dict(nodes_required=None, - name=d['name'].astype(str).item(), - account=d['account'].astype(str).item(), - cpu_trace=None, - gpu_trace=None, - ntx_trace=None, - nrx_trace=None, - end_state=d['state'].astype(str).item(), - scheduled_nodes=d['scheduled_nodes'].item(), - id=d['id'].astype(int).item(), - priority=None, - 
partition=None, - submit_time=d['submit_time'].astype(int).item(), - time_limit=None, - start_time=d['start_time'].astype(int).item(), - end_time=d['end_time'].astype(int).item(), - wall_time=d['end_time'].astype(int).item() - d['start_time'].astype(int).item(), - trace_time=None, - trace_start_time=None, - trace_end_time=None, - trace_missing_values=None - ) - jobs.append(job_info) - minstarttime = min([x['start_time'] for x in jobs]) - maxendtime = max([x['end_time'] for x in jobs]) - return jobs, minstarttime, maxendtime, None - def node_index_to_name(self, index: int): """ Convert node index into a name""" assert self.dataloader @@ -200,105 +148,39 @@ class Telemetry: assert self.dataloader return self.dataloader.cdu_pos(index, config=self.config) - def load_jobs_times_args_from_live_system(self): - jobs, timestep_start, timestep_end = self.load_live_data() - # data_args = None - return jobs, timestep_start, timestep_end + def load_from_live_system(self): + result = self.load_live_data() + return result - def load_jobs_times_args_from_files(self, *, files, args, config, downscale=1): + def load_from_files(self, *, files, args, config): """ Load all files as combined jobs """ - # Read telemetry data (either npz file or via custom data loader) - # TODO: Merge args? 
See main.py:79 - timestep_end = 0 - timestep_start = sys.maxsize - jobs = [] - trigger_custom_dataloader = False - for i, file in enumerate(files): - file = str(Path(file)) - if hasattr(args, 'is_results_file') and args.is_results_file: - if file.endswith(".csv"): - jobs, timestep_start, timestep, _ = self.load_csv_results(file) - - elif file.endswith(".npz"): # Replay .npz file - print(f"Loading {file}...") - jobs_from_file, timestep_start_from_file, timestep_end_from_file, args_from_file = self.load_snapshot( - file) - if args_from_file is not None: - print(f"File was generated with:" - f"\n--system {args_from_file.system} ") - if hasattr(args_from_file, 'fastforward'): - print(f"--ff {args_from_file.fastforward} ") - if hasattr(args_from_file, 'time'): - print(f"-t {args_from_file.time}") - print(f"All Args:\n{args_from_file}" - "\nTo use these set them from the commandline!") - else: - print("No generation arguments extracted from input file!") - # Args are usually extracted to tell the users how to reporduce results. 
- # They are not processed and re-set to said arguments automatily - jobs.extend(jobs_from_file) - timestep_start = min(timestep_start, timestep_start_from_file) - timestep_end = max(timestep_end, timestep_end_from_file) - - if hasattr(args, 'scale') and args.scale: - for job in tqdm(jobs, desc=f"Scaling jobs to {args.scale} nodes"): - job['nodes_required'] = random.randint(1, args.scale) - job['scheduled_nodes'] = None # Setting to None triggers scheduler to assign nodes - - if hasattr(args, 'arrival') and args.arrival == 'poisson': - print("available nodes:", config['AVAILABLE_NODES']) - for job in tqdm(jobs, desc="Rescheduling jobs"): - job['scheduled_nodes'] = None - job['submit_time'] = next_arrival_byconfargs(config, args) - else: - trigger_custom_dataloader = True - break - - if trigger_custom_dataloader: # custom data loader - try: - jobs, timestep_start_from_data, timestep_end_from_data = self.load_data(args.replay) - except AssertionError: - raise ValueError("Forgot --is-results-file ?") - timestep_start = min(timestep_start, timestep_start_from_data) - timestep_end = max(timestep_end, timestep_end_from_data) + assert len(files) >= 1 + files = [Path(f) for f in files] + + if str(files[0]).endswith(".npz"): + file = files[0] + print(f"Loading {file}") + result, args = self.load_snapshot(file) + print(f"File was generated with: --system {args.system}") + + # TODO: should move this logic into a separate method and out of the individual dataloaders + if hasattr(args, 'scale') and args.scale: + for job in tqdm(result.jobs, desc=f"Scaling jobs to {args.scale} nodes"): + job.nodes_required = random.randint(1, args.scale) + job.scheduled_nodes = None # Setting to None triggers scheduler to assign nodes + + if hasattr(args, 'arrival') and args.arrival == 'poisson': + print("available nodes:", config['AVAILABLE_NODES']) + for job in tqdm(result.jobs, desc="Rescheduling jobs"): + job.scheduled_nodes = None + job.submit_time = next_arrival_byconfargs(config, args) + 
job.start_time = None + job.end_time = None + else: # custom data loader + result = self.load_data(args.replay) if args.time: - timestep_end = timestep_start + convert_to_time_unit(args.time) - elif not timestep_end: - timestep_end = int(max(job.wall_time + job.start_time for job in jobs)) + 1 - - return jobs, timestep_start, timestep_end, args - - -class TelemetryArgs(BaseModel): - jid: str = '*' - """ Replay job id """ - replay: list[ExpandedPath] | None = None - """ path/to/joblive path/to/jobprofile -or- filename.npz (overrides --workload option) """ - plot: list[Literal["jobs", "nodes"]] | None = None - """ Output plots """ - is_results_file: bool = False - gantt_nodes: bool = False - """ Print Gannt with nodes required as line thickness (default false) """ - time: str | None = None - """ Length of time to simulate, e.g., 123, 123s, 27m, 3h, 7d """ - system: str = 'frontier' - """ System config to use """ - arrival: Literal['prescribed', 'poisson'] = "prescribed" - """ Modify arrival distribution ({choices[1]}) or use the original submit times """ - verbose: bool = False - output: str | None = None - """ Store output in --output file. """ - live: bool = False - """ Grab data from live system. 
""" - - -shortcuts = { - "replay": "f", - "plot": "p", - "time": "t", - "verbose": "v", - "output": "o", -} + result.telemetry_end = result.telemetry_start + convert_to_time_unit(args.time) + return result def run_telemetry_add_parser(subparsers: SubParsers): @@ -318,24 +200,15 @@ def run_telemetry(args: TelemetryArgs): td = Telemetry(**args_dict) if args.live and not args.replay: - td = Telemetry(**args_dict) - jobs, timestep_start, timestep_end = \ - td.load_jobs_times_args_from_live_system() - if args.output: - td.save_snapshot( - jobs=jobs, timestep_start=timestep_start, - timestep_end=timestep_end, args=args, filename=args.output, - ) - - elif args.replay: - jobs, timestep_start, timestep_end, _ = \ - td.load_jobs_times_args_from_files(files=args.replay, - args=args, - config=config) - + result = td.load_from_live_system() else: - print("Either --live or --replay is required") - sys.exit(1) + result = td.load_from_files(files=args.replay, args=args, config=config) + jobs = result.jobs + timestep_start = result.telemetry_start + timestep_end = result.telemetry_end + + if args.output: + td.save_snapshot(dest = args.output, result = result, args = args) timesteps = timestep_end - timestep_start diff --git a/raps/workload.py b/raps/workload.py index f8ac77b..f5b2e55 100644 --- a/raps/workload.py +++ b/raps/workload.py @@ -972,7 +972,7 @@ def run_workload(sim_config: SimConfig): if sim_config.replay: td = Telemetry(**args_dict) - jobs, _, _, _ = td.load_jobs_times_args_from_files(files=sim_config.replay, args=args, config=config) + jobs = td.load_from_files(files=sim_config.replay, args=args, config=config).jobs else: workload = Workload(args, config) jobs = getattr(workload, sim_config.workload)(args=sim_config.get_legacy_args) -- GitLab From 873ac16cca103a4e24bcfd3b0176a0200a091de3 Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Thu, 4 Sep 2025 10:48:54 -0400 Subject: [PATCH 04/14] Use same format in Workload --- raps/dataloaders/adastraMI250.py | 4 ++-- 
raps/dataloaders/bluewaters.py | 4 ++-- raps/dataloaders/frontier.py | 6 +++--- raps/dataloaders/fugaku.py | 4 ++-- raps/dataloaders/gcloudv2.py | 4 ++-- raps/dataloaders/kestrel.py | 4 ++-- raps/dataloaders/lassen.py | 4 ++-- raps/dataloaders/marconi100.py | 4 ++-- raps/dataloaders/mit_supercloud/loader.py | 4 ++-- raps/telemetry.py | 8 ++++---- raps/utils.py | 5 +++-- raps/workload.py | 11 ++++++++--- 12 files changed, 34 insertions(+), 28 deletions(-) diff --git a/raps/dataloaders/adastraMI250.py b/raps/dataloaders/adastraMI250.py index 9cb53d5..60ad8d1 100644 --- a/raps/dataloaders/adastraMI250.py +++ b/raps/dataloaders/adastraMI250.py @@ -24,7 +24,7 @@ import pandas as pd from tqdm import tqdm from ..job import job_dict, Job -from ..utils import next_arrival_byconfkwargs, DataLoaderResult +from ..utils import next_arrival_byconfkwargs, WorkloadResult def load_data(jobs_path, **kwargs): @@ -205,7 +205,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): count_jobs_notOK += 1 print("jobs not added: ", count_jobs_notOK) - return DataLoaderResult( + return WorkloadResult( jobs = jobs, telemetry_start=telemetry_start_time, telemetry_end=telemetry_end_time, start_date=telemetry_start_timestamp, diff --git a/raps/dataloaders/bluewaters.py b/raps/dataloaders/bluewaters.py index b7f1c10..c2d328f 100644 --- a/raps/dataloaders/bluewaters.py +++ b/raps/dataloaders/bluewaters.py @@ -34,7 +34,7 @@ import pandas as pd from pathlib import Path from datetime import datetime, timezone from raps.telemetry import Job, job_dict -from raps.utils import DataLoaderResult +from raps.utils import WorkloadResult def throughput_traces(total_tx, total_rx, intervals): @@ -330,7 +330,7 @@ def load_data(local_dataset_path, **kwargs): telemetry_start = 0 telemetry_end = max((j.end_time for j in jobs), default=0) - return DataLoaderResult( + return WorkloadResult( jobs=jobs, telemetry_start=telemetry_start, telemetry_end=telemetry_end, start_date=datetime.fromtimestamp(t0, 
timezone.utc), diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py index 15621d1..d1968a6 100644 --- a/raps/dataloaders/frontier.py +++ b/raps/dataloaders/frontier.py @@ -16,7 +16,7 @@ import pandas as pd from tqdm import tqdm from ..job import job_dict, Job -from ..utils import power_to_utilization, next_arrival_byconfkwargs, encrypt, DataLoaderResult +from ..utils import power_to_utilization, next_arrival_byconfkwargs, encrypt, WorkloadResult def aging_boost(nnodes): @@ -326,7 +326,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar job = Job(job_info) jobs.append(job) - return DataLoaderResult( + return WorkloadResult( jobs = jobs, telemetry_start = telemetry_start, telemetry_end = telemetry_end, @@ -543,7 +543,7 @@ def load_live_data(**kwargs): job = Job(job_info) jobs.append(job) - return DataLoaderResult( + return WorkloadResult( jobs = jobs, telemetry_start = telemetry_start, telemetry_end = telemetry_end, diff --git a/raps/dataloaders/fugaku.py b/raps/dataloaders/fugaku.py index 0dd2c3b..703d28e 100644 --- a/raps/dataloaders/fugaku.py +++ b/raps/dataloaders/fugaku.py @@ -17,7 +17,7 @@ import pandas as pd from tqdm import tqdm from ..job import job_dict, Job -from ..utils import DataLoaderResult +from ..utils import WorkloadResult def load_data(path, **kwargs): @@ -168,7 +168,7 @@ def load_data_from_df(df, **kwargs): job = Job(job_info) job_list.append(job) - return DataLoaderResult( + return WorkloadResult( jobs=job_list, telemetry_start = telemetry_start, telemetry_end = telemetry_end, start_date = telemetry_start_timestamp, diff --git a/raps/dataloaders/gcloudv2.py b/raps/dataloaders/gcloudv2.py index 73f918b..60426cc 100644 --- a/raps/dataloaders/gcloudv2.py +++ b/raps/dataloaders/gcloudv2.py @@ -8,7 +8,7 @@ import numpy as np import pandas as pd from raps.job import job_dict, Job -from raps.utils import DataLoaderResult +from raps.utils import WorkloadResult """ Official instructions are here: @@ 
-334,7 +334,7 @@ def load_data(data_path: Union[str, List[str]], **kwargs: Any): # Compute simulation span: start at t=0, end at the latest job finish telemetry_start = 0 telemetry_end = int(max(usage_map_end.values()) - t0) - return DataLoaderResult( + return WorkloadResult( jobs = jobs, telemetry_start=telemetry_start, telemetry_end=telemetry_end, # gcloud dataset timestamps are already relative, and it doesn't list a start exact date. diff --git a/raps/dataloaders/kestrel.py b/raps/dataloaders/kestrel.py index c82b957..04adaa8 100644 --- a/raps/dataloaders/kestrel.py +++ b/raps/dataloaders/kestrel.py @@ -6,7 +6,7 @@ import pandas as pd from tqdm import tqdm from ..job import job_dict, Job -from ..utils import power_to_utilization, next_arrival, DataLoaderResult +from ..utils import power_to_utilization, next_arrival, WorkloadResult def load_data(jobs_path, **kwargs): @@ -153,7 +153,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): trace_quanta=trace_quanta) jobs.append(Job(job_info)) - return DataLoaderResult( + return WorkloadResult( jobs=jobs, telemetry_start=telemetry_start, telemetry_end=telemetry_end, start_date=telemetry_start_timestamp, diff --git a/raps/dataloaders/lassen.py b/raps/dataloaders/lassen.py index 51292fc..88b36e7 100644 --- a/raps/dataloaders/lassen.py +++ b/raps/dataloaders/lassen.py @@ -38,7 +38,7 @@ from tqdm import tqdm from datetime import timedelta from ..job import job_dict, Job -from ..utils import power_to_utilization, next_arrival_byconfkwargs, parse_td, DataLoaderResult +from ..utils import power_to_utilization, next_arrival_byconfkwargs, parse_td, WorkloadResult def load_data(path, **kwargs): @@ -249,7 +249,7 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): job = Job(job_info) job_list.append(job) - return DataLoaderResult( + return WorkloadResult( jobs=job_list, telemetry_start=telemetry_start_time, telemetry_end=telemetry_end_time, start_date=telemetry_start_timestamp, diff --git 
a/raps/dataloaders/marconi100.py b/raps/dataloaders/marconi100.py index 9a236f2..c4648b3 100644 --- a/raps/dataloaders/marconi100.py +++ b/raps/dataloaders/marconi100.py @@ -28,7 +28,7 @@ import pandas as pd from tqdm import tqdm from ..job import job_dict, Job -from ..utils import power_to_utilization, next_arrival_byconfkwargs, DataLoaderResult +from ..utils import power_to_utilization, next_arrival_byconfkwargs, WorkloadResult def load_data(jobs_path, **kwargs): @@ -233,7 +233,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): job = Job(job_info) jobs.append(job) - return DataLoaderResult( + return WorkloadResult( jobs = jobs, telemetry_start=telemetry_start, telemetry_end=telemetry_end, start_date=telemetry_start_timestamp, diff --git a/raps/dataloaders/mit_supercloud/loader.py b/raps/dataloaders/mit_supercloud/loader.py index 7c29f31..282f0b5 100644 --- a/raps/dataloaders/mit_supercloud/loader.py +++ b/raps/dataloaders/mit_supercloud/loader.py @@ -118,7 +118,7 @@ from typing import Dict, Union, Optional from collections import Counter from datetime import datetime, timezone from raps.job import job_dict, Job -from raps.utils import summarize_ranges, DataLoaderResult +from raps.utils import summarize_ranges, WorkloadResult from .utils import proc_cpu_series, proc_gpu_series, to_epoch from .utils import DEFAULT_START, DEFAULT_END @@ -606,7 +606,7 @@ def load_data(local_dataset_path, **kwargs): for reason, count in skip_counts.items(): print(f"- {reason}: {count}") - return DataLoaderResult( + return WorkloadResult( jobs = jobs_list, telemetry_start=0, telemetry_end=int(end_ts - start_ts), start_date=datetime.fromtimestamp(start_ts, timezone.utc), diff --git a/raps/telemetry.py b/raps/telemetry.py index 4bececc..f55d1bc 100644 --- a/raps/telemetry.py +++ b/raps/telemetry.py @@ -31,7 +31,7 @@ from raps.plotting import ( ) from raps.utils import ( next_arrival_byconfargs, convert_to_time_unit, pydantic_add_args, SubParsers, ExpandedPath, - 
DataLoaderResult, yaml_dump, + WorkloadResult, ) @@ -88,7 +88,7 @@ class Telemetry: print(f"WARNING: Failed to load dataloader: {e}") self.dataloader = None - def save_snapshot(self, *, dest: str, result: DataLoaderResult, args: SimConfig|TelemetryArgs): + def save_snapshot(self, *, dest: str, result: WorkloadResult, args: SimConfig|TelemetryArgs): """Saves a snapshot of the jobs to a compressed file. """ np.savez_compressed(dest, jobs=[vars(j) for j in result.jobs], @@ -98,7 +98,7 @@ class Telemetry: args=args, ) - def load_snapshot(self, snapshot: str | Path) -> tuple[DataLoaderResult, SimConfig|TelemetryArgs]: + def load_snapshot(self, snapshot: str | Path) -> tuple[WorkloadResult, SimConfig|TelemetryArgs]: """Reads a snapshot from a compressed file :param str snapshot: Filename @@ -115,7 +115,7 @@ class Telemetry: start_date = data['start_date'].item() args = data['args'].item() - result = DataLoaderResult( + result = WorkloadResult( jobs=jobs, telemetry_start=telemetry_start, telemetry_end=telemetry_end, start_date=start_date, diff --git a/raps/utils.py b/raps/utils.py index e0c9acf..f67c533 100644 --- a/raps/utils.py +++ b/raps/utils.py @@ -711,9 +711,10 @@ def yaml_dump(data): ) -class DataLoaderResult(BaseModel): +class WorkloadResult(BaseModel): """ - Result of a dataloader load_data() function. + Represents a workload, a list of jobs with some metadata. Returned by dataloaders load_data() + function, and by Workload.generate_jobs(). jobs: The list of parsed jobs. diff --git a/raps/workload.py b/raps/workload.py index f5b2e55..48c7af3 100644 --- a/raps/workload.py +++ b/raps/workload.py @@ -30,7 +30,8 @@ from raps.utils import ( determine_state, next_arrival, next_arrival_byconfargs, truncated_weibull, - truncated_weibull_float + truncated_weibull_float, + WorkloadResult, ) import math import random @@ -66,7 +67,11 @@ class Workload: # This function calls the job generation function as specified by the workload keyword. 
# The respective funciton of this class is called. jobs = getattr(self, self.args.workload)(args=self.args) - return jobs + return WorkloadResult( + jobs = jobs, + telemetry_start=0, telemetry_end=self.args.time, + start_date=self.args.start, + ) def compute_traces(self, cpu_util: float, @@ -994,5 +999,5 @@ def continuous_job_generation(*, engine, timestep, jobs): # print("if len(engine.queue) <= engine.continuous_workload.args.maxqueue:") # print(f"if {len(engine.queue)} <= {engine.continuous_workload.args.maxqueue}:") if len(engine.queue) <= engine.continuous_workload.args.maxqueue: - new_jobs = engine.continuous_workload.generate_jobs() + new_jobs = engine.continuous_workload.generate_jobs().jobs jobs.extend(new_jobs) -- GitLab From 9ae197cce44c504423a852aa94acb4c2b7e5b0a9 Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Thu, 4 Sep 2025 11:07:39 -0400 Subject: [PATCH 05/14] More fixes --- raps/dataloaders/adastraMI250.py | 2 +- raps/engine.py | 24 ++++++++---------------- raps/multi_part_engine.py | 15 ++++++++------- raps/run_sim.py | 22 +++++++++++----------- raps/sim_config.py | 4 ++-- raps/telemetry.py | 4 ++-- tests/systems/test_engine.py | 5 ++++- 7 files changed, 36 insertions(+), 40 deletions(-) diff --git a/raps/dataloaders/adastraMI250.py b/raps/dataloaders/adastraMI250.py index 60ad8d1..7522097 100644 --- a/raps/dataloaders/adastraMI250.py +++ b/raps/dataloaders/adastraMI250.py @@ -208,7 +208,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): return WorkloadResult( jobs = jobs, telemetry_start=telemetry_start_time, telemetry_end=telemetry_end_time, - start_date=telemetry_start_timestamp, + start_date=telemetry_start_timestamp.tz_localize("UTC"), ) diff --git a/raps/engine.py b/raps/engine.py index 36e2559..f371b3e 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -10,12 +10,11 @@ import os import select import time import random -import math from raps.job import Job, JobState from raps.policy import PolicyType from raps.utils import ( 
summarize_ranges, - get_current_utilization + get_current_utilization, ) from raps.resmgr import ResourceManager from raps.schedulers import load_scheduler @@ -266,9 +265,7 @@ class Engine: if sim_config.live and not sim_config.replay: td = Telemetry(**sim_config_dict) - result = td.load_from_live_system() - jobs = result.jobs - timestep_start, timestep_end = result.telemetry_start, result.telemetry_end + workload_result = td.load_from_live_system() elif sim_config.replay: # TODO: this will have issues if running separate systems or custom systems partition_short = partition.split("/")[-1] if partition else None @@ -287,22 +284,17 @@ class Engine: else: replay_files = sim_config.replay - result = td.load_from_files( + workload_result = td.load_from_files( files=replay_files, args=sim_config_args, config=system_config_dict, ) - jobs = result.jobs - timestep_start, timestep_end = result.telemetry_start, result.telemetry_end else: # Synthetic jobs wl = Workload(sim_config_args, system_config_dict) - jobs = wl.generate_jobs() - timestep_start = 0 - if hasattr(jobs[0], 'end_time'): - timestep_end = int(math.ceil(max([job.end_time for job in jobs]))) - else: - timestep_end = 88200 # 24 hours - + workload_result = wl.generate_jobs() td = Telemetry(**sim_config_dict) + + jobs = workload_result.jobs + timestep_start, timestep_end = workload_result.telemetry_start, workload_result.telemetry_end # TODO refactor how stat/end/fastforward/time work if sim_config.fastforward is not None: @@ -342,7 +334,7 @@ class Engine: system_config=system_config, ) - return engine, jobs, timestep_start, timestep_end, time_delta + return engine, workload_result, time_delta def add_running_jobs_to_queue(self, jobs_to_submit: List): """ diff --git a/raps/multi_part_engine.py b/raps/multi_part_engine.py index 461425b..bebf47b 100644 --- a/raps/multi_part_engine.py +++ b/raps/multi_part_engine.py @@ -1,6 +1,7 @@ from collections.abc import Iterable from raps.engine import Engine, TickData from 
raps.sim_config import SimConfig +from raps.utils import WorkloadResult class MultiPartEngine: @@ -17,29 +18,29 @@ class MultiPartEngine: if len(root_systems) > 1: raise ValueError("Replay for multi-system runs is not supported") - jobs_by_partition = {} + workloads_by_partition: dict[str, WorkloadResult] = {} engines: dict[str, Engine] = {} timestep_start, timestep_end, time_delta = 0, 0, 0 for partition in sim_config.system_configs: name = partition.system_name - engine, jobs, timestep_start, timestep_end, time_delta = Engine.from_sim_config( + engine, workload_result, time_delta = Engine.from_sim_config( sim_config, partition=name, ) - for job in jobs: + for job in workload_result.jobs: job.partition = name - jobs_by_partition[name] = jobs + workloads_by_partition[name] = workload_result engines[name] = engine - total_initial_jobs = sum(len(j) for j in jobs_by_partition.values()) + total_initial_jobs = sum(len(j.jobs) for j in workloads_by_partition.values()) for engine in engines.values(): engine.total_initial_jobs = total_initial_jobs multi_engine = MultiPartEngine( engines=engines, - jobs=jobs_by_partition, + jobs={p: w.jobs for p, w in workloads_by_partition.items()}, ) - return multi_engine, jobs_by_partition, timestep_start, timestep_end, time_delta + return multi_engine, workloads_by_partition, timestep_start, timestep_end, time_delta def run_simulation(self, jobs: dict, timestep_start, timestep_end, time_delta=1 ) -> Iterable[dict[str, TickData | None]]: diff --git a/raps/run_sim.py b/raps/run_sim.py index 7587dbb..b41eb80 100644 --- a/raps/run_sim.py +++ b/raps/run_sim.py @@ -76,18 +76,18 @@ def run_sim(sim_config: SimConfig): print("Use run-multi-part to run multi-partition simulations") sys.exit(1) - engine, jobs, timestep_start, timestep_end, time_delta = Engine.from_sim_config(sim_config) + engine, workload_result, time_delta = Engine.from_sim_config(sim_config) out = sim_config.output if out: out.mkdir(parents=True) engine.telemetry.save_snapshot( 
- jobs=jobs, - timestep_start=timestep_start, - timestep_end=timestep_end, - args=sim_config.get_legacy_args(), filename=str(out), + dest = str(out), + result = workload_result, + args=sim_config, ) - + jobs = workload_result.jobs + timestep_start, timestep_end = workload_result.telemetry_start, workload_result.telemetry_end total_timesteps = timestep_end - timestep_start downscale = sim_config.downscale @@ -242,7 +242,7 @@ def run_multi_part_sim_add_parser(subparsers: SubParsers): def run_multi_part_sim(sim_config: SimConfig): - multi_engine, jobs, timestep_start, timestep_end, time_delta = MultiPartEngine.from_sim_config(sim_config) + multi_engine, workload_results, timestep_start, timestep_end, time_delta = MultiPartEngine.from_sim_config(sim_config) # TODO: The mit_supercloud dataloader seems to be outputting the wrong timesteps? mit_supercloud # is the only multi-partition system with replay, so just manually overriding the timesteps here @@ -253,11 +253,11 @@ def run_multi_part_sim(sim_config: SimConfig): if sim_config.output: for part, engine in multi_engine.engines.items(): engine.telemetry.save_snapshot( - jobs=jobs[part], - timestep_start=timestep_start, timestep_end=timestep_end, - filename=part.split('/')[-1], - args=sim_config.get_legacy_args(), + dest=str(sim_config.output / part.split('/')[-1]), + result=workload_results[part], + args=sim_config, ) + jobs = {p: w.jobs for p, w in workload_results.items()} ui_update_freq = sim_config.system_configs[0].scheduler.ui_update_freq gen = multi_engine.run_simulation(jobs, timestep_start, timestep_end, time_delta) diff --git a/raps/sim_config.py b/raps/sim_config.py index 26a328a..92a5f5f 100644 --- a/raps/sim_config.py +++ b/raps/sim_config.py @@ -48,9 +48,9 @@ class SimConfig(BaseModel): def downscale(self) -> int: return int(timedelta(seconds=1) / self.time_unit) - start: str = "2021-05-21T13:00" + start: str = "2021-05-21T13:00:00-04:00" """ ISO8601 start of simulation """ - end: str = "2021-05-21T14:00" 
+ end: str = "2021-05-21T14:00:00-04:00" """ ISO8601 end of simulation """ numjobs: int = 100 diff --git a/raps/telemetry.py b/raps/telemetry.py index f55d1bc..83b09a2 100644 --- a/raps/telemetry.py +++ b/raps/telemetry.py @@ -160,8 +160,8 @@ class Telemetry: if str(files[0]).endswith(".npz"): file = files[0] print(f"Loading {file}") - result, args = self.load_snapshot(file) - print(f"File was generated with: --system {args.system}") + result, args_from_file = self.load_snapshot(file) + print(f"File was generated with: --system {args_from_file.system}") # TODO: should move this logic into a separate method and out of the individual dataloaders if hasattr(args, 'scale') and args.scale: diff --git a/tests/systems/test_engine.py b/tests/systems/test_engine.py index ce40878..4d57752 100644 --- a/tests/systems/test_engine.py +++ b/tests/systems/test_engine.py @@ -22,7 +22,10 @@ def test_engine(system, system_config, sim_output): "system": system, "time": "2m", }) - engine, jobs, timestep_start, timestep_end, time_delta = Engine.from_sim_config(sim_config) + engine, workload_result, time_delta = Engine.from_sim_config(sim_config) + jobs = workload_result.jobs + timestep_start = workload_result.telemetry_start + timestep_end = workload_result.telemetry_end ticks = list(engine.run_simulation(jobs, timestep_start, timestep_end, time_delta)) assert len(ticks) == 120 -- GitLab From 445b49360138746a86f99e368223f557c5095f18 Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Thu, 4 Sep 2025 12:47:49 -0400 Subject: [PATCH 06/14] Move arrival time logic into one place --- raps/dataloaders/adastraMI250.py | 14 ++++-------- raps/dataloaders/frontier.py | 20 +++++----------- raps/dataloaders/fugaku.py | 7 ------ raps/dataloaders/lassen.py | 14 ++++-------- raps/dataloaders/marconi100.py | 16 ++++--------- raps/engine.py | 10 +++----- raps/telemetry.py | 39 ++++++++++++++++---------------- raps/workload.py | 2 +- 8 files changed, 43 insertions(+), 79 deletions(-) diff --git 
a/raps/dataloaders/adastraMI250.py b/raps/dataloaders/adastraMI250.py index 7522097..4d96b02 100644 --- a/raps/dataloaders/adastraMI250.py +++ b/raps/dataloaders/adastraMI250.py @@ -146,15 +146,11 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): priority = int(jobs_df.loc[jidx, 'priority']) - if arrival == 'poisson': # Modify the arrival times of the jobs according to Poisson distribution - scheduled_nodes = None - submit_time = next_arrival_byconfkwargs(config, kwargs) - else: # Prescribed replay - scheduled_nodes = (jobs_df.loc[jidx, 'nodes']).tolist() - - submit_timestamp = jobs_df.loc[jidx, 'submit_time'] - diff = submit_timestamp - telemetry_start_timestamp - submit_time = int(diff.total_seconds()) + scheduled_nodes = (jobs_df.loc[jidx, 'nodes']).tolist() + + submit_timestamp = jobs_df.loc[jidx, 'submit_time'] + diff = submit_timestamp - telemetry_start_timestamp + submit_time = int(diff.total_seconds()) time_limit = jobs_df.loc[jidx, 'time_limit'] # in seconds diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py index d1968a6..de84770 100644 --- a/raps/dataloaders/frontier.py +++ b/raps/dataloaders/frontier.py @@ -267,20 +267,12 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar if '' in xnames: continue - if arrival == 'poisson': # Modify the arrival times of the jobs according to Poisson distribution - scheduled_nodes = None - submit_time = next_arrival_byconfkwargs(config, kwargs) - start_time = None # ? - end_time = None # ? 
- priority = aging_boost(nodes_required) - - else: # Prescribed replay - scheduled_nodes = [] - # priority = 0 # not used for replay - priority = aging_boost(nodes_required) - for xname in xnames: - indices = xname_to_index(xname, config) - scheduled_nodes.append(indices) + scheduled_nodes = [] + # priority = 0 # not used for replay + priority = aging_boost(nodes_required) + for xname in xnames: + indices = xname_to_index(xname, config) + scheduled_nodes.append(indices) # Throw out jobs that are not valid! if gpu_trace.size == 0: diff --git a/raps/dataloaders/fugaku.py b/raps/dataloaders/fugaku.py index 703d28e..1515915 100644 --- a/raps/dataloaders/fugaku.py +++ b/raps/dataloaders/fugaku.py @@ -135,13 +135,6 @@ def load_data_from_df(df, **kwargs): trace_missing_values = False # Sane Choice? trace_quanta = config['TRACE_QUANTA'] - # Should we still have this? - # if arrival == 'poisson': # Modify the arrival times of according to Poisson distribution - # time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME']) - # else: - # time_offset = (submit_time - min_time).total_seconds() # Compute time offset in seconds - # Removed from job_dict: time_offset=time_offset, - # Create job dictionary job_info = job_dict( nodes_required=nodes_required, diff --git a/raps/dataloaders/lassen.py b/raps/dataloaders/lassen.py index 88b36e7..a0d1ca7 100644 --- a/raps/dataloaders/lassen.py +++ b/raps/dataloaders/lassen.py @@ -198,16 +198,10 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): priority = row.get('priority', 0) partition = row.get('partition', "0") - if arrival == 'poisson': # Modify the submit times according to Poisson process - scheduled_nodes = None - submit_time = fastforward + next_arrival_byconfkwargs(config, kwargs) - start_time = submit_time # Pretend Job could start immediately # Alternative: None - end_time = submit_time + wall_time # Alternative: None - else: # Prescribed replay - scheduled_nodes = get_scheduled_nodes(row['allocation_id'], 
node_df) - submit_time = compute_time_offset(row['job_submit_timestamp'], telemetry_start_timestamp) - start_time = compute_time_offset(row['begin_timestamp'], telemetry_start_timestamp) - end_time = compute_time_offset(row['end_timestamp'], telemetry_start_timestamp) + scheduled_nodes = get_scheduled_nodes(row['allocation_id'], node_df) + submit_time = compute_time_offset(row['job_submit_timestamp'], telemetry_start_timestamp) + start_time = compute_time_offset(row['begin_timestamp'], telemetry_start_timestamp) + end_time = compute_time_offset(row['end_timestamp'], telemetry_start_timestamp) time_limit = row['time_limit'] diff --git a/raps/dataloaders/marconi100.py b/raps/dataloaders/marconi100.py index c4648b3..fd08b91 100644 --- a/raps/dataloaders/marconi100.py +++ b/raps/dataloaders/marconi100.py @@ -165,17 +165,11 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): print("wall_time != (end_time - start_time)") print(f"{wall_time} != {(end_time - start_time)}") - if arrival == 'poisson': # Modify the arrival times according to Poisson distribution - scheduled_nodes = None - submit_time = next_arrival_byconfkwargs(config, kwargs) - start_time = None - end_time = None - else: # Prescribed replay - scheduled_nodes = (jobs_df.loc[jidx, 'nodes']).tolist() - - submit_timestamp = jobs_df.loc[jidx, 'submit_time'] - diff = submit_timestamp - telemetry_start_timestamp - submit_time = int(diff.total_seconds()) + scheduled_nodes = (jobs_df.loc[jidx, 'nodes']).tolist() + + submit_timestamp = jobs_df.loc[jidx, 'submit_time'] + diff = submit_timestamp - telemetry_start_timestamp + submit_time = int(diff.total_seconds()) trace_time = gpu_trace.size * config['TRACE_QUANTA'] # seconds trace_start_time = 0 diff --git a/raps/engine.py b/raps/engine.py index f371b3e..3760254 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -284,24 +284,20 @@ class Engine: else: replay_files = sim_config.replay - workload_result = td.load_from_files( - files=replay_files, - 
args=sim_config_args, config=system_config_dict, - ) + workload_result = td.load_from_files(replay_files) else: # Synthetic jobs wl = Workload(sim_config_args, system_config_dict) workload_result = wl.generate_jobs() td = Telemetry(**sim_config_dict) jobs = workload_result.jobs - timestep_start, timestep_end = workload_result.telemetry_start, workload_result.telemetry_end # TODO refactor how stat/end/fastforward/time work if sim_config.fastforward is not None: - timestep_start = timestep_start + sim_config.fastforward + workload_result.telemetry_start = workload_result.telemetry_start + sim_config.fastforward if sim_config.time is not None: - timestep_end = timestep_start + sim_config.time + workload_result.telemetry_end = workload_result.telemetry_end + sim_config.time if sim_config.time_delta is not None: time_delta = sim_config.time_delta diff --git a/raps/telemetry.py b/raps/telemetry.py index 83b09a2..0d848fc 100644 --- a/raps/telemetry.py +++ b/raps/telemetry.py @@ -148,11 +148,11 @@ class Telemetry: assert self.dataloader return self.dataloader.cdu_pos(index, config=self.config) - def load_from_live_system(self): + def load_from_live_system(self) -> WorkloadResult: result = self.load_live_data() return result - def load_from_files(self, *, files, args, config): + def load_from_files(self, files) -> WorkloadResult: """ Load all files as combined jobs """ assert len(files) >= 1 files = [Path(f) for f in files] @@ -162,26 +162,25 @@ class Telemetry: print(f"Loading {file}") result, args_from_file = self.load_snapshot(file) print(f"File was generated with: --system {args_from_file.system}") - - # TODO: should move this logic into a separate method and out of the individual dataloaders - if hasattr(args, 'scale') and args.scale: - for job in tqdm(result.jobs, desc=f"Scaling jobs to {args.scale} nodes"): - job.nodes_required = random.randint(1, args.scale) - job.scheduled_nodes = None # Setting to None triggers scheduler to assign nodes - - if hasattr(args, 
'arrival') and args.arrival == 'poisson': - print("available nodes:", config['AVAILABLE_NODES']) - for job in tqdm(result.jobs, desc="Rescheduling jobs"): - job.scheduled_nodes = None - job.submit_time = next_arrival_byconfargs(config, args) - job.start_time = None - job.end_time = None else: # custom data loader - result = self.load_data(args.replay) - if args.time: - result.telemetry_end = result.telemetry_start + convert_to_time_unit(args.time) + result = self.load_data(files) + self.update_jobs(result.jobs) return result + def update_jobs(self, jobs: list[Job]): + """ Updates jobs with new scale or random start times """ + if self.kwargs.get("scale") is not None: + for job in jobs: + job.nodes_required = random.randint(1, self.kwargs['scale']) + job.scheduled_nodes = None # Setting to None triggers scheduler to assign nodes + + if self.kwargs['arrival'] == "poisson": + for job in jobs: + job.scheduled_nodes = None + job.submit_time = next_arrival_byconfargs(self.config, self.kwargs) + job.start_time = None + job.end_time = None + def run_telemetry_add_parser(subparsers: SubParsers): parser = subparsers.add_parser("telemetry", description=""" @@ -202,7 +201,7 @@ def run_telemetry(args: TelemetryArgs): if args.live and not args.replay: result = td.load_from_live_system() else: - result = td.load_from_files(files=args.replay, args=args, config=config) + result = td.load_from_files(args.replay) jobs = result.jobs timestep_start = result.telemetry_start timestep_end = result.telemetry_end diff --git a/raps/workload.py b/raps/workload.py index 48c7af3..015678f 100644 --- a/raps/workload.py +++ b/raps/workload.py @@ -977,7 +977,7 @@ def run_workload(sim_config: SimConfig): if sim_config.replay: td = Telemetry(**args_dict) - jobs = td.load_from_files(files=sim_config.replay, args=args, config=config).jobs + jobs = td.load_from_files(sim_config.replay).jobs else: workload = Workload(args, config) jobs = getattr(workload, 
sim_config.workload)(args=sim_config.get_legacy_args) -- GitLab From 31be11d3830327beb62f0047318476bbeb05d556 Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Thu, 4 Sep 2025 12:54:55 -0400 Subject: [PATCH 07/14] Add result file back --- raps/telemetry.py | 58 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 55 insertions(+), 3 deletions(-) diff --git a/raps/telemetry.py b/raps/telemetry.py index 0d848fc..8ec4c9a 100644 --- a/raps/telemetry.py +++ b/raps/telemetry.py @@ -42,6 +42,7 @@ class TelemetryArgs(BaseModel): replay: list[ExpandedPath] | None = None """ path/to/joblive path/to/jobprofile -or- filename.npz (overrides --workload option) """ plot: list[Literal["jobs", "nodes"]] | None = None + is_results_file: bool = False """ Output plots """ gantt_nodes: bool = False """ Print Gannt with nodes required as line thickness (default false) """ @@ -123,6 +124,53 @@ class Telemetry: return result, args + def load_csv_results(self, file): + jobs = [] + time_start = 0 + time_end = 0 + for line in pd.read_csv(file, chunksize=1): + job_info = job_dict(nodes_required=line.get('num_nodes').item(), + name=line.get('name').item(), + account=line.get('account').item(), + current_state=line.get('current_state').item(), + end_state=line.get('end_state').item(), + scheduled_nodes=line.get('scheduled_nodes').item(), + id=line.get('id').item(), + priority=line.get('priority').item(), + partition=line.get('partition').item(), + cpu_cores_required=line.get('cpu_cores_required').item(), + gpu_units_required=line.get('gpu_units_required').item(), + allocated_cpu_cores=line.get('allocated_cpu_cores').item(), + allocated_gpu_units=line.get('allocated_gpu_units').item(), + + cpu_trace=line.get('cpu_trace'), + gpu_trace=line.get('cpu_trace'), + ntx_trace=line.get('cpu_trace'), + nrx_trace=line.get('cpu_trace'), + submit_time=line.get('submit_time').item(), + time_limit=line.get('time_limit').item(), + start_time=line.get('start_time').item(), + 
end_time=line.get('end_time').item(), + expected_run_time=line.get('expected_run_time').item(), + current_run_time=line.get('current_run_time').item(), + trace_time=line.get('trace_time'), + # trace_start_time=line.get('trace_start_time').item(), + trace_start_time=line.get('trace_start_time'), + # trace_end_time=line.get('trace_end_time').item(), + trace_end_time=line.get('trace_end_time'), + trace_quanta=line.get('trace_quanta').item(), + trace_missing_values=line.get('trace_missing_values'), + downscale=line.get('downscale'), + ) + job = Job(job_info) + jobs.append(job) + # if hasattr(data,'args'): + # args_from_file = data["args"].item() # This should be empty as csv contains no args. + # else: + # args_from_file = None + + return jobs, time_start, time_end + def load_data(self, files): """Load telemetry data using custom data loaders.""" assert self.dataloader @@ -198,13 +246,17 @@ def run_telemetry(args: TelemetryArgs): args_dict['config'] = config td = Telemetry(**args_dict) + if args.is_results_file and args.replay: + file = str(args.replay[0]) + jobs, timestep_start, timestep_end = td.load_csv_results(file) if args.live and not args.replay: result = td.load_from_live_system() + jobs = result.jobs + timestep_start, timestep_end = result.telemetry_start, result.telemetry_end else: result = td.load_from_files(args.replay) - jobs = result.jobs - timestep_start = result.telemetry_start - timestep_end = result.telemetry_end + jobs = result.jobs + timestep_start, timestep_end = result.telemetry_start, result.telemetry_end if args.output: td.save_snapshot(dest = args.output, result = result, args = args) -- GitLab From 3b43f122781a8e0a8b5be65c7cff38496b14b3b7 Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Thu, 4 Sep 2025 14:19:18 -0400 Subject: [PATCH 08/14] Formatting --- raps/dataloaders/adastraMI250.py | 5 ++--- raps/dataloaders/frontier.py | 19 ++++++++-------- raps/dataloaders/fugaku.py | 4 ++-- raps/dataloaders/gcloudv2.py | 4 ++-- 
raps/dataloaders/lassen.py | 3 +-- raps/dataloaders/marconi100.py | 5 ++--- raps/dataloaders/mit_supercloud/loader.py | 2 +- raps/engine.py | 9 +++++--- raps/run_sim.py | 7 +++--- raps/telemetry.py | 27 ++++++++++------------- raps/utils.py | 4 ++-- raps/workload.py | 2 +- 12 files changed, 44 insertions(+), 47 deletions(-) diff --git a/raps/dataloaders/adastraMI250.py b/raps/dataloaders/adastraMI250.py index 4d96b02..1ce7689 100644 --- a/raps/dataloaders/adastraMI250.py +++ b/raps/dataloaders/adastraMI250.py @@ -24,7 +24,7 @@ import pandas as pd from tqdm import tqdm from ..job import job_dict, Job -from ..utils import next_arrival_byconfkwargs, WorkloadResult +from ..utils import WorkloadResult def load_data(jobs_path, **kwargs): @@ -58,7 +58,6 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): """ count_jobs_notOK = 0 config = kwargs.get('config') - arrival = kwargs.get('arrival') validate = kwargs.get('validate') jid = kwargs.get('jid', '*') @@ -202,7 +201,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): print("jobs not added: ", count_jobs_notOK) return WorkloadResult( - jobs = jobs, + jobs=jobs, telemetry_start=telemetry_start_time, telemetry_end=telemetry_end_time, start_date=telemetry_start_timestamp.tz_localize("UTC"), ) diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py index de84770..9ef6b72 100644 --- a/raps/dataloaders/frontier.py +++ b/raps/dataloaders/frontier.py @@ -16,7 +16,7 @@ import pandas as pd from tqdm import tqdm from ..job import job_dict, Job -from ..utils import power_to_utilization, next_arrival_byconfkwargs, encrypt, WorkloadResult +from ..utils import power_to_utilization, encrypt, WorkloadResult def aging_boost(nnodes): @@ -137,7 +137,6 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar """ config = kwargs.get('config') encrypt_bool = kwargs.get('encrypt') - arrival = kwargs.get('arrival') validate = kwargs.get('validate') jid = kwargs.get('jid', '*') debug = 
kwargs.get('debug') @@ -319,10 +318,10 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar job = Job(job_info) jobs.append(job) return WorkloadResult( - jobs = jobs, - telemetry_start = telemetry_start, - telemetry_end = telemetry_end, - start_date = telemetry_start_timestamp, + jobs=jobs, + telemetry_start=telemetry_start, + telemetry_end=telemetry_end, + start_date=telemetry_start_timestamp, ) @@ -536,10 +535,10 @@ def load_live_data(**kwargs): jobs.append(job) return WorkloadResult( - jobs = jobs, - telemetry_start = telemetry_start, - telemetry_end = telemetry_end, - start_date = datetime.fromtimestamp(telemetry_start, timezone.utc), + jobs=jobs, + telemetry_start=telemetry_start, + telemetry_end=telemetry_end, + start_date=datetime.fromtimestamp(telemetry_start, timezone.utc), ) diff --git a/raps/dataloaders/fugaku.py b/raps/dataloaders/fugaku.py index 1515915..70b6cb8 100644 --- a/raps/dataloaders/fugaku.py +++ b/raps/dataloaders/fugaku.py @@ -163,8 +163,8 @@ def load_data_from_df(df, **kwargs): return WorkloadResult( jobs=job_list, - telemetry_start = telemetry_start, telemetry_end = telemetry_end, - start_date = telemetry_start_timestamp, + telemetry_start=telemetry_start, telemetry_end=telemetry_end, + start_date=telemetry_start_timestamp, ) diff --git a/raps/dataloaders/gcloudv2.py b/raps/dataloaders/gcloudv2.py index 60426cc..4dba4fb 100644 --- a/raps/dataloaders/gcloudv2.py +++ b/raps/dataloaders/gcloudv2.py @@ -2,7 +2,7 @@ import os import re from datetime import datetime from tqdm import tqdm -from typing import List, Optional, Generator, Tuple, Any, Union +from typing import List, Optional, Generator, Any, Union import numpy as np import pandas as pd @@ -335,7 +335,7 @@ def load_data(data_path: Union[str, List[str]], **kwargs: Any): telemetry_start = 0 telemetry_end = int(max(usage_map_end.values()) - t0) return WorkloadResult( - jobs = jobs, + jobs=jobs, telemetry_start=telemetry_start, telemetry_end=telemetry_end, # 
gcloud dataset timestamps are already relative, and it doesn't list a start exact date. start_date=datetime.fromisoformat("2011-05-02T00:00:00Z"), diff --git a/raps/dataloaders/lassen.py b/raps/dataloaders/lassen.py index a0d1ca7..c9c8378 100644 --- a/raps/dataloaders/lassen.py +++ b/raps/dataloaders/lassen.py @@ -38,7 +38,7 @@ from tqdm import tqdm from datetime import timedelta from ..job import job_dict, Job -from ..utils import power_to_utilization, next_arrival_byconfkwargs, parse_td, WorkloadResult +from ..utils import power_to_utilization, parse_td, WorkloadResult def load_data(path, **kwargs): @@ -60,7 +60,6 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): config = kwargs.get('config') jid = kwargs.get('jid', '*') validate = kwargs.get('validate') - arrival = kwargs.get('arrival') verbose = kwargs.get('verbose') fastforward = kwargs.get('fastforward') # int in seconds diff --git a/raps/dataloaders/marconi100.py b/raps/dataloaders/marconi100.py index fd08b91..961eca3 100644 --- a/raps/dataloaders/marconi100.py +++ b/raps/dataloaders/marconi100.py @@ -28,7 +28,7 @@ import pandas as pd from tqdm import tqdm from ..job import job_dict, Job -from ..utils import power_to_utilization, next_arrival_byconfkwargs, WorkloadResult +from ..utils import power_to_utilization, WorkloadResult def load_data(jobs_path, **kwargs): @@ -60,7 +60,6 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): """ config = kwargs.get('config') # min_time = kwargs.get('min_time', None) # Unused - arrival = kwargs.get('arrival') validate = kwargs.get('validate') jid = kwargs.get('jid', '*') debug = kwargs.get('debug') @@ -228,7 +227,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): jobs.append(job) return WorkloadResult( - jobs = jobs, + jobs=jobs, telemetry_start=telemetry_start, telemetry_end=telemetry_end, start_date=telemetry_start_timestamp, ) diff --git a/raps/dataloaders/mit_supercloud/loader.py b/raps/dataloaders/mit_supercloud/loader.py index 
282f0b5..3642936 100644 --- a/raps/dataloaders/mit_supercloud/loader.py +++ b/raps/dataloaders/mit_supercloud/loader.py @@ -607,7 +607,7 @@ def load_data(local_dataset_path, **kwargs): print(f"- {reason}: {count}") return WorkloadResult( - jobs = jobs_list, + jobs=jobs_list, telemetry_start=0, telemetry_end=int(end_ts - start_ts), start_date=datetime.fromtimestamp(start_ts, timezone.utc), ) diff --git a/raps/engine.py b/raps/engine.py index 3760254..dbc5e76 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -40,9 +40,9 @@ from raps.downtime import Downtime from raps.weather import Weather from raps.sim_config import SimConfig from raps.system_config import SystemConfig - from bisect import bisect_right + @dataclasses.dataclass class TickData: """ Represents the state output from the simulation each tick """ @@ -289,7 +289,7 @@ class Engine: wl = Workload(sim_config_args, system_config_dict) workload_result = wl.generate_jobs() td = Telemetry(**sim_config_dict) - + jobs = workload_result.jobs # TODO refactor how stat/end/fastforward/time work @@ -547,7 +547,10 @@ class Engine: job.running_time = self.current_timestep - job.start_time if job.current_state != JobState.RUNNING: - raise ValueError(f"Job {job.id} is in running list, but state is not RUNNING: job.state == {job.current_state}") + raise ValueError( + f"Job {job.id} is in running list, " + + f"but state is not RUNNING: job.state == {job.current_state}" + ) else: # if job.state == JobState.RUNNING: # Error checks if job.running_time > job.time_limit and job.end_time is not None: diff --git a/raps/run_sim.py b/raps/run_sim.py index b41eb80..f1834c7 100644 --- a/raps/run_sim.py +++ b/raps/run_sim.py @@ -82,8 +82,8 @@ def run_sim(sim_config: SimConfig): if out: out.mkdir(parents=True) engine.telemetry.save_snapshot( - dest = str(out), - result = workload_result, + dest=str(out), + result=workload_result, args=sim_config, ) jobs = workload_result.jobs @@ -242,7 +242,8 @@ def 
run_multi_part_sim_add_parser(subparsers: SubParsers): def run_multi_part_sim(sim_config: SimConfig): - multi_engine, workload_results, timestep_start, timestep_end, time_delta = MultiPartEngine.from_sim_config(sim_config) + multi_engine, workload_results, timestep_start, timestep_end, time_delta = \ + MultiPartEngine.from_sim_config(sim_config) # TODO: The mit_supercloud dataloader seems to be outputting the wrong timesteps? mit_supercloud # is the only multi-partition system with replay, so just manually overriding the timesteps here diff --git a/raps/telemetry.py b/raps/telemetry.py index 8ec4c9a..93d7992 100644 --- a/raps/telemetry.py +++ b/raps/telemetry.py @@ -7,7 +7,6 @@ The module defines a `Telemetry` class for managing telemetry data and several helper functions for data encryption and conversion between node name and index formats. """ from typing import Literal -import sys import random from pathlib import Path # import json @@ -16,7 +15,6 @@ from types import ModuleType import importlib import numpy as np import pandas as pd -from tqdm import tqdm from pydantic import BaseModel, model_validator # from rich.progress import track @@ -30,8 +28,7 @@ from raps.plotting import ( plot_network_histogram ) from raps.utils import ( - next_arrival_byconfargs, convert_to_time_unit, pydantic_add_args, SubParsers, ExpandedPath, - WorkloadResult, + next_arrival_byconfargs, pydantic_add_args, SubParsers, ExpandedPath, WorkloadResult, ) @@ -89,17 +86,17 @@ class Telemetry: print(f"WARNING: Failed to load dataloader: {e}") self.dataloader = None - def save_snapshot(self, *, dest: str, result: WorkloadResult, args: SimConfig|TelemetryArgs): + def save_snapshot(self, *, dest: str, result: WorkloadResult, args: SimConfig | TelemetryArgs): """Saves a snapshot of the jobs to a compressed file. 
""" np.savez_compressed(dest, - jobs=[vars(j) for j in result.jobs], - telemetry_start=result.telemetry_start, - telemetry_end=result.telemetry_end, - start_date=result.start_date, - args=args, - ) - - def load_snapshot(self, snapshot: str | Path) -> tuple[WorkloadResult, SimConfig|TelemetryArgs]: + jobs=[vars(j) for j in result.jobs], + telemetry_start=result.telemetry_start, + telemetry_end=result.telemetry_end, + start_date=result.start_date, + args=args, + ) + + def load_snapshot(self, snapshot: str | Path) -> tuple[WorkloadResult, SimConfig | TelemetryArgs]: """Reads a snapshot from a compressed file :param str snapshot: Filename @@ -210,7 +207,7 @@ class Telemetry: print(f"Loading {file}") result, args_from_file = self.load_snapshot(file) print(f"File was generated with: --system {args_from_file.system}") - else: # custom data loader + else: # custom data loader result = self.load_data(files) self.update_jobs(result.jobs) return result @@ -259,7 +256,7 @@ def run_telemetry(args: TelemetryArgs): timestep_start, timestep_end = result.telemetry_start, result.telemetry_end if args.output: - td.save_snapshot(dest = args.output, result = result, args = args) + td.save_snapshot(dest=args.output, result=result, args=args) timesteps = timestep_end - timestep_start diff --git a/raps/utils.py b/raps/utils.py index f67c533..0e636f7 100644 --- a/raps/utils.py +++ b/raps/utils.py @@ -798,7 +798,7 @@ class WorkloadResult(BaseModel): # TODO: It might make more sense to make start_timestep/end_timestep always unix time, then we # wouldn't need this extra start_date field. start_date: AwareDatetime - + model_config = ConfigDict( - arbitrary_types_allowed = True, + arbitrary_types_allowed=True, ) diff --git a/raps/workload.py b/raps/workload.py index 015678f..cc1ba09 100644 --- a/raps/workload.py +++ b/raps/workload.py @@ -68,7 +68,7 @@ class Workload: # The respective funciton of this class is called. 
jobs = getattr(self, self.args.workload)(args=self.args) return WorkloadResult( - jobs = jobs, + jobs=jobs, telemetry_start=0, telemetry_end=self.args.time, start_date=self.args.start, ) -- GitLab From d67a220ea37eab39e7911b3b1ddab6f5156a2282 Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Thu, 4 Sep 2025 14:56:03 -0400 Subject: [PATCH 09/14] Renaming --- raps/dataloaders/adastraMI250.py | 4 ++-- raps/dataloaders/bluewaters.py | 4 ++-- raps/dataloaders/frontier.py | 6 +++--- raps/dataloaders/fugaku.py | 4 ++-- raps/dataloaders/gcloudv2.py | 4 ++-- raps/dataloaders/kestrel.py | 4 ++-- raps/dataloaders/lassen.py | 4 ++-- raps/dataloaders/marconi100.py | 4 ++-- raps/dataloaders/mit_supercloud/loader.py | 4 ++-- raps/engine.py | 14 +++++++------- raps/multi_part_engine.py | 10 +++++----- raps/run_sim.py | 8 ++++---- raps/telemetry.py | 12 ++++++------ raps/utils.py | 2 +- raps/workload.py | 4 ++-- tests/systems/test_engine.py | 8 ++++---- 16 files changed, 48 insertions(+), 48 deletions(-) diff --git a/raps/dataloaders/adastraMI250.py b/raps/dataloaders/adastraMI250.py index 1ce7689..6eec26e 100644 --- a/raps/dataloaders/adastraMI250.py +++ b/raps/dataloaders/adastraMI250.py @@ -24,7 +24,7 @@ import pandas as pd from tqdm import tqdm from ..job import job_dict, Job -from ..utils import WorkloadResult +from ..utils import WorkloadData def load_data(jobs_path, **kwargs): @@ -200,7 +200,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): count_jobs_notOK += 1 print("jobs not added: ", count_jobs_notOK) - return WorkloadResult( + return WorkloadData( jobs=jobs, telemetry_start=telemetry_start_time, telemetry_end=telemetry_end_time, start_date=telemetry_start_timestamp.tz_localize("UTC"), diff --git a/raps/dataloaders/bluewaters.py b/raps/dataloaders/bluewaters.py index c2d328f..272db1c 100644 --- a/raps/dataloaders/bluewaters.py +++ b/raps/dataloaders/bluewaters.py @@ -34,7 +34,7 @@ import pandas as pd from pathlib import Path from datetime import datetime, 
timezone from raps.telemetry import Job, job_dict -from raps.utils import WorkloadResult +from raps.utils import WorkloadData def throughput_traces(total_tx, total_rx, intervals): @@ -330,7 +330,7 @@ def load_data(local_dataset_path, **kwargs): telemetry_start = 0 telemetry_end = max((j.end_time for j in jobs), default=0) - return WorkloadResult( + return WorkloadData( jobs=jobs, telemetry_start=telemetry_start, telemetry_end=telemetry_end, start_date=datetime.fromtimestamp(t0, timezone.utc), diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py index 9ef6b72..0e1bdcd 100644 --- a/raps/dataloaders/frontier.py +++ b/raps/dataloaders/frontier.py @@ -16,7 +16,7 @@ import pandas as pd from tqdm import tqdm from ..job import job_dict, Job -from ..utils import power_to_utilization, encrypt, WorkloadResult +from ..utils import power_to_utilization, encrypt, WorkloadData def aging_boost(nnodes): @@ -317,7 +317,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar job = Job(job_info) jobs.append(job) - return WorkloadResult( + return WorkloadData( jobs=jobs, telemetry_start=telemetry_start, telemetry_end=telemetry_end, @@ -534,7 +534,7 @@ def load_live_data(**kwargs): job = Job(job_info) jobs.append(job) - return WorkloadResult( + return WorkloadData( jobs=jobs, telemetry_start=telemetry_start, telemetry_end=telemetry_end, diff --git a/raps/dataloaders/fugaku.py b/raps/dataloaders/fugaku.py index 70b6cb8..4ccc885 100644 --- a/raps/dataloaders/fugaku.py +++ b/raps/dataloaders/fugaku.py @@ -17,7 +17,7 @@ import pandas as pd from tqdm import tqdm from ..job import job_dict, Job -from ..utils import WorkloadResult +from ..utils import WorkloadData def load_data(path, **kwargs): @@ -161,7 +161,7 @@ def load_data_from_df(df, **kwargs): job = Job(job_info) job_list.append(job) - return WorkloadResult( + return WorkloadData( jobs=job_list, telemetry_start=telemetry_start, telemetry_end=telemetry_end, 
start_date=telemetry_start_timestamp, diff --git a/raps/dataloaders/gcloudv2.py b/raps/dataloaders/gcloudv2.py index 4dba4fb..6f05a87 100644 --- a/raps/dataloaders/gcloudv2.py +++ b/raps/dataloaders/gcloudv2.py @@ -8,7 +8,7 @@ import numpy as np import pandas as pd from raps.job import job_dict, Job -from raps.utils import WorkloadResult +from raps.utils import WorkloadData """ Official instructions are here: @@ -334,7 +334,7 @@ def load_data(data_path: Union[str, List[str]], **kwargs: Any): # Compute simulation span: start at t=0, end at the latest job finish telemetry_start = 0 telemetry_end = int(max(usage_map_end.values()) - t0) - return WorkloadResult( + return WorkloadData( jobs=jobs, telemetry_start=telemetry_start, telemetry_end=telemetry_end, # gcloud dataset timestamps are already relative, and it doesn't list a start exact date. diff --git a/raps/dataloaders/kestrel.py b/raps/dataloaders/kestrel.py index 04adaa8..c9efd70 100644 --- a/raps/dataloaders/kestrel.py +++ b/raps/dataloaders/kestrel.py @@ -6,7 +6,7 @@ import pandas as pd from tqdm import tqdm from ..job import job_dict, Job -from ..utils import power_to_utilization, next_arrival, WorkloadResult +from ..utils import power_to_utilization, next_arrival, WorkloadData def load_data(jobs_path, **kwargs): @@ -153,7 +153,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): trace_quanta=trace_quanta) jobs.append(Job(job_info)) - return WorkloadResult( + return WorkloadData( jobs=jobs, telemetry_start=telemetry_start, telemetry_end=telemetry_end, start_date=telemetry_start_timestamp, diff --git a/raps/dataloaders/lassen.py b/raps/dataloaders/lassen.py index c9c8378..fd0e364 100644 --- a/raps/dataloaders/lassen.py +++ b/raps/dataloaders/lassen.py @@ -38,7 +38,7 @@ from tqdm import tqdm from datetime import timedelta from ..job import job_dict, Job -from ..utils import power_to_utilization, parse_td, WorkloadResult +from ..utils import power_to_utilization, parse_td, WorkloadData def load_data(path, 
**kwargs): @@ -242,7 +242,7 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): job = Job(job_info) job_list.append(job) - return WorkloadResult( + return WorkloadData( jobs=job_list, telemetry_start=telemetry_start_time, telemetry_end=telemetry_end_time, start_date=telemetry_start_timestamp, diff --git a/raps/dataloaders/marconi100.py b/raps/dataloaders/marconi100.py index 961eca3..a10e1e8 100644 --- a/raps/dataloaders/marconi100.py +++ b/raps/dataloaders/marconi100.py @@ -28,7 +28,7 @@ import pandas as pd from tqdm import tqdm from ..job import job_dict, Job -from ..utils import power_to_utilization, WorkloadResult +from ..utils import power_to_utilization, WorkloadData def load_data(jobs_path, **kwargs): @@ -226,7 +226,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): job = Job(job_info) jobs.append(job) - return WorkloadResult( + return WorkloadData( jobs=jobs, telemetry_start=telemetry_start, telemetry_end=telemetry_end, start_date=telemetry_start_timestamp, diff --git a/raps/dataloaders/mit_supercloud/loader.py b/raps/dataloaders/mit_supercloud/loader.py index 3642936..e3103ba 100644 --- a/raps/dataloaders/mit_supercloud/loader.py +++ b/raps/dataloaders/mit_supercloud/loader.py @@ -118,7 +118,7 @@ from typing import Dict, Union, Optional from collections import Counter from datetime import datetime, timezone from raps.job import job_dict, Job -from raps.utils import summarize_ranges, WorkloadResult +from raps.utils import summarize_ranges, WorkloadData from .utils import proc_cpu_series, proc_gpu_series, to_epoch from .utils import DEFAULT_START, DEFAULT_END @@ -606,7 +606,7 @@ def load_data(local_dataset_path, **kwargs): for reason, count in skip_counts.items(): print(f"- {reason}: {count}") - return WorkloadResult( + return WorkloadData( jobs=jobs_list, telemetry_start=0, telemetry_end=int(end_ts - start_ts), start_date=datetime.fromtimestamp(start_ts, timezone.utc), diff --git a/raps/engine.py b/raps/engine.py index 
dbc5e76..3ef0d36 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -265,7 +265,7 @@ class Engine: if sim_config.live and not sim_config.replay: td = Telemetry(**sim_config_dict) - workload_result = td.load_from_live_system() + workload_data = td.load_from_live_system() elif sim_config.replay: # TODO: this will have issues if running separate systems or custom systems partition_short = partition.split("/")[-1] if partition else None @@ -284,20 +284,20 @@ class Engine: else: replay_files = sim_config.replay - workload_result = td.load_from_files(replay_files) + workload_data = td.load_from_files(replay_files) else: # Synthetic jobs wl = Workload(sim_config_args, system_config_dict) - workload_result = wl.generate_jobs() + workload_data = wl.generate_jobs() td = Telemetry(**sim_config_dict) - jobs = workload_result.jobs + jobs = workload_data.jobs # TODO refactor how stat/end/fastforward/time work if sim_config.fastforward is not None: - workload_result.telemetry_start = workload_result.telemetry_start + sim_config.fastforward + workload_data.telemetry_start = workload_data.telemetry_start + sim_config.fastforward if sim_config.time is not None: - workload_result.telemetry_end = workload_result.telemetry_end + sim_config.time + workload_data.telemetry_end = workload_data.telemetry_end + sim_config.time if sim_config.time_delta is not None: time_delta = sim_config.time_delta @@ -330,7 +330,7 @@ class Engine: system_config=system_config, ) - return engine, workload_result, time_delta + return engine, workload_data, time_delta def add_running_jobs_to_queue(self, jobs_to_submit: List): """ diff --git a/raps/multi_part_engine.py b/raps/multi_part_engine.py index bebf47b..f211b85 100644 --- a/raps/multi_part_engine.py +++ b/raps/multi_part_engine.py @@ -1,7 +1,7 @@ from collections.abc import Iterable from raps.engine import Engine, TickData from raps.sim_config import SimConfig -from raps.utils import WorkloadResult +from raps.utils import WorkloadData class 
MultiPartEngine: @@ -18,18 +18,18 @@ class MultiPartEngine: if len(root_systems) > 1: raise ValueError("Replay for multi-system runs is not supported") - workloads_by_partition: dict[str, WorkloadResult] = {} + workloads_by_partition: dict[str, WorkloadData] = {} engines: dict[str, Engine] = {} timestep_start, timestep_end, time_delta = 0, 0, 0 for partition in sim_config.system_configs: name = partition.system_name - engine, workload_result, time_delta = Engine.from_sim_config( + engine, workload_data, time_delta = Engine.from_sim_config( sim_config, partition=name, ) - for job in workload_result.jobs: + for job in workload_data.jobs: job.partition = name - workloads_by_partition[name] = workload_result + workloads_by_partition[name] = workload_data engines[name] = engine total_initial_jobs = sum(len(j.jobs) for j in workloads_by_partition.values()) for engine in engines.values(): diff --git a/raps/run_sim.py b/raps/run_sim.py index f1834c7..ceb80a9 100644 --- a/raps/run_sim.py +++ b/raps/run_sim.py @@ -76,18 +76,18 @@ def run_sim(sim_config: SimConfig): print("Use run-multi-part to run multi-partition simulations") sys.exit(1) - engine, workload_result, time_delta = Engine.from_sim_config(sim_config) + engine, workload_data, time_delta = Engine.from_sim_config(sim_config) out = sim_config.output if out: out.mkdir(parents=True) engine.telemetry.save_snapshot( dest=str(out), - result=workload_result, + result=workload_data, args=sim_config, ) - jobs = workload_result.jobs - timestep_start, timestep_end = workload_result.telemetry_start, workload_result.telemetry_end + jobs = workload_data.jobs + timestep_start, timestep_end = workload_data.telemetry_start, workload_data.telemetry_end total_timesteps = timestep_end - timestep_start downscale = sim_config.downscale diff --git a/raps/telemetry.py b/raps/telemetry.py index 93d7992..d82a6c5 100644 --- a/raps/telemetry.py +++ b/raps/telemetry.py @@ -28,7 +28,7 @@ from raps.plotting import ( plot_network_histogram ) from 
raps.utils import ( - next_arrival_byconfargs, pydantic_add_args, SubParsers, ExpandedPath, WorkloadResult, + next_arrival_byconfargs, pydantic_add_args, SubParsers, ExpandedPath, WorkloadData, ) @@ -86,7 +86,7 @@ class Telemetry: print(f"WARNING: Failed to load dataloader: {e}") self.dataloader = None - def save_snapshot(self, *, dest: str, result: WorkloadResult, args: SimConfig | TelemetryArgs): + def save_snapshot(self, *, dest: str, result: WorkloadData, args: SimConfig | TelemetryArgs): """Saves a snapshot of the jobs to a compressed file. """ np.savez_compressed(dest, jobs=[vars(j) for j in result.jobs], @@ -96,7 +96,7 @@ class Telemetry: args=args, ) - def load_snapshot(self, snapshot: str | Path) -> tuple[WorkloadResult, SimConfig | TelemetryArgs]: + def load_snapshot(self, snapshot: str | Path) -> tuple[WorkloadData, SimConfig | TelemetryArgs]: """Reads a snapshot from a compressed file :param str snapshot: Filename @@ -113,7 +113,7 @@ class Telemetry: start_date = data['start_date'].item() args = data['args'].item() - result = WorkloadResult( + result = WorkloadData( jobs=jobs, telemetry_start=telemetry_start, telemetry_end=telemetry_end, start_date=start_date, @@ -193,11 +193,11 @@ class Telemetry: assert self.dataloader return self.dataloader.cdu_pos(index, config=self.config) - def load_from_live_system(self) -> WorkloadResult: + def load_from_live_system(self) -> WorkloadData: result = self.load_live_data() return result - def load_from_files(self, files) -> WorkloadResult: + def load_from_files(self, files) -> WorkloadData: """ Load all files as combined jobs """ assert len(files) >= 1 files = [Path(f) for f in files] diff --git a/raps/utils.py b/raps/utils.py index 0e636f7..323ac8a 100644 --- a/raps/utils.py +++ b/raps/utils.py @@ -711,7 +711,7 @@ def yaml_dump(data): ) -class WorkloadResult(BaseModel): +class WorkloadData(BaseModel): """ Represents a workload, a list of jobs with some metadata. 
Returned by dataloaders load_data() function, and by Workload.generate_jobs(). diff --git a/raps/workload.py b/raps/workload.py index cc1ba09..338976d 100644 --- a/raps/workload.py +++ b/raps/workload.py @@ -31,7 +31,7 @@ from raps.utils import ( next_arrival_byconfargs, truncated_weibull, truncated_weibull_float, - WorkloadResult, + WorkloadData, ) import math import random @@ -67,7 +67,7 @@ class Workload: # This function calls the job generation function as specified by the workload keyword. # The respective funciton of this class is called. jobs = getattr(self, self.args.workload)(args=self.args) - return WorkloadResult( + return WorkloadData( jobs=jobs, telemetry_start=0, telemetry_end=self.args.time, start_date=self.args.start, diff --git a/tests/systems/test_engine.py b/tests/systems/test_engine.py index 4d57752..e483b18 100644 --- a/tests/systems/test_engine.py +++ b/tests/systems/test_engine.py @@ -22,10 +22,10 @@ def test_engine(system, system_config, sim_output): "system": system, "time": "2m", }) - engine, workload_result, time_delta = Engine.from_sim_config(sim_config) - jobs = workload_result.jobs - timestep_start = workload_result.telemetry_start - timestep_end = workload_result.telemetry_end + engine, workload_data, time_delta = Engine.from_sim_config(sim_config) + jobs = workload_data.jobs + timestep_start = workload_data.telemetry_start + timestep_end = workload_data.telemetry_end ticks = list(engine.run_simulation(jobs, timestep_start, timestep_end, time_delta)) assert len(ticks) == 120 -- GitLab From a38a3ad4be5947df595d4cde68a3f356834fb612 Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Thu, 4 Sep 2025 15:10:42 -0400 Subject: [PATCH 10/14] Allow multiple npz files --- raps/telemetry.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/raps/telemetry.py b/raps/telemetry.py index d82a6c5..1b5c256 100644 --- a/raps/telemetry.py +++ b/raps/telemetry.py @@ -203,14 +203,22 @@ class Telemetry: files = [Path(f) 
for f in files] if str(files[0]).endswith(".npz"): - file = files[0] - print(f"Loading {file}") - result, args_from_file = self.load_snapshot(file) - print(f"File was generated with: --system {args_from_file.system}") + data: WorkloadData | None = None + for file in files: + print(f"Loading {file}") + new_data, args_from_file = self.load_snapshot(file) + print(f"File was generated with: --system {args_from_file.system}") + if not data: + data = new_data + else: + data.jobs.extend(new_data.jobs) + data.telemetry_start = min(data.telemetry_start, new_data.telemetry_start) + data.telemetry_end = max(data.telemetry_end, new_data.telemetry_end) + data.start_date = min(data.start_date, new_data.start_date) else: # custom data loader - result = self.load_data(files) - self.update_jobs(result.jobs) - return result + data = self.load_data(files) + self.update_jobs(data.jobs) + return data def update_jobs(self, jobs: list[Job]): """ Updates jobs with new scale or random start times """ -- GitLab From b84a805c43d8536eb5d910b97a66de71533485b7 Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Thu, 4 Sep 2025 15:36:27 -0400 Subject: [PATCH 11/14] Fix bug --- raps/engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raps/engine.py b/raps/engine.py index 3ef0d36..25a9b55 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -297,7 +297,7 @@ class Engine: workload_data.telemetry_start = workload_data.telemetry_start + sim_config.fastforward if sim_config.time is not None: - workload_data.telemetry_end = workload_data.telemetry_end + sim_config.time + workload_data.telemetry_end = workload_data.telemetry_start + sim_config.time if sim_config.time_delta is not None: time_delta = sim_config.time_delta -- GitLab From f4212243c39c6bb75ae32d249c090a30f75e4cfa Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Thu, 4 Sep 2025 16:58:12 -0400 Subject: [PATCH 12/14] Fix workload no time --- raps/workload.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git 
a/raps/workload.py b/raps/workload.py index 338976d..9e26c97 100644 --- a/raps/workload.py +++ b/raps/workload.py @@ -67,9 +67,10 @@ class Workload: # This function calls the job generation function as specified by the workload keyword. # The respective funciton of this class is called. jobs = getattr(self, self.args.workload)(args=self.args) + timestep_end = int(math.ceil(max([job.end_time for job in jobs]))) return WorkloadData( jobs=jobs, - telemetry_start=0, telemetry_end=self.args.time, + telemetry_start=0, telemetry_end=timestep_end, start_date=self.args.start, ) -- GitLab From d3dce958624c32c8aba811acd7911d92bc377e5f Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Thu, 4 Sep 2025 17:16:20 -0400 Subject: [PATCH 13/14] Fix workload --- raps/workload.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raps/workload.py b/raps/workload.py index 9e26c97..6fb3c3b 100644 --- a/raps/workload.py +++ b/raps/workload.py @@ -981,7 +981,7 @@ def run_workload(sim_config: SimConfig): jobs = td.load_from_files(sim_config.replay).jobs else: workload = Workload(args, config) - jobs = getattr(workload, sim_config.workload)(args=sim_config.get_legacy_args) + jobs = getattr(workload, sim_config.workload)(args=sim_config.get_legacy_args()) plot_job_hist(jobs, config=config, dist_split=sim_config.multimodal, -- GitLab From 0469271890ec4642e9f35eb6c9c3b4fa1affaf45 Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Fri, 5 Sep 2025 09:15:47 -0400 Subject: [PATCH 14/14] Fix bug in update_jobs Scale can be 0 --- raps/telemetry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raps/telemetry.py b/raps/telemetry.py index 1b5c256..340b9ae 100644 --- a/raps/telemetry.py +++ b/raps/telemetry.py @@ -222,7 +222,7 @@ class Telemetry: def update_jobs(self, jobs: list[Job]): """ Updates jobs with new scale or random start times """ - if self.kwargs.get("scale") is not None: + if self.kwargs.get("scale"): for job in jobs: job.nodes_required = random.randint(1, 
self.kwargs['scale']) job.scheduled_nodes = None # Setting to None triggers scheduler to assign nodes -- GitLab