diff --git a/.gitignore b/.gitignore
index c5f7241ff90d5d4225c61e5570e1b581b8866d1e..3e87161fa5112e2302c1454258a841ba872cc772 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,4 @@ models/fmu-models
 .shell-completion-cache
 raps-output-*
 ppo_raps_logs
+/data
diff --git a/config/selene.yaml b/config/selene.yaml
index 0520da1822532e6de6962690e4bc5d075dbe7370..fcf8c1a34b967dd95c9d58073bf3a188624f1e25 100644
--- a/config/selene.yaml
+++ b/config/selene.yaml
@@ -4,7 +4,7 @@ system:
   nodes_per_rack: 4
   rectifiers_per_rack: 32
   chassis_per_rack: 4
-  nodes_per_blade: 2
+  nodes_per_blade: 1
   switches_per_chassis: 4
   nics_per_node: 4
   rectifiers_per_chassis: 4
diff --git a/main.py b/main.py
index 18ecd9a50688ec12705315c3ad6365a688727ada..8ae0c064a4706645f13feba4243d2bb910572077 100755
--- a/main.py
+++ b/main.py
@@ -69,7 +69,7 @@ def main(cli_args: list[str] | None = None):
     from raps.run_sim import run_sim_add_parser, run_parts_sim_add_parser, show_add_parser
     from raps.workloads import run_workload_add_parser
-    from raps.telemetry import run_telemetry_add_parser
+    from raps.telemetry import run_telemetry_add_parser, run_download_add_parser
     from raps.train_rl import train_rl_add_parser

     parser = argparse.ArgumentParser(
@@ -85,6 +85,7 @@ def main(cli_args: list[str] | None = None):
     show_add_parser(subparsers)
     run_workload_add_parser(subparsers)
     run_telemetry_add_parser(subparsers)
+    run_download_add_parser(subparsers)
     train_rl_add_parser(subparsers)
     shell_completion_add_parser(subparsers)
diff --git a/raps/dataloaders/adastraMI250.py b/raps/dataloaders/adastraMI250.py
index ed60807d0818c5209eaa7998109b0712e47dd661..0f7f3663f32b06c787e1b6187d2ca6ff84b1ab4f 100644
--- a/raps/dataloaders/adastraMI250.py
+++ b/raps/dataloaders/adastraMI250.py
@@ -1,28 +1,30 @@
 """
+# get the data
+```
+raps download --system adastraMI250
+```
+This will download the dataset from https://zenodo.org/records/14007065/files/AdastaJobsMI250_15days.parquet

-    # get the data
-    Download `AdastaJobsMI250_15days.parquet` from
-    https://zenodo.org/records/14007065/files/AdastaJobsMI250_15days.parquet
+# to simulate the dataset
+raps run -f /path/to/AdastaJobsMI250_15days.parquet --system adastraMI250

+# to replay with different scheduling policy
+raps run -f /path/to/AdastaJobsMI250_15days.parquet --system adastraMI250 --policy priority --backfill easy
-    # to simulate the dataset
-    raps run -f /path/to/AdastaJobsMI250_15days.parquet --system adastraMI250
-
-    # to replay with different scheduling policy
-    raps run -f /path/to/AdastaJobsMI250_15days.parquet --system adastraMI250 --policy priority --backfill easy
-
-    # to run a specific time range
-    raps run -f /path/to/AdastaJobsMI250_15days.parquet --system adastraMI250 \
-    --start 2024-11-01T00:00:00Z --end 2024-11-02T00:00:00Z
-
-    # to analyze dataset
-    python -m raps.telemetry -f /path/to/AdastaJobsMI250_15days.parquet --system adastraMI250 -v
+# to run a specific time range
+raps run -f /path/to/AdastaJobsMI250_15days.parquet --system adastraMI250 \
+    --start 2024-11-01T00:00:00Z --end 2024-11-02T00:00:00Z

+# to analyze dataset
+python -m raps.telemetry -f /path/to/AdastaJobsMI250_15days.parquet --system adastraMI250 -v
 """
 import uuid

 import numpy as np
 import pandas as pd
+from pathlib import Path
+from datetime import datetime
 from tqdm import tqdm
+import urllib.request

 from ..job import job_dict, Job
 from ..utils import WorkloadData
@@ -279,3 +281,11 @@ def cdu_pos(index: int, config: dict) -> tuple[int, int]:
     name = CDU_NAMES[index - 1]
     row, col = int(name[2]), int(name[3:5])
     return (row, col)
+
+
+def download(dest: Path, start: datetime | None, end: datetime | None):
+    dest.mkdir(parents=True)
+    filename = "AdastaJobsMI250_15days.parquet"
+    print(f"Downloading {filename}")
+    urllib.request.urlretrieve(f"https://zenodo.org/records/14007065/files/{filename}", dest / filename)
+    print("Done!")
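The `download` hooks in these dataloaders call `urllib.request.urlretrieve` directly, which stays silent for the whole of a large parquet transfer. If progress output were wanted, `urlretrieve`'s `reporthook` can be bridged to `tqdm` (already imported throughout these modules). A minimal sketch, not part of the patch; `fetch` is a hypothetical helper name:

```python
# Sketch only: progress reporting for urlretrieve via its reporthook.
import urllib.request
from pathlib import Path
from tqdm import tqdm


def fetch(url: str, dest: Path) -> None:
    with tqdm(unit="B", unit_scale=True, desc=dest.name) as bar:
        def hook(blocks: int, block_size: int, total: int) -> None:
            if total > 0:
                bar.total = total
            bar.update(blocks * block_size - bar.n)  # advance to bytes received so far
        urllib.request.urlretrieve(url, dest, reporthook=hook)
```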
diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py
index 23efd2fbcfb30476e050a19c8ebe4094572e6305..391a84ed99848cd1921553c40eb12b103f484556 100644
--- a/raps/dataloaders/frontier.py
+++ b/raps/dataloaders/frontier.py
@@ -1,22 +1,25 @@
 """
-    Note: Frontier telemetry data is not publicly available.
+Note: Frontier telemetry data is not publicly available.

-    # To simulate
-    DATEDIR="date=2024-01-18"
-    DPATH=/path/to/data
-    raps run -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR
+# To simulate
+DATEDIR="date=2024-01-18"
+DPATH=/path/to/data
+raps run -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR

-    # To analyze the data
-    python -m raps.telemetry -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR
+# To analyze the data
+python -m raps.telemetry -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR
 """
 import time
-from datetime import datetime, timezone
+from datetime import datetime, timezone, timedelta
+from zoneinfo import ZoneInfo

 import numpy as np
 import pandas as pd
+import subprocess
 from tqdm import tqdm
+from pathlib import Path

 from ..job import job_dict, Job
-from ..utils import power_to_utilization, encrypt, WorkloadData
+from ..utils import power_to_utilization, encrypt, WorkloadData, date_range
@@ -609,3 +612,42 @@ def cdu_pos(index: int, config: dict) -> tuple[int, int]:
     name = CDU_NAMES[index - 1]
     row, col = int(name[2]), int(name[3:5])
     return (row, col)
+
+
+def download(dest: Path, start: datetime | None, end: datetime | None):
+    HOST = "dtn.ccs.ornl.gov"
+    DATA_LAKE = "/lustre/orion/stf218/proj-shared/data/lake/frontier"
+
+    print("Downloading the Frontier dataset requires access permissions.")
+    print("If you have access you can download via SSH.")
+    USERNAME = input("NCCS Username: ")
+    # jobs are indexed by submission time, so download a few extra days to make sure we get
+    # all jobs that ran during start -> end
+    if start:
+        start = (start - timedelta(days=2)).astimezone(ZoneInfo("UTC"))
+    else:
+        start = datetime.fromisoformat("2023-09-01T00:00:00Z")
+    if end:
+        end = (end + timedelta(days=2)).astimezone(ZoneInfo("UTC"))
+    else:
+        end = datetime.now(ZoneInfo("UTC"))
+
+    days = list(date_range(start, end))
+
+    dest.mkdir(parents=True)
+    subprocess.run(["rsync", "-rvm",
+                    *[f"--include=date={d.date().isoformat()}/***" for d in days],
+                    "--exclude", '*',
+                    f"{USERNAME}@{HOST}:{DATA_LAKE}/jobprofile/jobprofile/",
+                    str(dest / "jobprofile")
+                    ], check=True, text=True)
+
+    (dest / 'slurm').mkdir(parents=True)
+    subprocess.run(["rsync", "-rvm",
+                    *[f"--include=date={d.date().isoformat()}/***" for d in days],
+                    "--exclude", '*',
+                    f"{USERNAME}@{HOST}:{DATA_LAKE}/slurm/joblive/",
+                    str(dest / "slurm/joblive")
+                    ], check=True, text=True)
+
+    print("Done!")
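For readers unfamiliar with the rsync filter rules used above: each `--include=date=.../***` pattern whitelists one date partition (the trailing `/***` matches the directory plus everything beneath it), the final `--exclude *` rejects anything not whitelisted, and `-m` (`--prune-empty-dirs`) drops the directory skeletons the filters leave empty. An illustration of the command the code builds for a two-day window; username, host, and paths are placeholders:

```python
# Illustration only: the rsync invocation above, expanded for two days.
from datetime import date, timedelta

days = [date(2024, 1, 18) + timedelta(days=i) for i in range(2)]
cmd = ["rsync", "-rvm",
       *[f"--include=date={d.isoformat()}/***" for d in days],  # keep these partitions
       "--exclude", "*",                                        # drop everything else
       "jdoe@dtn.ccs.ornl.gov:/path/to/lake/jobprofile/jobprofile/",  # placeholder path
       "data/frontier/jobprofile"]
print(" ".join(cmd))
# rsync -rvm --include=date=2024-01-18/*** --include=date=2024-01-19/*** --exclude * ...
```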
diff --git a/raps/dataloaders/fugaku.py b/raps/dataloaders/fugaku.py
index 5a531fa19742e97c8209d89a86c94b2e26fa105d..1442ad21df4d5d95904e50e438c10d87be70b4c2 100644
--- a/raps/dataloaders/fugaku.py
+++ b/raps/dataloaders/fugaku.py
@@ -1,21 +1,26 @@
 """
-    Download parquet files from https://zenodo.org/records/11467483
+Uses the Fugaku dataset published at https://zenodo.org/records/11467483

-    Note that F-Data doesn't give a list of nodes used, so we set 'scheduled_nodes' to None
-    which triggers the scheduler to schedule the nodes itself.
+Note that F-Data doesn't give a list of nodes used, so we set 'scheduled_nodes' to None,
+which triggers the scheduler to schedule the nodes itself.

-    Also, power in F-Data is only given at node-level. We can use node-level power by
-    adding the --validate option.
+Also, power in F-Data is only given at node-level. We can use node-level power by
+adding the --validate option.

-    The '--arrival poisson' will compute submit times from Poisson distribution, instead of using
-    the submit times given in F-Data.
+The '--arrival poisson' option computes submit times from a Poisson distribution, instead of
+using the submit times given in F-Data.

-    raps run --system fugaku -f /path/to/21_04.parquet
-    raps run --system fugaku -f /path/to/21_04.parquet --validate
-    raps run --system fugaku -f /path/to/21_04.parquet --policy priority --backfill easy
+raps run --system fugaku -f /path/to/21_04.parquet
+raps run --system fugaku -f /path/to/21_04.parquet --validate
+raps run --system fugaku -f /path/to/21_04.parquet --policy priority --backfill easy
 """
 import pandas as pd
 from tqdm import tqdm
+from datetime import datetime
+from pathlib import Path
+from zoneinfo import ZoneInfo
+import urllib.request
+import requests

 from ..job import job_dict, Job
 from ..utils import WorkloadData
@@ -180,3 +185,27 @@ def cdu_index_to_name(index: int, config: dict):
 def cdu_pos(index: int, config: dict) -> tuple[int, int]:
     """ Return (row, col) tuple for a cdu index """
     return (0, index)  # TODO
+
+
+def download(dest: Path, start: datetime | None, end: datetime | None):
+    tz = ZoneInfo("Asia/Tokyo")
+
+    files = requests.get("https://zenodo.org/api/records/11467483").json()["files"]
+    files = [f for f in files if f['key'].endswith(".parquet")]
+    files = sorted(files, key=lambda f: f['key'])
+
+    # TODO: I think fugaku data is indexed by submission time not start time, so filtering by
+    # filename will probably miss some jobs that ran over start -> end
+    if start:
+        start_file = start.astimezone(tz).strftime("%y_%m.parquet")
+        files = [f for f in files if f['key'] >= start_file]
+    if end:
+        end_file = end.astimezone(tz).strftime("%y_%m.parquet")
+        files = [f for f in files if f['key'] <= end_file]
+
+    dest.mkdir(parents=True)
+    for file in files:
+        print(f"Downloading {file['key']}")
+        urllib.request.urlretrieve(file['links']['self'], dest / file['key'])
+
+    print("Done!")
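The Zenodo records API used here returns a `files` list whose entries carry a `key` (filename) and a `links.self` download URL; since F-Data files are named `YY_MM.parquet`, plain string comparison on the keys implements the month window. A dry-run sketch listing which files would be fetched, without downloading anything (record ID taken from the code above):

```python
# Dry run: list the monthly parquet files the window filter would select.
from datetime import datetime
from zoneinfo import ZoneInfo
import requests

tz = ZoneInfo("Asia/Tokyo")
start = datetime(2021, 4, 1, tzinfo=tz)
end = datetime(2021, 6, 30, tzinfo=tz)

files = requests.get("https://zenodo.org/api/records/11467483").json()["files"]
keys = sorted(f["key"] for f in files if f["key"].endswith(".parquet"))
keys = [k for k in keys
        if start.strftime("%y_%m.parquet") <= k <= end.strftime("%y_%m.parquet")]
print(keys)  # expected: ['21_04.parquet', '21_05.parquet', '21_06.parquet']
```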
diff --git a/raps/dataloaders/lassen.py b/raps/dataloaders/lassen.py
index db86513051cf22d86e6f9940bd0305ac6763b808..06d9a98dd66ef227f6e6dfc7ca474eefe0273578 100644
--- a/raps/dataloaders/lassen.py
+++ b/raps/dataloaders/lassen.py
@@ -10,23 +10,22 @@ Reference:

 Usage Instructions:

-    git clone https://github.com/LLNL/LAST/ && cd LAST
-    git lfs pull
+    raps download --system lassen

     # to analyze dataset and plot histograms
-    raps telemetry -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen --plot
+    raps telemetry -f ./data/lassen/Lassen-Supercomputer-Job-Dataset --system lassen --plot

     # to simulate the dataset as submitted
-    raps run -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen
+    raps run -f ./data/lassen/Lassen-Supercomputer-Job-Dataset --system lassen

     # to modify the submit times of the telemetry according to Poisson distribution
-    raps run -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen --arrival poisson
+    raps run -f ./data/lassen/Lassen-Supercomputer-Job-Dataset --system lassen --arrival poisson

     # to fast-forward 365 days and replay for 1 day. This region day has 2250 jobs with 1650 jobs executed.
-    raps run -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen --start '2019-08-22T00:00:00+00:00' -t 1d
+    raps run -f ./data/lassen/Lassen-Supercomputer-Job-Dataset --system lassen --start '2019-08-22T00:00:00+00:00' -t 1d

     # For the network replay this command gives suiteable snapshots:
-    raps run -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen --policy fcfs --backfill firstfit -t 12h --arrival poisson  # noqa
+    raps run -f ./data/lassen/Lassen-Supercomputer-Job-Dataset --system lassen --policy fcfs --backfill firstfit -t 12h --arrival poisson  # noqa
 """

 import math
@@ -35,6 +34,9 @@ import uuid
 import numpy as np
 import pandas as pd
 from tqdm import tqdm
+from pathlib import Path
+import subprocess
+import shutil
 from datetime import datetime, timedelta

 from ..job import job_dict, Job
@@ -339,3 +341,12 @@ if __name__ == "__main__":

     tx_sequence, rx_sequence = generate_network_sequences(total_ib_tx, total_ib_rx, intervals, lambda_poisson)
     print(tx_sequence, rx_sequence)
+
+
+def download(dest: Path, start: datetime | None, end: datetime | None):
+    dest.mkdir(parents=True)
+    subprocess.run(["git", "clone", "https://github.com/LLNL/LAST/", str(dest / 'repo')], check=True, text=True)
+    subprocess.run(["git", "lfs", "pull"], check=True, text=True, cwd=dest / "repo")
+    (dest / "repo" / "Lassen-Supercomputer-Job-Dataset").rename(dest / "Lassen-Supercomputer-Job-Dataset")
+    shutil.rmtree(dest / 'repo')
+    print("Done!")
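One possible refinement of the clone-then-prune flow above, not applied in the patch: cloning with `GIT_LFS_SKIP_SMUDGE=1` keeps the clone itself lightweight and defers all large-object transfer to the single `git lfs pull`. A hedged sketch, assuming `git` and `git-lfs` are on PATH:

```python
# Sketch: same flow as lassen's download(), with LFS smudge disabled on clone.
import os
import shutil
import subprocess
from pathlib import Path

dest = Path("data/lassen")
dest.mkdir(parents=True)
env = {**os.environ, "GIT_LFS_SKIP_SMUDGE": "1"}  # check out LFS pointers only
subprocess.run(["git", "clone", "https://github.com/LLNL/LAST/", str(dest / "repo")],
               check=True, env=env)
subprocess.run(["git", "lfs", "pull"], check=True, cwd=dest / "repo")  # fetch blobs once
(dest / "repo" / "Lassen-Supercomputer-Job-Dataset").rename(dest / "Lassen-Supercomputer-Job-Dataset")
shutil.rmtree(dest / "repo")
```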
+
+# get the data
+Download the dataset with
+```
+raps download --system marconi100
+```
+This will download the dataset from https://zenodo.org/records/10127767
+
+# to simulate the dataset
+raps run -f /path/to/job_table.parquet --system marconi100
+
+# to replay using different schedulers
+raps run -f /path/to/job_table.parquet --system marconi100 --policy fcfs --backfill easy
+raps run -f /path/to/job_table.parquet --system marconi100 --policy priority --backfill firstfit
+
+# to fast-forward 60 days and replay for 1 day
+raps run -f /path/to/job_table.parquet --system marconi100 --start 2020-07-05T00:00:00+00:00 -t 1d
+
+# to analyze dataset
+python -m raps.telemetry -f /path/to/job_table.parquet --system marconi100 -v
 """
 import uuid

 import numpy as np
 import pandas as pd
 from tqdm import tqdm
+from pathlib import Path
+from datetime import datetime
+import requests
+import urllib.request

 from ..job import job_dict, Job
 from ..utils import power_to_utilization, WorkloadData
@@ -241,3 +248,15 @@ def cdu_index_to_name(index: int, config: dict):
 def cdu_pos(index: int, config: dict) -> tuple[int, int]:
     """ Return (row, col) tuple for a cdu index """
     return (0, index)  # TODO
+
+
+def download(dest: Path, start: datetime | None, end: datetime | None):
+    files = requests.get("https://zenodo.org/api/records/10127767").json()["files"]
+
+    # marconi100 is just one big parquet, nothing to pre-filter
+    dest.mkdir(parents=True)
+    for file in files:
+        print(f"Downloading {file['key']}")
+        urllib.request.urlretrieve(file['links']['self'], dest / file['key'])
+
+    print("Done!")
diff --git a/raps/engine.py b/raps/engine.py
index d502e45ff5d695f6bf74019c9abbb3be2d9a009d..a9ea4a07f448a72ac9459e90d125ece904a0b4b9 100644
--- a/raps/engine.py
+++ b/raps/engine.py
@@ -200,11 +200,9 @@ class Engine:
         if sim_config.start:
             start = sim_config.start
             diff = start - wd.start_date
-            if diff.total_seconds() < 0:
-                raise Exception(
-                    f"{start.isoformat()} is before data range in workload. "
-                    + f"Workload data begins at {wd.start_date.isoformat()}"
-                )
+            # diff may be negative if start is before the first job in the workload. We'll still
+            # shift telemetry_start to match sim_config.start, even if that leaves a blank
+            # spot at the beginning.
             wd.telemetry_start += int(diff.total_seconds())
             wd.start_date = start
         else:
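A toy check of the relaxed behavior above: a `--start` one hour before the first telemetry sample now just shifts `telemetry_start` negative, leaving an idle hour at the head of the simulation instead of raising. Variable names mirror the code; the values are invented for illustration:

```python
# Toy illustration of the new start handling.
from datetime import datetime, timedelta, timezone

wd_start_date = datetime(2024, 1, 18, tzinfo=timezone.utc)  # first sample in the workload
sim_start = wd_start_date - timedelta(hours=1)              # requested --start

telemetry_start = 0
diff = sim_start - wd_start_date
telemetry_start += int(diff.total_seconds())
print(telemetry_start)  # -3600: one blank hour before the data begins
```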
diff --git a/raps/schedulers/replay.py b/raps/schedulers/replay.py
deleted file mode 100644
index 7a0abcf4b5671367104ec806fea001c6e8789d8a..0000000000000000000000000000000000000000
--- a/raps/schedulers/replay.py
+++ /dev/null
@@ -1,61 +0,0 @@
-from ..policy import PolicyType
-
-
-class Scheduler:
-    """
-    Mock Scheduler only considering start time.
-    There is no scheduling going on but job placement according to start time.
-
-    Default job scheduler with various scheduling policies.
-    """
-
-    def __init__(self, config, policy, resource_manager=None):
-        self.config = config
-        self.policy = PolicyType(policy)
-        if resource_manager is None:
-            raise ValueError("Scheduler requires a ResourceManager instance")
-        self.resource_manager = resource_manager
-        self.debug = False
-
-    def sort_jobs(self, queue, accounts=None):
-        """Sort jobs based on the selected scheduling policy."""
-        return sorted(queue, key=lambda job: job.start_time)
-
-    def prepare_system_state(self, queue, running):
-        return queue
-
-    def schedule(self, queue, running, current_time, accounts=None, sorted=False, debug=False):
-        # Sort the queue in place.
-        if not sorted:
-            queue[:] = self.sort_jobs(queue, accounts)
-
-        for job in queue[:]:
-            # Skip jobs in queue with start time in the future
-            if job.start_time >= current_time:
-                continue
-
-            nodes_available = False
-            if job.nodes_required <= len(self.resource_manager.available_nodes):
-                if self.policy == PolicyType.REPLAY and job.scheduled_nodes:  # Check if we need exact set
-                    # is exact set available:
-                    nodes_available = set(job.scheduled_nodes).issubset(set(self.resource_manager.available_nodes))
-                else:
-                    # we dont need the exact set:
-                    nodes_available = True  # Checked above
-                    if job.nodes_required == 0:
-                        raise ValueError(f"Job Requested zero nodes: {job}")
-                    # clear scheduled nodes
-                    job.scheduled_nodes = []
-            else:
-                pass  # not enough nodes available
-
-            if nodes_available:
-                self.resource_manager.assign_nodes_to_job(job, current_time)
-                running.append(job)
-                queue.remove(job)
-            else:
-                # This is a replay so this should not happen
-                raise ValueError(
-                    f"Nodes not available!\nRequested:{job.scheduled_nodes}\n"
-                    f"Available:{self.resource_manager.available_nodes}\n{job.__dict__}; "
-                    f"Policy: {self.policy}")
diff --git a/raps/sim_config.py b/raps/sim_config.py
index a12512f2e87b7d55c4846604b9de7b2fcfc65aa3..32cc043408670ab72cc5404ce44cdbc58b52451d 100644
--- a/raps/sim_config.py
+++ b/raps/sim_config.py
@@ -118,6 +118,13 @@ class SimConfig(RAPSBaseModel, abc.ABC):
     replay: list[ResolvedPath] | None = None
     """ Either: path/to/joblive path/to/jobprofile OR filename.npz """

+    dataloader: str | None = None
+    """
+    Python module path to use as the dataloader when loading replay data. Only relevant if replay is
+    set. Defaults to "raps.dataloaders.<system>" but can be set to your own custom dataloader
+    as well.
+    """
+
     encrypt: bool = False
     """ Encrypt sensitive data in telemetry """

@@ -312,12 +319,18 @@ class SimConfig(RAPSBaseModel, abc.ABC):
         if td is not None:
             convert_to_time_unit(td, self.time_unit)  # will throw if invalid

-        if "workload" not in self.model_fields_set and self.replay:
-            self.workload = "replay"  # default to replay if --replay is set
-        if self.workload == "replay" and not self.replay:
-            raise ValueError('--replay must be set when workload type is "replay"')
-        elif self.workload != "replay" and self.replay:
-            raise ValueError('workload must be either omitted or "replay" when --replay is set')
+        if self.replay:
+            if "workload" not in self.model_fields_set:
+                self.workload = "replay"  # default to replay if --replay is set
+            if not self.policy:
+                self.policy = "replay"
+            if self.workload != "replay" or self.policy != 'replay':
+                raise ValueError('workload & policy must be either omitted or "replay" when --replay is set')
+            if self.scheduler != 'default':
+                raise ValueError('scheduler must be omitted or set to default when --replay is set')
+        else:
+            if self.workload == "replay" or self.policy == "replay":
+                raise ValueError('--replay must be set when workload type is "replay"')

         if self.cooling:
             self.layout = "layout2"
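Restated outside pydantic for readers skimming the hunk, the replay flag matrix the new validator enforces looks roughly like this (a sketch, not the actual `SimConfig` code):

```python
# Sketch of the validation rules added above.
def check_replay_flags(replay, workload=None, policy=None, scheduler="default"):
    if replay:
        workload = workload or "replay"  # both default to "replay" when unset
        policy = policy or "replay"
        if workload != "replay" or policy != "replay":
            raise ValueError('workload & policy must be either omitted or "replay" when --replay is set')
        if scheduler != "default":
            raise ValueError('scheduler must be omitted or set to default when --replay is set')
    elif workload == "replay" or policy == "replay":
        raise ValueError('--replay must be set when workload type is "replay"')
    return workload, policy
```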
diff --git a/raps/stats.py b/raps/stats.py
index 7df6208a3c15f4bafcf5a3125307fe3294c1c471..6906e67b09995a546d4d63182d4cde5df1d08088 100644
--- a/raps/stats.py
+++ b/raps/stats.py
@@ -454,7 +454,7 @@ class RunningStats:
 # Infinite generator used for the RunningStats logic
 def running_sum_values(values, last_value, last_index):
     return last_value + sum_values(values[last_index:])
-
+
 def running_min_value(values, last_value, last_index):
     if last_index < len(values):
         new_min = min_value(values[last_index:])
diff --git a/raps/telemetry.py b/raps/telemetry.py
index b7f29b79e2c983b22e422c334a0d50dfeaba88ba..0da391a5762ecfa6a57adae892fb001b1918e4b9 100644
--- a/raps/telemetry.py
+++ b/raps/telemetry.py
@@ -9,7 +9,7 @@ helper functions for data encryption and conversion between node name and index
 from typing import Literal
 import random
 from pathlib import Path
-# import json
+from datetime import datetime
 from typing import Optional
 from types import ModuleType
 import importlib
@@ -21,6 +21,7 @@ from pydantic import model_validator
 from raps.sim_config import SimConfig
 from raps.system_config import get_system_config
 from raps.job import Job, job_dict
+from raps.utils import AutoAwareDatetime
 import matplotlib.pyplot as plt
 from raps.plotting import (
     plot_jobs_gantt,
@@ -84,9 +85,13 @@ class Telemetry:
         self.system = kwargs['system']
         self.config = kwargs.get('config')

+        if kwargs.get("dataloader"):
+            module = kwargs['dataloader']
+        else:
+            module = f"raps.dataloaders.{self.system.split('/')[0]}"
+
         try:
-            module = self.system.split("/")[0]
-            self.dataloader = importlib.import_module(f"raps.dataloaders.{module}", package=__package__)
+            self.dataloader = importlib.import_module(module, package=__package__)
         except ImportError as e:
             print(f"WARNING: Failed to load dataloader: {e}")
             self.dataloader = None
@@ -183,6 +188,13 @@ class Telemetry:
         assert self.dataloader
         return self.dataloader.load_live_data(**self.kwargs)

+    def download_data(self, dest: Path, start: datetime | None, end: datetime | None):
+        """Download telemetry data using the system's dataloader."""
+        assert self.dataloader
+        if not hasattr(self.dataloader, "download"):
+            raise ValueError("Dataloader does not support download")
+        return self.dataloader.download(dest, start, end)
+
     def node_index_to_name(self, index: int):
         """ Convert node index into a name"""
         assert self.dataloader
@@ -359,3 +371,25 @@ def run_telemetry(args: TelemetryArgs):
             print(f"Saved to: {filename}")
         else:
             plt.show()
+
+
+class DownloadArgs(RAPSBaseModel):
+    system: str
+    dest: ResolvedPath | None = None
+    start: AutoAwareDatetime | None = None
+    end: AutoAwareDatetime | None = None
+
+
+def run_download_add_parser(subparsers: SubParsers):
+    parser = subparsers.add_parser("download", description="""
+        Download telemetry data
+    """)
+    model_validate = pydantic_add_args(parser, DownloadArgs)
+    parser.set_defaults(impl=lambda args: run_download(model_validate(args, {})))
+
+
+def run_download(args: DownloadArgs):
+    config = get_system_config(args.system).get_legacy()
+    td = Telemetry(system=args.system, config=config)
+    dest = args.dest if args.dest else Path("./data").resolve() / args.system
+    td.download_data(dest, args.start, args.end)
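Putting the new pieces together: `raps download --system <system>` resolves to `run_download`, which defaults `dest` to `./data/<system>` (hence the new `/data` entry in `.gitignore`), while the `dataloader` kwarg lets `Telemetry` import a module other than `raps.dataloaders.<system>`. A small usage sketch; `my_pkg.my_loader` is a placeholder module path:

```python
# Sketch: using the new dataloader override programmatically.
from raps.system_config import get_system_config
from raps.telemetry import Telemetry

config = get_system_config("frontier").get_legacy()
td = Telemetry(system="frontier", config=config,
               dataloader="my_pkg.my_loader")  # hypothetical custom module
```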
diff --git a/raps/utils.py b/raps/utils.py
index d98be2a7073bd01a8f16436364b33aeb1af7b1a0..0c77d3f02e8669091e90145bc796305032544b9f 100644
--- a/raps/utils.py
+++ b/raps/utils.py
@@ -6,7 +6,8 @@ generating random numbers, summarizing and expanding ranges, determining job sta
 """

-from datetime import datetime, timedelta, timezone
+from datetime import datetime, timedelta, timezone, date
+from collections.abc import Iterable
 from enum import Enum
 import os
 import hashlib
@@ -71,6 +72,16 @@ def to_dict(arg):
         raise ValueError(f"Cannot convert {arg} to dict")


+DateType = TypeVar("DateType", date, datetime)
+
+
+def date_range(start: DateType, end: DateType, step=timedelta(days=1)) -> Iterable[DateType]:
+    window_start = start
+    while window_start < end:
+        yield window_start
+        window_start += step
+
+
 def sum_values(values):
     return sum(x[1] for x in values) if values else 0

@@ -637,15 +648,16 @@ def get_current_utilization(trace, job: Job):
     """Return utilization for a trace at the job's current running time.
     Note: this should move to a trace.py and a Trace class!
     """
-    if not job.trace_quanta:
-        raise ValueError("job.trace_quanta is not set; cannot compute utilization.")
-
-    time_quanta_index = int((job.current_run_time - job.trace_start_time) // job.trace_quanta)
-    if time_quanta_index < 0:
-        time_quanta_index = 0
-
     if (isinstance(trace, list) and trace) or \
        (isinstance(trace, np.ndarray) and trace.size != 0):
+
+        if not job.trace_quanta:
+            raise ValueError("job.trace_quanta is not set; cannot compute utilization.")
+
+        time_quanta_index = int((job.current_run_time - job.trace_start_time) // job.trace_quanta)
+        if time_quanta_index < 0:
+            time_quanta_index = 0
+
         if time_quanta_index < len(trace):
             util = get_utilization(trace, time_quanta_index)
         else:
@@ -653,6 +665,6 @@ def get_current_utilization(trace, job: Job):
     elif isinstance(trace, (float, int)):
         util = trace
     else:
-        util = 0.0
+        raise ValueError(f"trace is of unexpected type: {type(trace)}.")

     return util
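The new `date_range` helper iterates half-open, `[start, end)`, in one-day steps by default, which is what lets the Frontier downloader above expand a window into per-day `--include` patterns:

```python
# Quick check of date_range semantics.
from datetime import date
from raps.utils import date_range

print(list(date_range(date(2024, 1, 18), date(2024, 1, 21))))
# [datetime.date(2024, 1, 18), datetime.date(2024, 1, 19), datetime.date(2024, 1, 20)]
```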
diff --git a/scripts/get_data.sh b/scripts/get_data.sh
deleted file mode 100755
index a4263fda3ac83e8b3148122e8c122b7db61adf65..0000000000000000000000000000000000000000
--- a/scripts/get_data.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/bin/bash
-# Note:
-# recommend setting up ~/.ssh/config to specify User and HostName
-# Host mymachine
-#     User jdoe
-#     HostName mymachine.com
-
-machine="mymachine"
-mkdir -p jobprofile slurm/jobcomplete slurm/joblive
-
-if [ -n "$1" ]; then
-    DATE=$1
-else
-    DATE="2024-01-19"
-fi
-
-DPATH=/path/to/data/lake
-
-/usr/bin/scp -r $machine:$DPATH/jobprofile/jobprofile/date=$DATE jobprofile
-/usr/bin/scp -r $machine:$DPATH/slurm/joblive/date=$DATE slurm/joblive
diff --git a/tests/systems/test_main_network_run.py b/tests/systems/test_main_network_run.py
index ea693b49e5d7c89beb20d6bcaf82f3e9b32db562..5c139895a4559f71b39bd461317dad56c5e915e4 100644
--- a/tests/systems/test_main_network_run.py
+++ b/tests/systems/test_main_network_run.py
@@ -11,7 +11,10 @@ pytestmark = [
 ]


-def test_main_network_run(system, system_config, sim_output):
+def test_main_network_run(system, system_config, sim_output, pytestconfig):
+    if system == "lassen" and not pytestconfig.getoption("--runlong"):
+        pytest.skip("This test for \"lassen\" is very long; pass --runlong to run it")
+
     if not system_config.get("main", False):
         pytest.skip(f"{system} does not support basic main run.")
diff --git a/tests/systems/test_main_withdata_run.py b/tests/systems/test_main_withdata_run.py
index ed1a9442e7d8386d01a4e8607a07218b14286914..1ca64134f47b31faffa852f7005832ef2a6abfdf 100644
--- a/tests/systems/test_main_withdata_run.py
+++ b/tests/systems/test_main_withdata_run.py
@@ -16,11 +16,11 @@ def test_main_withdata_run(system, system_config, system_files, sim_output):

     engine, stats = run_engine({
         "system": system,
-        "time": "10m",
+        "time": "20m",
         "replay": system_files,
     })

     # Check that it at least loaded some data
-    assert stats['tick_count'] == 10 * 60
+    assert stats['tick_count'] == 20 * 60
     assert stats['job']['jobs_total'] > 0
     assert len(stats['job']['jobs_still_running']) + stats['job']['jobs_completed'] > 0
diff --git a/tests/unit/test_net_dragonfly.py b/tests/unit/test_net_dragonfly.py
index a36afdc46e0cc6c38f127c4043bf0990f4383e12..b8711fd055fde30a86a51fc055b77d25ad80e064 100644
--- a/tests/unit/test_net_dragonfly.py
+++ b/tests/unit/test_net_dragonfly.py
@@ -1,32 +1,42 @@
-import pytest
 from raps.network.dragonfly import build_dragonfly, dragonfly_node_id_to_host_name

+
 def test_build_dragonfly():
     """Test building a small dragonfly network."""
-    D, A, P = 2, 2, 2
+    D = 2  # Routers per group
+    A = 2  # Global connections per router
+    P = 2  # Compute nodes per router
     G = build_dragonfly(D, A, P)

     # Check number of nodes
-    num_routers = D * A
-    num_hosts = D * A * P
+    num_routers = D * (A + 1)
+    num_hosts = num_routers * P
     total_nodes = num_routers + num_hosts
     assert len(G.nodes) == total_nodes

     # Check number of edges
-    # Intra-group edges (clique)
-    intra_group_edges = D * (A * (A - 1) // 2)
+    routers_per_group = D
+    # Edges of the router clique:
+    router_clique_edges_per_group = ((routers_per_group * (routers_per_group - 1)) // 2)
+    # Edges for all router compute nodes:
+    compute_node_edges_per_router = P
+    # Total intra-group edges:
+    intra_group_edges = router_clique_edges_per_group + compute_node_edges_per_router * D
+
     # Inter-group edges
-    inter_group_edges = A * (D * (D - 1) // 2)
+    total_groups = A + 1
+    inter_group_edges_simple_clique = ((total_groups * (total_groups - 1)) // 2)
+    inter_group_edges = inter_group_edges_simple_clique * D
     # Host to router edges
-    host_router_edges = num_hosts
-    total_edges = intra_group_edges + inter_group_edges + host_router_edges
+    total_edges = intra_group_edges * total_groups + inter_group_edges
     assert len(G.edges) == total_edges

     # Check node types
-    node_types = [data["type"] for _, data in G.nodes(data=True)]
+    node_types = [data["layer"] for _, data in G.nodes(data=True)]
     assert node_types.count("router") == num_routers
     assert node_types.count("host") == num_hosts

+
 def test_dragonfly_node_id_to_host_name():
     """Test the dragonfly_node_id_to_host_name function."""
     D, A, P = 2, 2, 2
diff --git a/tests/unit/test_net_torus3d.py b/tests/unit/test_net_torus3d.py
index b18cbfa2caec2d48e9e5efc1a2e6d492ae6de589..3e38eb420dd090b2109a30788b31a6c49cea5ebb 100644
--- a/tests/unit/test_net_torus3d.py
+++ b/tests/unit/test_net_torus3d.py
@@ -1,6 +1,6 @@
-import pytest
 from raps.network.torus3d import build_torus3d, torus_route_xyz

+
 def test_build_torus3d():
     """Test building a small 3D torus network."""
     dims = (2, 2, 2)
@@ -8,23 +8,25 @@ def test_build_torus3d():

     # Check number of nodes
     num_routers = dims[0] * dims[1] * dims[2]
-    num_hosts = num_routers  # hosts_per_router=1
+    hosts_per_router = 1  # assumed build_torus3d default
+    num_hosts = num_routers * hosts_per_router
     total_nodes = num_routers + num_hosts
     assert len(G.nodes) == total_nodes

     # Check number of edges
     # Router to router edges
-    router_edges = (num_routers * 3) // 2  # Each router has 3 neighbors in a 3D torus
+    router_edges = (num_routers * 3)  # 6 torus links per router, each edge shared by two routers
     # Host to router edges
-    host_router_edges = num_hosts
+    host_router_edges = num_routers * hosts_per_router
     total_edges = router_edges + host_router_edges
     assert len(G.edges) == total_edges

     # Check node types
-    node_types = [data["kind"] for _, data in G.nodes(data=True)]
+    node_types = [data["type"] for _, data in G.nodes(data=True)]
     assert node_types.count("router") == num_routers
     assert node_types.count("host") == num_hosts

+
 def test_torus_route_xyz():
     """Test the torus_route_xyz function."""
     dims = (4, 4, 4)