diff --git a/raps/dataloaders/adastraMI250.py b/raps/dataloaders/adastraMI250.py
index 90201c84caba695e419092f1f1661a6df7687760..6eec26e8431f54ba08c03561c6c518f882b3546d 100644
--- a/raps/dataloaders/adastraMI250.py
+++ b/raps/dataloaders/adastraMI250.py
@@ -24,7 +24,7 @@ import pandas as pd
 from tqdm import tqdm
 
 from ..job import job_dict, Job
-from ..utils import next_arrival_byconfkwargs
+from ..utils import WorkloadData
 
 
 def load_data(jobs_path, **kwargs):
@@ -58,7 +58,6 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
     """
     count_jobs_notOK = 0
     config = kwargs.get('config')
-    arrival = kwargs.get('arrival')
     validate = kwargs.get('validate')
     jid = kwargs.get('jid', '*')
 
@@ -146,15 +145,11 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
 
         priority = int(jobs_df.loc[jidx, 'priority'])
 
-        if arrival == 'poisson':  # Modify the arrival times of the jobs according to Poisson distribution
-            scheduled_nodes = None
-            submit_time = next_arrival_byconfkwargs(config, kwargs)
-        else:  # Prescribed replay
-            scheduled_nodes = (jobs_df.loc[jidx, 'nodes']).tolist()
+        scheduled_nodes = (jobs_df.loc[jidx, 'nodes']).tolist()
 
-            submit_timestamp = jobs_df.loc[jidx, 'submit_time']
-            diff = submit_timestamp - telemetry_start_timestamp
-            submit_time = int(diff.total_seconds())
+        submit_timestamp = jobs_df.loc[jidx, 'submit_time']
+        diff = submit_timestamp - telemetry_start_timestamp
+        submit_time = int(diff.total_seconds())
 
         time_limit = jobs_df.loc[jidx, 'time_limit']  # in seconds
 
@@ -205,7 +200,11 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
             count_jobs_notOK += 1
 
     print("jobs not added: ", count_jobs_notOK)
-    return jobs, telemetry_start_time, telemetry_end_time
+    return WorkloadData(
+        jobs=jobs,
+        telemetry_start=telemetry_start_time, telemetry_end=telemetry_end_time,
+        start_date=telemetry_start_timestamp.tz_localize("UTC"),
+    )
 
 
 def xname_to_index(xname: str, config: dict):
diff --git a/raps/dataloaders/bluewaters.py b/raps/dataloaders/bluewaters.py
index e26c51f5959334e5e3db775e9d85807f3bca5eec..9d5dc4d0dd5162b568b403d3f49cde2b6233afec 100644
--- a/raps/dataloaders/bluewaters.py
+++ b/raps/dataloaders/bluewaters.py
@@ -42,8 +42,10 @@ import math
 import re
 import pandas as pd
 from pathlib import Path
+from datetime import datetime, timezone
 from pprint import pprint
 from raps.telemetry import Job, job_dict
+from raps.utils import WorkloadData
 
 
 def throughput_traces(total_tx, total_rx, intervals):
@@ -345,9 +347,16 @@ def load_data(local_dataset_path, **kwargs):
         j.trace_start_time -= t0
         j.trace_end_time -= t0
 
+    # pprint(jobs)
+
     if debug:
         pprint(jobs)
 
-    simulation_start = 0
-    simulation_end = max((j.end_time for j in jobs), default=0)
-    return jobs, simulation_start, simulation_end
+    telemetry_start = 0
+    telemetry_end = max((j.end_time for j in jobs), default=0)
+
+    return WorkloadData(
+        jobs=jobs,
+        telemetry_start=telemetry_start, telemetry_end=telemetry_end,
+        start_date=datetime.fromtimestamp(t0, timezone.utc),
+    )
diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py
index 8491617e8886afbfda35930ddcd49cf0e24a0e9a..0e1bdcde2acb39f395fe794d14ba9195da38a162 100644
--- a/raps/dataloaders/frontier.py
+++ b/raps/dataloaders/frontier.py
@@ -10,12 +10,13 @@ python -m raps.telemetry -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR
 """
 import time
+from datetime import datetime, timezone
 import numpy as np
 import pandas as pd
 from tqdm import tqdm
 
 from ..job import job_dict, Job
-from ..utils import power_to_utilization, next_arrival_byconfkwargs, encrypt
+from ..utils import power_to_utilization, encrypt, WorkloadData
 
 
 def aging_boost(nnodes):
@@ -136,7 +137,6 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
     """
     config = kwargs.get('config')
     encrypt_bool = kwargs.get('encrypt')
-    arrival = kwargs.get('arrival')
     validate = kwargs.get('validate')
     jid = kwargs.get('jid', '*')
     debug = kwargs.get('debug')
@@ -266,20 +266,12 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
         if '' in xnames:
             continue
 
-        if arrival == 'poisson':  # Modify the arrival times of the jobs according to Poisson distribution
-            scheduled_nodes = None
-            submit_time = next_arrival_byconfkwargs(config, kwargs)
-            start_time = None  # ?
-            end_time = None  # ?
-            priority = aging_boost(nodes_required)
-
-        else:  # Prescribed replay
-            scheduled_nodes = []
-            # priority = 0  # not used for replay
-            priority = aging_boost(nodes_required)
-            for xname in xnames:
-                indices = xname_to_index(xname, config)
-                scheduled_nodes.append(indices)
+        scheduled_nodes = []
+        # priority = 0  # not used for replay
+        priority = aging_boost(nodes_required)
+        for xname in xnames:
+            indices = xname_to_index(xname, config)
+            scheduled_nodes.append(indices)
 
         # Throw out jobs that are not valid!
         if gpu_trace.size == 0:
@@ -325,7 +317,12 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
             job = Job(job_info)
             jobs.append(job)
 
-    return jobs, telemetry_start, telemetry_end
+    return WorkloadData(
+        jobs=jobs,
+        telemetry_start=telemetry_start,
+        telemetry_end=telemetry_end,
+        start_date=telemetry_start_timestamp,
+    )
 
 
 def load_live_data(**kwargs):
@@ -537,7 +534,12 @@ def load_live_data(**kwargs):
         job = Job(job_info)
         jobs.append(job)
 
-    return jobs, telemetry_start, telemetry_end
+    return WorkloadData(
+        jobs=jobs,
+        telemetry_start=telemetry_start,
+        telemetry_end=telemetry_end,
+        start_date=datetime.fromtimestamp(telemetry_start, timezone.utc),
+    )
 
 
 def xname_to_index(xname: str, config: dict):
diff --git a/raps/dataloaders/fugaku.py b/raps/dataloaders/fugaku.py
index 734fa61158ca661678a65e6a4191e8ed28c091e6..4ccc8859b9b345b4d2e1d912f25ccd6796350504 100644
--- a/raps/dataloaders/fugaku.py
+++ b/raps/dataloaders/fugaku.py
@@ -17,6 +17,7 @@ import pandas as pd
 from tqdm import tqdm
 
 from ..job import job_dict, Job
+from ..utils import WorkloadData
 
 
 def load_data(path, **kwargs):
@@ -134,13 +135,6 @@ def load_data_from_df(df, **kwargs):
         trace_missing_values = False  # Sane Choice?
         trace_quanta = config['TRACE_QUANTA']
 
-        # Should we still have this?
-        # if arrival == 'poisson':  # Modify the arrival times of according to Poisson distribution
-        #     time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME'])
-        # else:
-        #     time_offset = (submit_time - min_time).total_seconds()  # Compute time offset in seconds
-        # Removed from job_dict: time_offset=time_offset,
-
         # Create job dictionary
         job_info = job_dict(
             nodes_required=nodes_required,
@@ -167,7 +161,11 @@ def load_data_from_df(df, **kwargs):
         job = Job(job_info)
         job_list.append(job)
 
-    return job_list, telemetry_start, telemetry_end
+    return WorkloadData(
+        jobs=job_list,
+        telemetry_start=telemetry_start, telemetry_end=telemetry_end,
+        start_date=telemetry_start_timestamp,
+    )
 
 
 def node_index_to_name(index: int, config: dict):
diff --git a/raps/dataloaders/gcloudv2.py b/raps/dataloaders/gcloudv2.py
index e19b0e8f5f5d95f307162c704d0afbd8581b7429..6f05a878b74df09d9cdd0d63b1b93494c6a4f067 100644
--- a/raps/dataloaders/gcloudv2.py
+++ b/raps/dataloaders/gcloudv2.py
@@ -1,13 +1,14 @@
 import os
 import re
+from datetime import datetime
 from tqdm import tqdm
-from typing import List, Optional, Generator, Tuple, Any, Union
+from typing import List, Optional, Generator, Any, Union
 
 import numpy as np
 import pandas as pd
 
-from raps.job import job_dict
-from raps.job import Job
+from raps.job import job_dict, Job
+from raps.utils import WorkloadData
 
 """
 Official instructions are here:
@@ -200,7 +201,7 @@ class GoogleClusterV2DataLoader:
         yield pd.concat(dfs, ignore_index=True)
 
 
-def load_data(data_path: Union[str, List[str]], **kwargs: Any) -> Tuple[List[Any], float, float]:
+def load_data(data_path: Union[str, List[str]], **kwargs: Any):
     config = kwargs.get('config')
     # Unpack list
     if isinstance(data_path, list):
@@ -331,6 +332,11 @@ def load_data(data_path: Union[str, List[str]], **kwargs: Any) -> Tuple[List[Any
         jobs.append(Job(job_d))
 
     # Compute simulation span: start at t=0, end at the latest job finish
-    simulation_start = 0
-    simulation_end = int(max(usage_map_end.values()) - t0)
-    return jobs, simulation_start, simulation_end
+    telemetry_start = 0
+    telemetry_end = int(max(usage_map_end.values()) - t0)
+    return WorkloadData(
+        jobs=jobs,
+        telemetry_start=telemetry_start, telemetry_end=telemetry_end,
+        # gcloud dataset timestamps are already relative, and it doesn't list an exact start date.
+        start_date=datetime.fromisoformat("2011-05-02T00:00:00Z"),
+    )
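A note on the hard-coded anchor here: the Google trace only ships relative timestamps, so `start_date` merely pins them to a wall-clock epoch. A minimal sketch of what that anchoring buys downstream (the epoch matches the constant above; the window length is invented for illustration):

```python
from datetime import datetime, timedelta, timezone

# Assumed anchor, matching the constant hard-coded in the loader above.
start_date = datetime(2011, 5, 2, tzinfo=timezone.utc)

# Relative telemetry seconds -> absolute wall-clock datetimes.
telemetry_start, telemetry_end = 0, 86_400
window_end = start_date + timedelta(seconds=telemetry_end - telemetry_start)
print(window_end.isoformat())  # 2011-05-03T00:00:00+00:00
```

One caveat: `datetime.fromisoformat("...Z")` as written in the loader requires Python 3.11 or newer; spelling the offset as `+00:00` also works on older interpreters.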
diff --git a/raps/dataloaders/kestrel.py b/raps/dataloaders/kestrel.py
index 8b8470a8cef5aeb1696fbdbe866e45ab090ceeb8..c9efd704f47de86b77d679414da033725b166121 100644
--- a/raps/dataloaders/kestrel.py
+++ b/raps/dataloaders/kestrel.py
@@ -6,7 +6,7 @@ import pandas as pd
 from tqdm import tqdm
 
 from ..job import job_dict, Job
-from ..utils import power_to_utilization, next_arrival
+from ..utils import power_to_utilization, next_arrival, WorkloadData
 
 
 def load_data(jobs_path, **kwargs):
@@ -153,7 +153,11 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
                             trace_quanta=trace_quanta)
         jobs.append(Job(job_info))
 
-    return jobs, telemetry_start, telemetry_end
+    return WorkloadData(
+        jobs=jobs,
+        telemetry_start=telemetry_start, telemetry_end=telemetry_end,
+        start_date=telemetry_start_timestamp,
+    )
 
 
 def node_index_to_name(index: int, config: dict):
diff --git a/raps/dataloaders/lassen.py b/raps/dataloaders/lassen.py
index c9aae0d4411e0c786ef11a12c25cb18d51b056af..fd0e364314a4f11354c72bd19ffdb4f73118995d 100644
--- a/raps/dataloaders/lassen.py
+++ b/raps/dataloaders/lassen.py
@@ -38,7 +38,7 @@ from tqdm import tqdm
 from datetime import timedelta
 
 from ..job import job_dict, Job
-from ..utils import power_to_utilization, next_arrival_byconfkwargs, parse_td
+from ..utils import power_to_utilization, parse_td, WorkloadData
 
 
 def load_data(path, **kwargs):
@@ -60,7 +60,6 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
     config = kwargs.get('config')
     jid = kwargs.get('jid', '*')
     validate = kwargs.get('validate')
-    arrival = kwargs.get('arrival')
     verbose = kwargs.get('verbose')
     fastforward = kwargs.get('fastforward')  # int in seconds
 
@@ -198,16 +197,10 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
         priority = row.get('priority', 0)
         partition = row.get('partition', "0")
 
-        if arrival == 'poisson':  # Modify the submit times according to Poisson process
-            scheduled_nodes = None
-            submit_time = fastforward + next_arrival_byconfkwargs(config, kwargs)
-            start_time = submit_time  # Pretend Job could start immediately  # Alternative: None
-            end_time = submit_time + wall_time  # Alternative: None
-        else:  # Prescribed replay
-            scheduled_nodes = get_scheduled_nodes(row['allocation_id'], node_df)
-            submit_time = compute_time_offset(row['job_submit_timestamp'], telemetry_start_timestamp)
-            start_time = compute_time_offset(row['begin_timestamp'], telemetry_start_timestamp)
-            end_time = compute_time_offset(row['end_timestamp'], telemetry_start_timestamp)
+        scheduled_nodes = get_scheduled_nodes(row['allocation_id'], node_df)
+        submit_time = compute_time_offset(row['job_submit_timestamp'], telemetry_start_timestamp)
+        start_time = compute_time_offset(row['begin_timestamp'], telemetry_start_timestamp)
+        end_time = compute_time_offset(row['end_timestamp'], telemetry_start_timestamp)
 
         time_limit = row['time_limit']
 
@@ -249,7 +242,11 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
         job = Job(job_info)
         job_list.append(job)
 
-    return job_list, telemetry_start_time, telemetry_end_time
+    return WorkloadData(
+        jobs=job_list,
+        telemetry_start=telemetry_start_time, telemetry_end=telemetry_end_time,
+        start_date=telemetry_start_timestamp,
+    )
 
 
 def get_scheduled_nodes(allocation_id, node_df):
diff --git a/raps/dataloaders/marconi100.py b/raps/dataloaders/marconi100.py
index fef8ec0357547f2f2e05519e42c998cedd9c154a..a10e1e8a28ba42886c093c5c456c449330618e92 100644
--- a/raps/dataloaders/marconi100.py
+++ b/raps/dataloaders/marconi100.py
@@ -28,7 +28,7 @@ import pandas as pd
 from tqdm import tqdm
 
 from ..job import job_dict, Job
-from ..utils import power_to_utilization, next_arrival_byconfkwargs
+from ..utils import power_to_utilization, WorkloadData
 
 
 def load_data(jobs_path, **kwargs):
@@ -60,7 +60,6 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
     """
     config = kwargs.get('config')
     # min_time = kwargs.get('min_time', None)  # Unused
-    arrival = kwargs.get('arrival')
     validate = kwargs.get('validate')
     jid = kwargs.get('jid', '*')
     debug = kwargs.get('debug')
@@ -165,17 +164,11 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
                 print("wall_time != (end_time - start_time)")
                 print(f"{wall_time} != {(end_time - start_time)}")
 
-        if arrival == 'poisson':  # Modify the arrival times according to Poisson distribution
-            scheduled_nodes = None
-            submit_time = next_arrival_byconfkwargs(config, kwargs)
-            start_time = None
-            end_time = None
-        else:  # Prescribed replay
-            scheduled_nodes = (jobs_df.loc[jidx, 'nodes']).tolist()
+        scheduled_nodes = (jobs_df.loc[jidx, 'nodes']).tolist()
 
-            submit_timestamp = jobs_df.loc[jidx, 'submit_time']
-            diff = submit_timestamp - telemetry_start_timestamp
-            submit_time = int(diff.total_seconds())
+        submit_timestamp = jobs_df.loc[jidx, 'submit_time']
+        diff = submit_timestamp - telemetry_start_timestamp
+        submit_time = int(diff.total_seconds())
 
         trace_time = gpu_trace.size * config['TRACE_QUANTA']  # seconds
         trace_start_time = 0
@@ -233,7 +226,11 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
         job = Job(job_info)
         jobs.append(job)
 
-    return jobs, telemetry_start, telemetry_end
+    return WorkloadData(
+        jobs=jobs,
+        telemetry_start=telemetry_start, telemetry_end=telemetry_end,
+        start_date=telemetry_start_timestamp,
+    )
 
 
 def node_index_to_name(index: int, config: dict):
diff --git a/raps/dataloaders/mit_supercloud/loader.py b/raps/dataloaders/mit_supercloud/loader.py
index 605721084c6585a57d2494cca4756783530b64fc..e3103ba93bb35b8d3ec0a47bd462382340c54dc8 100644
--- a/raps/dataloaders/mit_supercloud/loader.py
+++ b/raps/dataloaders/mit_supercloud/loader.py
@@ -116,9 +116,9 @@ import re
 from tqdm import tqdm
 from typing import Dict, Union, Optional
 from collections import Counter
-
+from datetime import datetime, timezone
 from raps.job import job_dict, Job
-from raps.utils import summarize_ranges
+from raps.utils import summarize_ranges, WorkloadData
 from .utils import proc_cpu_series, proc_gpu_series, to_epoch
 from .utils import DEFAULT_START, DEFAULT_END
 
@@ -602,20 +602,12 @@ def load_data(local_dataset_path, **kwargs):
             job = Job(current_job_dict)
             jobs_list.append(job)
 
-    # Calculate min_overall_utime and max_overall_utime
-    telemetry_start = int(sl.time_start.min())
-    telemetry_end = int(sl.time_end.max())
-    # min_overall_utime = int(sl.time_submit.min())
-    # max_overall_utime = int(sl.time_submit.max())
-
-    # args_namespace = SimpleNamespace(
-    #     fastforward=min_overall_utime,
-    #     system='mit_supercloud',
-    #     time=max_overall_utime
-    # )
-
     print("\nSkipped jobs summary:")
     for reason, count in skip_counts.items():
         print(f"- {reason}: {count}")
 
-    return jobs_list, telemetry_start, telemetry_end  # min_overall_utime, max_overall_utime, args_namespace
+    return WorkloadData(
+        jobs=jobs_list,
+        telemetry_start=0, telemetry_end=int(end_ts - start_ts),
+        start_date=datetime.fromtimestamp(start_ts, timezone.utc),
+    )
diff --git a/raps/engine.py b/raps/engine.py
index d07b9c57a95635315fff9e0aadf9e940fe86b589..23a2605733ccaddae240ab2987ed525bd669c4ca 100644
--- a/raps/engine.py
+++ b/raps/engine.py
@@ -10,12 +10,11 @@ import os
 import select
 import time
 import random
-import math
 
 from raps.job import Job, JobState
 from raps.policy import PolicyType
 from raps.utils import (
     summarize_ranges,
-    get_current_utilization
+    get_current_utilization,
 )
 from raps.resmgr import ResourceManager
 from raps.schedulers import load_scheduler
@@ -41,9 +40,9 @@ from raps.downtime import Downtime
 from raps.weather import Weather
 from raps.sim_config import SimConfig
 from raps.system_config import SystemConfig
-
 from bisect import bisect_right
 
+
 @dataclasses.dataclass
 class TickData:
     """ Represents the state output from the simulation each tick """
@@ -266,8 +265,7 @@ class Engine:
         if sim_config.live and not sim_config.replay:
             td = Telemetry(**sim_config_dict)
-            jobs, timestep_start, timestep_end = \
-                td.load_jobs_times_args_from_live_system()
+            workload_data = td.load_from_live_system()
         elif sim_config.replay:
             # TODO: this will have issues if running separate systems or custom systems
             partition_short = partition.split("/")[-1] if partition else None
@@ -286,27 +284,20 @@ class Engine:
             else:
                 replay_files = sim_config.replay
 
-            jobs, timestep_start, timestep_end, args_from_file = td.load_jobs_times_args_from_files(
-                files=replay_files,
-                args=sim_config_args, config=system_config_dict,
-            )
+            workload_data = td.load_from_files(replay_files)
         else:  # Synthetic jobs
             wl = Workload(sim_config_args, system_config_dict)
-            jobs = wl.generate_jobs()
-            timestep_start = 0
-            if hasattr(jobs[0], 'end_time'):
-                timestep_end = int(math.ceil(max([job.end_time for job in jobs])))
-            else:
-                timestep_end = 88200  # 24 hours
-
+            workload_data = wl.generate_jobs()
             td = Telemetry(**sim_config_dict)
 
+        jobs = workload_data.jobs
+        # TODO refactor how start/end/fastforward/time work
         if sim_config.fastforward is not None:
-            timestep_start = timestep_start + sim_config.fastforward
+            workload_data.telemetry_start = workload_data.telemetry_start + sim_config.fastforward
         if sim_config.time is not None:
-            timestep_end = timestep_start + sim_config.time
+            workload_data.telemetry_end = workload_data.telemetry_start + sim_config.time
 
         if sim_config.time_delta is not None:
             time_delta = sim_config.time_delta
@@ -339,7 +330,7 @@ class Engine:
             system_config=system_config,
         )
 
-        return engine, jobs, timestep_start, timestep_end, time_delta
+        return engine, workload_data, time_delta
 
     def add_running_jobs_to_queue(self, jobs_to_submit: List):
         """
@@ -536,7 +527,10 @@ class Engine:
                 job.running_time = self.current_timestep - job.start_time
 
                 if job.current_state != JobState.RUNNING:
-                    raise ValueError(f"Job {job.id} is in running list, but state is not RUNNING: job.state == {job.current_state}")
+                    raise ValueError(
+                        f"Job {job.id} is in running list, " +
+                        f"but state is not RUNNING: job.state == {job.current_state}"
+                    )
                 else:  # if job.state == JobState.RUNNING:
                     # Error checks
                     if job.running_time > job.time_limit and job.end_time is not None:
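`Engine.from_sim_config()` now returns three values instead of five, with the start/end timesteps carried inside the `WorkloadData`. A sketch of the call-site migration (this mirrors the `tests/systems/test_engine.py` hunk at the end of this patch; the `SimConfig` field values are placeholders):

```python
from raps.engine import Engine
from raps.sim_config import SimConfig

sim_config = SimConfig.model_validate({"system": "frontier", "time": "2m"})  # placeholder config

# Before: engine, jobs, timestep_start, timestep_end, time_delta = Engine.from_sim_config(...)
engine, workload_data, time_delta = Engine.from_sim_config(sim_config)

jobs = workload_data.jobs
timestep_start = workload_data.telemetry_start
timestep_end = workload_data.telemetry_end
ticks = list(engine.run_simulation(jobs, timestep_start, timestep_end, time_delta))
```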
diff --git a/raps/multi_part_engine.py b/raps/multi_part_engine.py
index 461425b4cadaac906c641c6c33adbe65e4d1079e..f211b8544d02d9cecabace278ee9483c34df2792 100644
--- a/raps/multi_part_engine.py
+++ b/raps/multi_part_engine.py
@@ -1,6 +1,7 @@
 from collections.abc import Iterable
 from raps.engine import Engine, TickData
 from raps.sim_config import SimConfig
+from raps.utils import WorkloadData
 
 
 class MultiPartEngine:
@@ -17,29 +18,29 @@ class MultiPartEngine:
         if len(root_systems) > 1:
             raise ValueError("Replay for multi-system runs is not supported")
 
-        jobs_by_partition = {}
+        workloads_by_partition: dict[str, WorkloadData] = {}
         engines: dict[str, Engine] = {}
         timestep_start, timestep_end, time_delta = 0, 0, 0
 
         for partition in sim_config.system_configs:
             name = partition.system_name
-            engine, jobs, timestep_start, timestep_end, time_delta = Engine.from_sim_config(
+            engine, workload_data, time_delta = Engine.from_sim_config(
                 sim_config, partition=name,
             )
-            for job in jobs:
+            for job in workload_data.jobs:
                 job.partition = name
-            jobs_by_partition[name] = jobs
+            workloads_by_partition[name] = workload_data
             engines[name] = engine
 
-        total_initial_jobs = sum(len(j) for j in jobs_by_partition.values())
+        total_initial_jobs = sum(len(j.jobs) for j in workloads_by_partition.values())
         for engine in engines.values():
             engine.total_initial_jobs = total_initial_jobs
 
         multi_engine = MultiPartEngine(
             engines=engines,
-            jobs=jobs_by_partition,
+            jobs={p: w.jobs for p, w in workloads_by_partition.items()},
         )
-        return multi_engine, jobs_by_partition, timestep_start, timestep_end, time_delta
+        return multi_engine, workloads_by_partition, timestep_start, timestep_end, time_delta
 
     def run_simulation(self, jobs: dict, timestep_start, timestep_end, time_delta=1
                        ) -> Iterable[dict[str, TickData | None]]:
diff --git a/raps/run_sim.py b/raps/run_sim.py
index 7587dbbbdfe52da37dd1278fd1a0e5612547e798..ceb80a9a428f2850b3eb42a48fb86ebb5733e2b9 100644
--- a/raps/run_sim.py
+++ b/raps/run_sim.py
@@ -76,18 +76,18 @@ def run_sim(sim_config: SimConfig):
         print("Use run-multi-part to run multi-partition simulations")
         sys.exit(1)
 
-    engine, jobs, timestep_start, timestep_end, time_delta = Engine.from_sim_config(sim_config)
+    engine, workload_data, time_delta = Engine.from_sim_config(sim_config)
 
     out = sim_config.output
     if out:
         out.mkdir(parents=True)
         engine.telemetry.save_snapshot(
-            jobs=jobs,
-            timestep_start=timestep_start,
-            timestep_end=timestep_end,
-            args=sim_config.get_legacy_args(), filename=str(out),
+            dest=str(out),
+            result=workload_data,
+            args=sim_config,
         )
-
+    jobs = workload_data.jobs
+    timestep_start, timestep_end = workload_data.telemetry_start, workload_data.telemetry_end
     total_timesteps = timestep_end - timestep_start
 
     downscale = sim_config.downscale
@@ -242,7 +242,8 @@ def run_multi_part_sim_add_parser(subparsers: SubParsers):
 
 
 def run_multi_part_sim(sim_config: SimConfig):
-    multi_engine, jobs, timestep_start, timestep_end, time_delta = MultiPartEngine.from_sim_config(sim_config)
+    multi_engine, workload_results, timestep_start, timestep_end, time_delta = \
+        MultiPartEngine.from_sim_config(sim_config)
 
     # TODO: The mit_supercloud dataloader seems to be outputting the wrong timesteps? mit_supercloud
    # is the only multi-partition system with replay, so just manually overriding the timesteps here
@@ -253,11 +254,11 @@
     if sim_config.output:
         for part, engine in multi_engine.engines.items():
             engine.telemetry.save_snapshot(
-                jobs=jobs[part],
-                timestep_start=timestep_start, timestep_end=timestep_end,
-                filename=part.split('/')[-1],
-                args=sim_config.get_legacy_args(),
+                dest=str(sim_config.output / part.split('/')[-1]),
+                result=workload_results[part],
+                args=sim_config,
             )
+    jobs = {p: w.jobs for p, w in workload_results.items()}
 
     ui_update_freq = sim_config.system_configs[0].scheduler.ui_update_freq
     gen = multi_engine.run_simulation(jobs, timestep_start, timestep_end, time_delta)
diff --git a/raps/sim_config.py b/raps/sim_config.py
index f003a3e5f4e600a7735a15ebecb0d78f4d3cf4a4..a3091bf0669a3275be26d8d73cf91b983d299e35 100644
--- a/raps/sim_config.py
+++ b/raps/sim_config.py
@@ -48,9 +48,9 @@ class SimConfig(BaseModel):
     def downscale(self) -> int:
         return int(timedelta(seconds=1) / self.time_unit)
 
-    start: str = "2021-05-21T13:00"
+    start: str = "2021-05-21T13:00:00-04:00"
     """ ISO8601 start of simulation """
-    end: str = "2021-05-21T14:00"
+    end: str = "2021-05-21T14:00:00-04:00"
     """ ISO8601 end of simulation """
 
     numjobs: int = 100
diff --git a/raps/telemetry.py b/raps/telemetry.py
index 3f883def7053f91e0891fb8267f76a760b1182d6..340b9ae5e0cdceddbd5288f4f262fc81b824bcb5 100644
--- a/raps/telemetry.py
+++ b/raps/telemetry.py
@@ -7,20 +7,18 @@ The module defines a `Telemetry` class for managing telemetry data and several
 helper functions for data encryption and conversion between node name and index formats.
 """
 from typing import Literal
-import sys
 import random
 from pathlib import Path
 # import json
 from typing import Optional
 from types import ModuleType
-
 import importlib
 
 import numpy as np
 import pandas as pd
-from tqdm import tqdm
-from pydantic import BaseModel
+from pydantic import BaseModel, model_validator
 # from rich.progress import track
 
+from raps.sim_config import SimConfig
 from raps.system_config import get_system_config
 from raps.job import Job, job_dict
 import matplotlib.pyplot as plt
@@ -30,10 +28,49 @@ from raps.plotting import (
     plot_network_histogram
 )
 from raps.utils import (
-    next_arrival_byconfargs, convert_to_time_unit, pydantic_add_args, SubParsers, ExpandedPath,
+    next_arrival_byconfargs, pydantic_add_args, SubParsers, ExpandedPath, WorkloadData,
 )
 
 
+# TODO: should reuse this model in SimConfig
+class TelemetryArgs(BaseModel):
+    jid: str = '*'
+    """ Replay job id """
+    replay: list[ExpandedPath] | None = None
+    """ path/to/joblive path/to/jobprofile -or- filename.npz (overrides --workload option) """
+    plot: list[Literal["jobs", "nodes"]] | None = None
+    """ Output plots """
+    is_results_file: bool = False
+    gantt_nodes: bool = False
+    """ Print Gantt with nodes required as line thickness (default false) """
+    time: str | None = None
+    """ Length of time to simulate, e.g., 123, 123s, 27m, 3h, 7d """
+    system: str = 'frontier'
+    """ System config to use """
+    arrival: Literal['prescribed', 'poisson'] = "prescribed"
+    """ Modify arrival distribution ({choices[1]}) or use the original submit times """
+    verbose: bool = False
+    output: str | None = None
+    """ Store output in --output file. """
+    live: bool = False
+    """ Grab data from live system. """
""" + + @model_validator(mode="after") + def _validate_after(self): + if not self.live and not self.replay: + raise ValueError("Either --live or --replay is required") + return self + + +shortcuts = { + "replay": "f", + "plot": "p", + "time": "t", + "verbose": "v", + "output": "o", +} + + class Telemetry: """A class for handling telemetry data, including reading/parsing job data, and loading/saving snapshots.""" dataloader: Optional[ModuleType] @@ -49,16 +86,18 @@ class Telemetry: print(f"WARNING: Failed to load dataloader: {e}") self.dataloader = None - def save_snapshot(self, *, jobs: list, timestep_start: int, timestep_end: int, args: dict, filename: str): + def save_snapshot(self, *, dest: str, result: WorkloadData, args: SimConfig | TelemetryArgs): """Saves a snapshot of the jobs to a compressed file. """ - list_of_job_dicts = [] - for job in jobs: - list_of_job_dicts.append(job.__dict__) - np.savez_compressed(filename, jobs=list_of_job_dicts, timestep_start=timestep_start, - timestep_end=timestep_end, args=args) + np.savez_compressed(dest, + jobs=[vars(j) for j in result.jobs], + telemetry_start=result.telemetry_start, + telemetry_end=result.telemetry_end, + start_date=result.start_date, + args=args, + ) - def load_snapshot(self, snapshot: str, downscale=1) -> list: - """Reads a snapshot from a compressed file and return 4 values: joblist, timestep_start, timestep_end and args. + def load_snapshot(self, snapshot: str | Path) -> tuple[WorkloadData, SimConfig | TelemetryArgs]: + """Reads a snapshot from a compressed file :param str snapshot: Filename :returns: @@ -68,28 +107,19 @@ class Telemetry: - args, which were used to generate the loaded snapshot """ data = np.load(snapshot, allow_pickle=True, mmap_mode='r') - jobs = [] - list_of_job_dicts = data['jobs'].tolist() - for job_info in list_of_job_dicts: - jobs.append(Job(job_info)) - if 'timestep_start' in data: - timestep_start = int(data['timestep_start']) - else: - timestep_start = 0 - if 'timestep_end' in data: - timestep_end = int(data['timestep_end']) - else: - timestep_end = np.inf - raise ValueError("Invalid timestep_end in snapshot") - if 'args' in data: - args_from_file = data['args'].tolist() - else: - args_from_file = None + jobs = [Job(j) for j in data['jobs']] + telemetry_start = data['telemetry_start'].item() + telemetry_end = data['telemetry_end'].item() + start_date = data['start_date'].item() + args = data['args'].item() - return jobs, \ - timestep_start, \ - timestep_end, \ - args_from_file + result = WorkloadData( + jobs=jobs, + telemetry_start=telemetry_start, telemetry_end=telemetry_end, + start_date=start_date, + ) + + return result, args def load_csv_results(self, file): jobs = [] @@ -136,7 +166,7 @@ class Telemetry: # else: # args_from_file = None - return jobs, time_start, time_end, None + return jobs, time_start, time_end def load_data(self, files): """Load telemetry data using custom data loaders.""" @@ -148,43 +178,6 @@ class Telemetry: assert self.dataloader return self.dataloader.load_live_data(**self.kwargs) - def load_data_from_df(self, *args, **kwargs): - """Load telemetry data using custom data loaders.""" - assert self.dataloader - return self.dataloader.load_data_from_df(*args, **kwargs) - - def load_data_from_csv(self, file, *args, **kwargs): - jobs = [] - df = pd.read_csv(file, chunksize=1, header='infer') - for d in df: - # print(d['name'].astype(str)) - job_info = job_dict(nodes_required=None, - name=d['name'].astype(str).item(), - account=d['account'].astype(str).item(), - cpu_trace=None, - 
-                                gpu_trace=None,
-                                ntx_trace=None,
-                                nrx_trace=None,
-                                end_state=d['state'].astype(str).item(),
-                                scheduled_nodes=d['scheduled_nodes'].item(),
-                                id=d['id'].astype(int).item(),
-                                priority=None,
-                                partition=None,
-                                submit_time=d['submit_time'].astype(int).item(),
-                                time_limit=None,
-                                start_time=d['start_time'].astype(int).item(),
-                                end_time=d['end_time'].astype(int).item(),
-                                wall_time=d['end_time'].astype(int).item() - d['start_time'].astype(int).item(),
-                                trace_time=None,
-                                trace_start_time=None,
-                                trace_end_time=None,
-                                trace_missing_values=None
-                                )
-            jobs.append(job_info)
-        minstarttime = min([x['start_time'] for x in jobs])
-        maxendtime = max([x['end_time'] for x in jobs])
-        return jobs, minstarttime, maxendtime, None
-
     def node_index_to_name(self, index: int):
         """ Convert node index into a name"""
         assert self.dataloader
@@ -200,105 +193,46 @@
         assert self.dataloader
         return self.dataloader.cdu_pos(index, config=self.config)
 
-    def load_jobs_times_args_from_live_system(self):
-        jobs, timestep_start, timestep_end = self.load_live_data()
-        # data_args = None
-        return jobs, timestep_start, timestep_end
+    def load_from_live_system(self) -> WorkloadData:
+        result = self.load_live_data()
+        return result
 
-    def load_jobs_times_args_from_files(self, *, files, args, config, downscale=1):
+    def load_from_files(self, files) -> WorkloadData:
         """ Load all files as combined jobs """
-        # Read telemetry data (either npz file or via custom data loader)
-        # TODO: Merge args? See main.py:79
-        timestep_end = 0
-        timestep_start = sys.maxsize
-        jobs = []
-        trigger_custom_dataloader = False
-        for i, file in enumerate(files):
-            file = str(Path(file))
-            if hasattr(args, 'is_results_file') and args.is_results_file:
-                if file.endswith(".csv"):
-                    jobs, timestep_start, timestep, _ = self.load_csv_results(file)
-
-            elif file.endswith(".npz"):  # Replay .npz file
-                print(f"Loading {file}...")
-                jobs_from_file, timestep_start_from_file, timestep_end_from_file, args_from_file = self.load_snapshot(
-                    file)
-                if args_from_file is not None:
-                    print(f"File was generated with:"
-                          f"\n--system {args_from_file.system} ")
-                    if hasattr(args_from_file, 'fastforward'):
-                        print(f"--ff {args_from_file.fastforward} ")
-                    if hasattr(args_from_file, 'time'):
-                        print(f"-t {args_from_file.time}")
-                    print(f"All Args:\n{args_from_file}"
-                          "\nTo use these set them from the commandline!")
+        assert len(files) >= 1
+        files = [Path(f) for f in files]
+
+        if str(files[0]).endswith(".npz"):
+            data: WorkloadData | None = None
+            for file in files:
+                print(f"Loading {file}")
+                new_data, args_from_file = self.load_snapshot(file)
+                print(f"File was generated with: --system {args_from_file.system}")
+                if not data:
+                    data = new_data
                 else:
-                    print("No generation arguments extracted from input file!")
-                    # Args are usually extracted to tell the users how to reporduce results.
-                    # They are not processed and re-set to said arguments automatily
-                jobs.extend(jobs_from_file)
-                timestep_start = min(timestep_start, timestep_start_from_file)
-                timestep_end = max(timestep_end, timestep_end_from_file)
-
-                if hasattr(args, 'scale') and args.scale:
-                    for job in tqdm(jobs, desc=f"Scaling jobs to {args.scale} nodes"):
-                        job['nodes_required'] = random.randint(1, args.scale)
-                        job['scheduled_nodes'] = None  # Setting to None triggers scheduler to assign nodes
-
-                if hasattr(args, 'arrival') and args.arrival == 'poisson':
-                    print("available nodes:", config['AVAILABLE_NODES'])
-                    for job in tqdm(jobs, desc="Rescheduling jobs"):
-                        job['scheduled_nodes'] = None
-                        job['submit_time'] = next_arrival_byconfargs(config, args)
-            else:
-                trigger_custom_dataloader = True
-                break
-
-        if trigger_custom_dataloader:  # custom data loader
-            try:
-                jobs, timestep_start_from_data, timestep_end_from_data = self.load_data(args.replay)
-            except AssertionError:
-                raise ValueError("Forgot --is-results-file ?")
-            timestep_start = min(timestep_start, timestep_start_from_data)
-            timestep_end = max(timestep_end, timestep_end_from_data)
-            if args.time:
-                timestep_end = timestep_start + convert_to_time_unit(args.time)
-            elif not timestep_end:
-                timestep_end = int(max(job.wall_time + job.start_time for job in jobs)) + 1
-
-        return jobs, timestep_start, timestep_end, args
-
-
-class TelemetryArgs(BaseModel):
-    jid: str = '*'
-    """ Replay job id """
-    replay: list[ExpandedPath] | None = None
-    """ path/to/joblive path/to/jobprofile -or- filename.npz (overrides --workload option) """
-    plot: list[Literal["jobs", "nodes"]] | None = None
-    """ Output plots """
-    is_results_file: bool = False
-    gantt_nodes: bool = False
-    """ Print Gannt with nodes required as line thickness (default false) """
-    time: str | None = None
-    """ Length of time to simulate, e.g., 123, 123s, 27m, 3h, 7d """
-    system: str = 'frontier'
-    """ System config to use """
-    arrival: Literal['prescribed', 'poisson'] = "prescribed"
-    """ Modify arrival distribution ({choices[1]}) or use the original submit times """
-    verbose: bool = False
-    output: str | None = None
-    """ Store output in --output file. """
-    live: bool = False
-    """ Grab data from live system. """
""" - - -shortcuts = { - "replay": "f", - "plot": "p", - "time": "t", - "verbose": "v", - "output": "o", -} + data.jobs.extend(new_data.jobs) + data.telemetry_start = min(data.telemetry_start, new_data.telemetry_start) + data.telemetry_end = min(data.telemetry_end, new_data.telemetry_end) + data.start_date = min(data.start_date, new_data.start_date) + else: # custom data loader + data = self.load_data(files) + self.update_jobs(data.jobs) + return data + + def update_jobs(self, jobs: list[Job]): + """ Updates jobs with new scale or random start times """ + if self.kwargs.get("scale"): + for job in jobs: + job.nodes_required = random.randint(1, self.kwargs['scale']) + job.scheduled_nodes = None # Setting to None triggers scheduler to assign nodes + + if self.kwargs['arrival'] == "poisson": + for job in jobs: + job.scheduled_nodes = None + job.submit_time = next_arrival_byconfargs(self.config, self.kwargs) + job.start_time = None + job.end_time = None def run_telemetry_add_parser(subparsers: SubParsers): @@ -317,25 +251,20 @@ def run_telemetry(args: TelemetryArgs): args_dict['config'] = config td = Telemetry(**args_dict) + if args.is_results_file and args.replay: + file = str(args.replay[0]) + jobs, timestep_start, timestep_end = td.load_csv_results(file) if args.live and not args.replay: - td = Telemetry(**args_dict) - jobs, timestep_start, timestep_end = \ - td.load_jobs_times_args_from_live_system() - if args.output: - td.save_snapshot( - jobs=jobs, timestep_start=timestep_start, - timestep_end=timestep_end, args=args, filename=args.output, - ) - - elif args.replay: - jobs, timestep_start, timestep_end, _ = \ - td.load_jobs_times_args_from_files(files=args.replay, - args=args, - config=config) - + result = td.load_from_live_system() + jobs = result.jobs + timestep_start, timestep_end = result.telemetry_start, result.telemetry_end else: - print("Either --live or --replay is required") - sys.exit(1) + result = td.load_from_files(args.replay) + jobs = result.jobs + timestep_start, timestep_end = result.telemetry_start, result.telemetry_end + + if args.output: + td.save_snapshot(dest=args.output, result=result, args=args) timesteps = timestep_end - timestep_start diff --git a/raps/utils.py b/raps/utils.py index fe7af8fbda13727ddbd909940339ac934bdc4c9b..323ac8af423094ff421ff009c230fb0056997f25 100644 --- a/raps/utils.py +++ b/raps/utils.py @@ -21,7 +21,7 @@ import json import argparse from pathlib import Path from typing import Annotated as A, TypeVar, Callable, TypeAlias -from pydantic import BaseModel, TypeAdapter, AfterValidator +from pydantic import BaseModel, TypeAdapter, AfterValidator, ConfigDict, AwareDatetime from pydantic_settings import BaseSettings, SettingsConfigDict, CliApp, CliSettingsSource import yaml from raps.job import Job @@ -633,6 +633,9 @@ ExpandedPath = A[Path, AfterValidator(lambda v: Path(v).expanduser().resolve())] """ Type that that expands ~ and environment variables in a path string """ +SmartTimedelta = A[timedelta, AfterValidator(parse_td)] +""" Can be passed as ISO 8601 format like PT5M, or a string like 9s, or a number of seconds """ + T = TypeVar("T", bound=BaseModel) @@ -706,3 +709,96 @@ def yaml_dump(data): indent=2, allow_unicode=True, ) + + +class WorkloadData(BaseModel): + """ + Represents a workload, a list of jobs with some metadata. Returned by dataloaders load_data() + function, and by Workload.generate_jobs(). + + jobs: + The list of parsed jobs. + + telemetry_start + the first timestep in which the simulation be executed. 
diff --git a/raps/utils.py b/raps/utils.py
index fe7af8fbda13727ddbd909940339ac934bdc4c9b..323ac8af423094ff421ff009c230fb0056997f25 100644
--- a/raps/utils.py
+++ b/raps/utils.py
@@ -21,7 +21,7 @@ import json
 import argparse
 from pathlib import Path
 from typing import Annotated as A, TypeVar, Callable, TypeAlias
-from pydantic import BaseModel, TypeAdapter, AfterValidator
+from pydantic import BaseModel, TypeAdapter, AfterValidator, ConfigDict, AwareDatetime
 from pydantic_settings import BaseSettings, SettingsConfigDict, CliApp, CliSettingsSource
 import yaml
 from raps.job import Job
@@ -633,6 +633,9 @@ ExpandedPath = A[Path, AfterValidator(lambda v: Path(v).expanduser().resolve())]
 """ Type that that expands ~ and environment variables in a path string """
 
+SmartTimedelta = A[timedelta, AfterValidator(parse_td)]
+""" Can be passed in ISO 8601 format like PT5M, as a string like 9s, or as a number of seconds """
+
 
 T = TypeVar("T", bound=BaseModel)
 
@@ -706,3 +709,96 @@ def yaml_dump(data):
         indent=2,
         allow_unicode=True,
     )
+
+
+class WorkloadData(BaseModel):
+    """
+    Represents a workload, a list of jobs with some metadata. Returned by a dataloader's
+    load_data() function, and by Workload.generate_jobs().
+
+    jobs:
+        The list of parsed jobs.
+
+    telemetry_start
+        the first timestep in which the simulation can be executed.
+
+    telemetry_end
+        the last timestep in which the simulation can be executed.
+
+    start_date
+        The actual date that telemetry_start represents.
+    ----
+    Explanation regarding times:
+
+    The loaded dataframe contains
+    a first timestamp with associated data
+    and a last timestamp with associated data.
+
+    These form the maximum extent of the simulation time:
+    telemetry_start and telemetry_end.
+
+    [                                   ]
+    ^                                   ^
+    telemetry_start                     telemetry_end
+
+    These values form the maximum extent of the simulation.
+    telemetry_start is typically 0, but any int can be used as long as all the times in the
+    jobs are relative to the telemetry_start.
+
+    Next is the actual extent of the simulation:
+
+      [                               ]
+      ^                               ^
+      simulation_start                simulation_end
+
+    The simulation will start at telemetry_start by default, but the user can specify an explicit
+    simulation start time.
+
+    Additionally, jobs can have started before telemetry_start,
+    and can have a recorded ending after simulation_end:
+    [                                       ]
+    ^                                       ^
+    first_start_timestamp                   last_end_timestamp
+
+    This means that the time between first_start_timestamp and telemetry_start
+    has no associated values in the traces!
+    The missing values after simulation_end can be ignored, as the simulation
+    will have stopped before.
+
+    However, the times before telemetry_start have to be padded to generate
+    correct offsets within their data!
+    Within the simulation a job's current time is specified as the difference
+    between its start_time and the current timestep of the simulation.
+
+    With this, each job's
+    - submit_time
+    - time_limit
+    - start_time  # Maybe Null
+    - end_time  # Maybe Null
+    - expected_run_time (end_time - start_time)  # Maybe Null
+    - current_run_time (How long did the job run already, when loading)  # Maybe zero
+    - trace_time (length of each trace in seconds)  # Maybe Null
+    - trace_start_time (time offset in seconds after which the trace starts)  # Maybe Null
+    - trace_end_time (time offset in seconds after which the trace ends)  # Maybe Null
+    - trace_quanta (job's associated trace quanta, to correctly replay with different trace quanta)  # Maybe Null
+    has to be set for use within the simulation.
+
+    The values trace_start_time and trace_end_time are similar to telemetry_start and
+    telemetry_end, but may differ due to missing data, for each job.
+
+    The returned values are these:
+    - The list of parsed jobs (as Job objects)
+    - telemetry_start: int (in seconds)
+    - telemetry_end: int (in seconds)
+    - start_date: datetime
+    """
+    jobs: list[Job]
+    telemetry_start: int
+    telemetry_end: int
+    # TODO: It might make more sense to make telemetry_start/telemetry_end always unix time, then we
+    # wouldn't need this extra start_date field.
+    start_date: AwareDatetime
+
+    model_config = ConfigDict(
+        arbitrary_types_allowed=True,
+    )
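A worked example of the time model documented above, with invented numbers: a job submitted before the telemetry window exists only as a padded (negative) offset, and absolute dates are recovered from `start_date`.

```python
from datetime import datetime, timedelta, timezone

start_date = datetime(2024, 1, 18, tzinfo=timezone.utc)  # what telemetry_start maps to
telemetry_start, telemetry_end = 0, 86_400               # one day of telemetry, in seconds

submit_time = -600   # submitted 10 min before telemetry began: padded, no trace data there
start_time = 300     # started 5 min into the window
wall_time = 3_600    # ran for an hour

# Any relative time t is anchored as start_date + (t - telemetry_start):
submit_date = start_date + timedelta(seconds=submit_time - telemetry_start)
end_date = start_date + timedelta(seconds=start_time + wall_time - telemetry_start)
assert submit_date < start_date < end_date
```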
diff --git a/raps/workload.py b/raps/workload.py
index f8ac77bf73a0ab2e785fd92fbd2991baac1c24c3..6fb3c3b1896b5780fdbb4b677180385878a55053 100644
--- a/raps/workload.py
+++ b/raps/workload.py
@@ -30,7 +30,8 @@ from raps.utils import (
     determine_state, next_arrival, next_arrival_byconfargs,
     truncated_weibull,
-    truncated_weibull_float
+    truncated_weibull_float,
+    WorkloadData,
 )
 import math
 import random
@@ -66,7 +67,12 @@ class Workload:
         # This function calls the job generation function as specified by the workload keyword.
         # The respective funciton of this class is called.
         jobs = getattr(self, self.args.workload)(args=self.args)
-        return jobs
+        timestep_end = int(math.ceil(max([job.end_time for job in jobs])))
+        return WorkloadData(
+            jobs=jobs,
+            telemetry_start=0, telemetry_end=timestep_end,
+            start_date=self.args.start,
+        )
 
     def compute_traces(self,
                        cpu_util: float,
@@ -972,10 +978,10 @@ def run_workload(sim_config: SimConfig):
 
     if sim_config.replay:
         td = Telemetry(**args_dict)
-        jobs, _, _, _ = td.load_jobs_times_args_from_files(files=sim_config.replay, args=args, config=config)
+        jobs = td.load_from_files(sim_config.replay).jobs
     else:
         workload = Workload(args, config)
-        jobs = getattr(workload, sim_config.workload)(args=sim_config.get_legacy_args)
+        jobs = getattr(workload, sim_config.workload)(args=sim_config.get_legacy_args())
 
     plot_job_hist(jobs,
                   config=config,
                   dist_split=sim_config.multimodal,
@@ -994,5 +1000,5 @@ def continuous_job_generation(*, engine, timestep, jobs):
     # print("if len(engine.queue) <= engine.continuous_workload.args.maxqueue:")
     # print(f"if {len(engine.queue)} <= {engine.continuous_workload.args.maxqueue}:")
     if len(engine.queue) <= engine.continuous_workload.args.maxqueue:
-        new_jobs = engine.continuous_workload.generate_jobs()
+        new_jobs = engine.continuous_workload.generate_jobs().jobs
         jobs.extend(new_jobs)
diff --git a/tests/systems/conftest.py b/tests/systems/conftest.py
index 269d101fc5c4b0c017a046565790014f468ceab1..2703755135de9a9ad6028267d51839df0651e3bc 100644
--- a/tests/systems/conftest.py
+++ b/tests/systems/conftest.py
@@ -63,6 +63,18 @@ def system_config(system):
             "time_delta": True,
             "net": False,
         },
+        "bluewaters": {
+            "main": True,
+            "telemetry": True,
+            "multi-part-sim": False,
+            "withdata": True,
+            "cooling": False,
+            "uncertainty": False,
+            "time": True,
+            "fastforward": True,
+            "time_delta": True,
+            "net": False,
+        },
         "frontier": {
             "main": True,
             "telemetry": True,
@@ -89,10 +101,10 @@ def system_config(system):
             "net": False,
         },
         "gcloudv2": {
-            "main": False,
-            "telemetry": False,
+            "main": True,
+            "telemetry": True,
             "multi-part-sim": False,
-            "withdata": False,
+            "withdata": True,
             "cooling": False,
             "uncertainty": False,
             "time": True,
@@ -185,6 +197,7 @@ def system_files(system):
     files = {
         "40frontiers": [],
         "adastraMI250": ["adastraMI250/AdastaJobsMI250_15days.parquet"],
+        "bluewaters": ["bluewaters"],
         "frontier": ["frontier/slurm/joblive/date=2024-01-18/",
                      "frontier/jobprofile/date=2024-01-18/"],
         "fugaku": ["fugaku/21_04.parquet"],
         "gcloudv2": ["gcloud/v2/google_cluster_data_2011_sample"],
diff --git a/tests/systems/test_engine.py b/tests/systems/test_engine.py
index ce4087891e17b8d64d9fc6eea88cd983af17c672..e483b18b15b7c7eecc84be180ad5cadbcd0aab37 100644
--- a/tests/systems/test_engine.py
+++ b/tests/systems/test_engine.py
@@ -22,7 +22,10 @@ def test_engine(system, system_config, sim_output):
         "system": system,
         "time": "2m",
     })
-    engine, jobs, timestep_start, timestep_end, time_delta = Engine.from_sim_config(sim_config)
+    engine, workload_data, time_delta = Engine.from_sim_config(sim_config)
+    jobs = workload_data.jobs
+    timestep_start = workload_data.telemetry_start
+    timestep_end = workload_data.telemetry_end
     ticks = list(engine.run_simulation(jobs, timestep_start, timestep_end, time_delta))
 
     assert len(ticks) == 120
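Finally, a small sketch of the window-merging semantics in `Telemetry.load_from_files()` when several `.npz` snapshots are combined (numbers invented): the merged window should span all inputs, hence min over starts and max over ends.

```python
# (telemetry_start, telemetry_end) of each snapshot being merged
runs = [(0, 3_600), (1_800, 7_200)]

telemetry_start = min(s for s, _ in runs)
telemetry_end = max(e for _, e in runs)  # max, so no job's trace is truncated
assert (telemetry_start, telemetry_end) == (0, 7_200)
```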