diff --git a/.gitignore b/.gitignore index ed10fabfb9ebac419c6c83f1721ac53bf7e81a9f..c5f7241ff90d5d4225c61e5570e1b581b8866d1e 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ venv simulation_results/ models/fmu-models .shell-completion-cache +raps-output-* +ppo_raps_logs diff --git a/README.md b/README.md index 9c708afd5e5f92aa574d3a6ca998781ff470df6e..160549fc97b53e55677a5e6d30625e02a8c6875e 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,14 @@ For MIT Supercloud # Reinforcement learning test case raps train-rl --system mit_supercloud/part-cpu -f /opt/data/mit_supercloud/202201 +For Microsoft Azure - 2017 Philly Traces + + # Synthetic + python main.py run-parts -x philly -w multitenant + + # Telemetry replay + python main.py run-parts -x philly -f /opt/data/philly/trace-data --start 2017-10-03T00:14:56Z --end 2017-10-04T00:00 + For Lumi # Synthetic test for Lumi: diff --git a/config/perlmutter.yaml b/config/perlmutter.yaml new file mode 100644 index 0000000000000000000000000000000000000000..e8de04c5720f506413142152ab89b8948c5c82c2 --- /dev/null +++ b/config/perlmutter.yaml @@ -0,0 +1,51 @@ +system: + num_cdus: 36 + racks_per_cdu: 3 + nodes_per_rack: 128 + rectifiers_per_rack: 32 + chassis_per_rack: 8 + nodes_per_blade: 2 + switches_per_chassis: 4 + nics_per_node: 4 + rectifiers_per_chassis: 4 + nodes_per_rectifier: 4 + missing_racks: [] + down_nodes: [] + cpus_per_node: 1 + gpus_per_node: 4 + cpu_peak_flops: 3580000000000.0 + gpu_peak_flops: 9700000000000.0 + cpu_fp_ratio: 0.667 + gpu_fp_ratio: 0.667 +power: + power_gpu_idle: 88 + power_gpu_max: 300 + power_cpu_idle: 90 + power_cpu_max: 280 + power_mem: 74.26 + power_nic: 20 + power_nvme: 30 + power_switch: 250 + power_cdu: 8473.47 + power_update_freq: 15 + rectifier_peak_threshold: 13670 + sivoc_loss_constant: 13 + sivoc_efficiency: 0.98 + rectifier_loss_constant: 17 + rectifier_efficiency: 0.96 + power_cost: 0.094 +scheduler: + seed: 42 + job_arrival_time: 900 + mtbf: 11 + trace_quanta: 10 + min_wall_time: 3600 + max_wall_time: 43200 + ui_update_freq: 900 + max_nodes_per_job: 3000 + job_end_probs: + COMPLETED: 0.63 + FAILED: 0.13 + CANCELLED: 0.12 + TIMEOUT: 0.11 + NODE_FAIL: 0.01 diff --git a/config/philly/2-gpu.yaml b/config/philly/2-gpu.yaml new file mode 100644 index 0000000000000000000000000000000000000000..d7201b2e1b86e4aa38948b11c679d96d89817095 --- /dev/null +++ b/config/philly/2-gpu.yaml @@ -0,0 +1,51 @@ +system: + num_cdus: 1 + racks_per_cdu: 1 + nodes_per_rack: 321 + chassis_per_rack: 3 + nodes_per_blade: 2 + switches_per_chassis: 4 + nics_per_node: 4 + rectifiers_per_chassis: 4 + nodes_per_rectifier: 4 + missing_racks: [] + down_nodes: [] + cpus_per_node: 2 + cores_per_cpu: 20 + gpus_per_node: 2 + cpu_peak_flops: 1248000000000.0 # assume Xeon E5-2690v4 CPU 64-bit + gpu_peak_flops: 9300000000000.0 # assume 12G P100 32-bit + cpu_fp_ratio: 0.667 + gpu_fp_ratio: 0.667 +power: + power_gpu_idle: 30 + power_gpu_max: 250 + power_cpu_idle: 90 + power_cpu_max: 270 + power_mem: 74.26 + power_nvme: 30 + power_nic: 20 + power_cdu: 8473.47 + power_switch: 250 + power_update_freq: 15 + rectifier_peak_threshold: 13670 + sivoc_loss_constant: 13 + sivoc_efficiency: 0.98 + rectifier_loss_constant: 17 + rectifier_efficiency: 0.96 + power_cost: 0.094 +scheduler: + multitenant: true + job_arrival_time: 900 + mtbf: 11 + trace_quanta: 20 + min_wall_time: 3600 + max_wall_time: 43200 + ui_update_freq: 900 + max_nodes_per_job: 192 + job_end_probs: + COMPLETED: 0.63 + FAILED: 0.13 + CANCELLED: 0.12 + TIMEOUT: 0.11 + NODE_FAIL: 0.01 diff --git 
a/config/philly/8-gpu.yaml b/config/philly/8-gpu.yaml new file mode 100644 index 0000000000000000000000000000000000000000..1e922826ca1ea3cbef6b5562f3ed72d601dfd842 --- /dev/null +++ b/config/philly/8-gpu.yaml @@ -0,0 +1,51 @@ +system: + num_cdus: 1 + racks_per_cdu: 1 + nodes_per_rack: 231 + chassis_per_rack: 3 + nodes_per_blade: 2 + switches_per_chassis: 4 + nics_per_node: 4 + rectifiers_per_chassis: 4 + nodes_per_rectifier: 4 + missing_racks: [] + down_nodes: [] + cpus_per_node: 2 + cores_per_cpu: 20 + gpus_per_node: 8 + cpu_peak_flops: 1248000000000.0 # assume Xeon E5-2690v4 CPU 64-bit + gpu_peak_flops: 12000000000000.0 # assume 24G P40 32-bit + cpu_fp_ratio: 0.667 + gpu_fp_ratio: 0.667 +power: + power_gpu_idle: 50 + power_gpu_max: 250 + power_cpu_idle: 90 + power_cpu_max: 270 + power_mem: 74.26 + power_nvme: 30 + power_nic: 20 + power_cdu: 8473.47 + power_switch: 250 + power_update_freq: 15 + rectifier_peak_threshold: 13670 + sivoc_loss_constant: 13 + sivoc_efficiency: 0.98 + rectifier_loss_constant: 17 + rectifier_efficiency: 0.96 + power_cost: 0.094 +scheduler: + multitenant: true + job_arrival_time: 900 + mtbf: 11 + trace_quanta: 20 + min_wall_time: 3600 + max_wall_time: 43200 + ui_update_freq: 900 + max_nodes_per_job: 192 + job_end_probs: + COMPLETED: 0.63 + FAILED: 0.13 + CANCELLED: 0.12 + TIMEOUT: 0.11 + NODE_FAIL: 0.01 diff --git a/config/selene.yaml b/config/selene.yaml new file mode 100644 index 0000000000000000000000000000000000000000..0520da1822532e6de6962690e4bc5d075dbe7370 --- /dev/null +++ b/config/selene.yaml @@ -0,0 +1,51 @@ +system: + num_cdus: 20 + racks_per_cdu: 7 + nodes_per_rack: 4 + rectifiers_per_rack: 32 + chassis_per_rack: 4 + nodes_per_blade: 2 + switches_per_chassis: 4 + nics_per_node: 4 + rectifiers_per_chassis: 4 + nodes_per_rectifier: 4 + missing_racks: [] + down_nodes: [] + cpus_per_node: 2 + gpus_per_node: 8 + cpu_peak_flops: 3481000000000.0 + gpu_peak_flops: 624000000000000.0 # A100 BF16 tensor performance with sparsity + cpu_fp_ratio: 0.667 + gpu_fp_ratio: 0.667 +power: + power_gpu_idle: 88 + power_gpu_max: 400 + power_cpu_idle: 90 + power_cpu_max: 280 + power_mem: 74.26 + power_nic: 20 + power_nvme: 30 + power_switch: 250 + power_cdu: 8473.47 + power_update_freq: 15 + rectifier_peak_threshold: 13670 + sivoc_loss_constant: 13 + sivoc_efficiency: 0.98 + rectifier_loss_constant: 17 + rectifier_efficiency: 0.96 + power_cost: 0.094 +scheduler: + seed: 42 + job_arrival_time: 900 + mtbf: 11 + trace_quanta: 10 + min_wall_time: 3600 + max_wall_time: 43200 + ui_update_freq: 900 + max_nodes_per_job: 3000 + job_end_probs: + COMPLETED: 0.63 + FAILED: 0.13 + CANCELLED: 0.12 + TIMEOUT: 0.11 + NODE_FAIL: 0.01 diff --git a/raps/dataloaders/philly.py b/raps/dataloaders/philly.py new file mode 100644 index 0000000000000000000000000000000000000000..ca8121bbedb7e701b2f360d1dec05ffe3b5209fd --- /dev/null +++ b/raps/dataloaders/philly.py @@ -0,0 +1,447 @@ +""" +This is the dataloader for the Philly traces, which are documented in this paper: + + Jeon, Myeongjae, et al. "Analysis of Large-Scale Multi-Tenant GPU clusters + for DNN training workloads." 2019 USENIX Annual Technical Conference + (USENIX ATC 19). 2019. https://www.usenix.org/system/files/atc19-jeon.pdf + +Note on hardware specs: + + Philly only provides GPU memory sizes (12G & 24G) without clarifying GPU models. + Hu et al.
(2024) provide a more recent characterization of such GPU clusters: https://arxiv.org/html/2403.07648v1 + + For estimating system power and FLOPS performance, we assume that the 2-GPU + nodes used Tesla P100 (12 GB) GPUs and the 8-GPU nodes used Tesla P40 (24 GB) + GPUs, consistent with hardware Microsoft deployed around 2017. Training is + assumed to have been performed in 32-bit (FP32), and the CPUs are assumed + to be 64-bit Intel Xeon E5-2690 v4. + +The repository is available here: + + https://github.com/msr-fiddle/philly-traces + +The data portion of the repo can be downloaded using one of the following methods: + + git clone https://github.com/msr-fiddle/philly-traces.git + cd philly-traces + git lfs pull + + wget https://github.com/msr-fiddle/philly-traces/raw/master/trace-data.tar.gz + + curl -L -o trace-data.tar.gz \ + https://github.com/msr-fiddle/philly-traces/raw/master/trace-data.tar.gz + +After the file is downloaded, assuming it is in the /opt/data/philly/trace-data directory: + + /opt/data/philly/trace-data/trace-data.tar.gz + + cd /opt/data/philly/trace-data + + run `tar xvfz trace-data.tar.gz`, which will unpack the following files: + + cluster_cpu_util 1.5G + cluster_gpu_util 2.8G + cluster_mem_util 2.2G + cluster_job_log 37M + cluster_machine_list 8K + + then run the following: + + python /path/to/raps/scripts/parse_philly_traces.py cluster_cpu_util + python /path/to/raps/scripts/parse_philly_traces.py cluster_gpu_util + + this will parse these two files into two directories, cpu_by_day and gpu_by_day, + creating one file for each day and adding that day's lines to the file. + + sanity checks: + + wc -l cluster_cpu_util + 45028261 cluster_cpu_util + wc -l cpu_by_day/*.csv + 45350898 total + + wc -l cluster_gpu_util + 44750641 cluster_gpu_util + wc -l gpu_by_day/*.csv + 44750640 total + +Running a replay simulation: + + python main.py run-parts -x philly -f /opt/data/philly/trace-data \ + --start 2017-10-03T00:00 --end 2017-10-04T00:00 + +Once the dataloader has been run at least once, it will dump npz files into an output +directory (raps-output-*), so the traces can be replayed without having to go through +the expensive extraction process again, using e.g.: + + python main.py run-parts -x philly -f raps-output-5efefa3 + +Note: it is possible to run simulations for a user-defined length of time between +10/3/2017 and 12/15/2017. + +""" + +import csv +import json +import os +from datetime import datetime, timedelta, timezone + +import pandas as pd +from tqdm import tqdm + +from raps.job import Job, job_dict +from raps.utils import WorkloadData + +DATE_FORMAT_STR = "%Y-%m-%d %H:%M:%S" +DEFAULT_START = "2017-10-03T00:00" +DEFAULT_END = "2017-10-04T00:00" + + +def to_epoch(ts_str): + """Convert a timestamp string or int/float into epoch seconds.""" + if ts_str is None: + return None + if isinstance(ts_str, (int, float)): + return int(ts_str) + if "T" in ts_str: + # tolerate a trailing 'Z' (UTC), which fromisoformat rejects before Python 3.11 + dt = datetime.fromisoformat(ts_str.replace("Z", "+00:00")) + else: + dt = datetime.strptime(ts_str, DATE_FORMAT_STR) + return int(dt.timestamp()) + + +def parse_timestamp(val): + """ + Convert Philly job log timestamps to datetime. + Handles integers (epoch) and strings with PST/PDT. + Returns datetime or None.
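+ + Example (string format per DATE_FORMAT_STR above): + parse_timestamp("2017-10-03 00:14:56 PDT") -> datetime(2017, 10, 3, 0, 14, 56)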
+ """ + if val is None or val == "None": + return None + if isinstance(val, (int, float)): + return datetime.fromtimestamp(int(val), tz=timezone.utc).replace(tzinfo=None) + if isinstance(val, str): + val = val.replace(" PST", "").replace(" PDT", "") + try: + return datetime.strptime(val, DATE_FORMAT_STR).replace(tzinfo=None) + except ValueError: + return None + return None + + +def load_traces_by_day(trace_dir, start_dt, end_dt, colname): + """Load CPU or GPU traces between start_dt and end_dt.""" + traces = {} + current = start_dt.date() + + while current <= end_dt.date(): + daily_file = os.path.join(trace_dir, f"{current}.csv") + if os.path.exists(daily_file): + df = pd.read_csv( + daily_file, + names=["time", "machineId", colname], # no header in daily CSVs + dtype={"machineId": str, colname: str}, # avoid DtypeWarning + ) + + # Normalize time column (strip PST/PDT, parse datetime) + df["time"] = df["time"].str.replace(" PST", "").str.replace(" PDT", "") + df["time"] = pd.to_datetime( + df["time"], errors="coerce", format=DATE_FORMAT_STR + ) + + # Convert util column to numeric (NA/invalid → NaN) + df[colname] = pd.to_numeric(df[colname], errors="coerce") + + traces[current] = df + else: + print(f"⚠ No trace file for {current}") + current += timedelta(days=1) + + if not traces: + return {} + + return traces + + +def parse_date(s): + """Parse a Philly trace date string into a datetime object.""" + if not s or s == "None": + return None + # strip possible timezone labels like "PST"/"PDT" + s = s.replace(" PST", "").replace(" PDT", "") + return datetime.strptime(s, DATE_FORMAT_STR) + + +def load_data(files, **kwargs): + """ + Load Philly trace into ExaDigiT Job objects. + + Args: + files (list[str]): A list with one directory path (e.g., ['/opt/data/philly/trace-data']). + + Returns: + list[Job] + """ + debug = kwargs.get("debug") + print("started reading of philly traces... please be patient...", flush=True) + + # extract --start from kwargs + start_ts = to_epoch(kwargs.get("start", DEFAULT_START)) + end_ts = to_epoch(kwargs.get("end", DEFAULT_END)) + + assert len(files) == 1, "Expecting a single directory path" + trace_dir = files[0] + gpu_trace_dir = os.path.join(files[0], "gpu_by_day") + config = kwargs.get("config") + gpus_per_node = config.get("GPUS_PER_NODE") + if gpus_per_node is None: + raise ValueError("Must pass gpus_per_node (2 or 8)") + + # --- 1. Machine list --- + machine_file = os.path.join(trace_dir, "cluster_machine_list") + machines = {} + with open(machine_file, encoding="utf-8") as f: + reader = csv.DictReader(f) + for row in reader: + mid = row["machineId"] + machines[mid] = { + "num_gpus": int(row[" number of GPUs"]), + "gpu_mem": row[" single GPU mem"].strip(), + } + + partition_machines = { + mid: info for mid, info in machines.items() if info["num_gpus"] == gpus_per_node + } + + # Build node → index mapping for this partition + node_mapping = { + mid: idx for idx, mid in enumerate(sorted(partition_machines.keys())) + } + + # Assign partition ID (e.g. 0 for 2-GPU, 1 for 8-GPU) + partition_id = 0 if gpus_per_node == 2 else 1 + + # --- 3. GPU util --- + start_dt = datetime.fromtimestamp(start_ts) + end_dt = datetime.fromtimestamp(end_ts) + + # --- 4. 
Job log --- + job_file = os.path.join(trace_dir, "cluster_job_log") + with open(job_file, encoding="utf-8") as f: + job_log = json.load(f) + + # --- First pass: filter jobs by date range --- + filtered_log = [] + for raw in job_log: + submitted = raw.get("submitted_time") + if submitted is None or submitted == "None": + continue + if isinstance(submitted, (int, float)): + submitted_dt = datetime.fromtimestamp(int(submitted)) + else: + submitted_dt = parse_date(submitted) + if submitted_dt and start_dt <= submitted_dt <= end_dt: + filtered_log.append(raw) + job_log = filtered_log + + # Filter job_log to only jobs matching the partition's gpus_per_node + filtered_log = [] + for raw in job_log: + attempts = raw.get("attempts", []) + if attempts and "detail" in attempts[0]: + # Count GPUs from the first detail + gpus = sum( + len(detail.get("gpus", [])) for detail in attempts[0]["detail"] + ) + if gpus > 0 and (gpus % gpus_per_node == 0): + filtered_log.append(raw) + job_log = filtered_log + + # --- Find earliest submit time (used as telemetry_start below) --- + start_ts = None + + for raw in job_log: + submitted = raw.get("submitted_time") + if submitted is None or submitted == "None": + continue + + # Philly uses either string dates or epoch ints + if isinstance(submitted, (int, float)): + t = int(submitted) + else: + t = parse_date(submitted).timestamp() + + if start_ts is None or t < start_ts: + start_ts = t + + if start_ts is None: + raise ValueError("No valid submitted_time found in Philly traces") + + # --- Pre-load all traces for the given date range --- + cpu_trace_dir = os.path.join(trace_dir, "cpu_by_day") + gpu_trace_dir = os.path.join(trace_dir, "gpu_by_day") + all_cpu_traces = load_traces_by_day(cpu_trace_dir, start_dt, end_dt, "cpu_util") + all_gpu_traces = load_traces_by_day(gpu_trace_dir, start_dt, end_dt, "gpu_util") + + # --- Second pass: build jobs --- + jobs_list = [] + for raw in tqdm(job_log, desc="Building Job objects"): + jobid = raw.get("jobid") + user = raw.get("user") + status = raw.get("status") + + # Submitted time + submitted = raw.get("submitted_time") + if isinstance(submitted, (int, float)): + submitted = datetime.fromtimestamp(int(submitted)) + else: + submitted = parse_date(submitted) + + attempts = raw.get("attempts", []) + start, end = None, None + if attempts: + st = attempts[0].get("start_time") + et = attempts[-1].get("end_time") + + if isinstance(st, (int, float)): + start = datetime.fromtimestamp(int(st)) + elif st: + start = parse_date(st) + + if isinstance(et, (int, float)): + end = datetime.fromtimestamp(int(et)) + elif et: + end = parse_date(et) + + wall_time = None + if start and end: + wall_time = (end - start).total_seconds() + + # Which machines did this job run on?
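+ # Each entry in attempts[0]["detail"] maps one machine ("ip") to the list of + # GPUs the job used on it, so node and GPU counts fall out directly below.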
+ machine_ids, gpus = [], 0 + if attempts and "detail" in attempts[0]: + for detail in attempts[0]["detail"]: + mid = detail["ip"] + machine_ids.append(mid) + gpus += len(detail.get("gpus", [])) + + num_nodes = len(machine_ids) + if num_nodes == 0: + continue + # per-job value; avoid clobbering the partition-level gpus_per_node + job_gpus_per_node = gpus // num_nodes + + # --- absolute datetimes (used for filtering traces) --- + job_start = start + job_end = end + + if not job_start or not job_end: + continue + + # --- CPU utilization traces --- + cpu_dfs = [] + current_date = job_start.date() + while current_date <= job_end.date(): + if current_date in all_cpu_traces: + cpu_dfs.append(all_cpu_traces[current_date]) + current_date += timedelta(days=1) + + if not cpu_dfs: + job_cpu_trace = [] + else: + job_cpu_df = pd.concat(cpu_dfs, ignore_index=True) + mask = ( + (job_cpu_df["machineId"].isin(machine_ids)) + & (job_cpu_df["time"] >= start) + & (job_cpu_df["time"] <= end) + ) + job_cpu = job_cpu_df.loc[mask].copy() + + if len(machine_ids) > 1: + job_cpu = job_cpu.groupby("time")["cpu_util"].mean().reset_index() + + job_cpu_trace = (job_cpu["cpu_util"].to_numpy() * 0.01).tolist() + + # --- GPU utilization traces --- + gpu_dfs = [] + current_date = job_start.date() + while current_date <= job_end.date(): + if current_date in all_gpu_traces: + gpu_dfs.append(all_gpu_traces[current_date]) + current_date += timedelta(days=1) + + if not gpu_dfs: + job_gpu_trace = [] + else: + job_gpu_df = pd.concat(gpu_dfs, ignore_index=True) + mask = ( + (job_gpu_df["machineId"].isin(machine_ids)) + & (job_gpu_df["time"] >= start) + & (job_gpu_df["time"] <= end) + ) + job_gpu = job_gpu_df.loc[mask].copy() + + if len(machine_ids) > 1: + job_gpu = job_gpu.groupby("time")["gpu_util"].mean().reset_index() + + job_gpu_trace = ( + job_gpu["gpu_util"].to_numpy() * 0.01 * job_gpus_per_node + ).tolist() + + submit_time = submitted.timestamp() + start_time = start.timestamp() + end_time = end.timestamp() + + if not submit_time or not start_time or not end_time: + tqdm.write( + f"skipped {jobid} b/c missing submit_time, start_time, or end_time" + ) + continue + + scheduled_nodes = [ + node_mapping[mid] for mid in machine_ids if mid in node_mapping + ] + + job = job_dict( + id=jobid, + name=f"philly-{jobid}", + account=user if user else "unknown", + nodes_required=len(machine_ids), + partition=partition_id, + cpu_cores_required=1, + gpu_units_required=job_gpus_per_node, + end_state=status, + scheduled_nodes=scheduled_nodes, + cpu_trace=job_cpu_trace, + gpu_trace=job_gpu_trace, + ntx_trace=[], + nrx_trace=[], + submit_time=submit_time, + start_time=start_time, + end_time=end_time, + time_limit=end_time, + expected_run_time=wall_time if wall_time else 0, + trace_start_time=start_time, + trace_end_time=end_time, + trace_quanta=60, + trace_missing_values=False + ) + if job_cpu_trace and job_gpu_trace: + jobs_list.append(Job(job)) + else: + tqdm.write(f"skipping {job['id']} b/c either no cpu or gpu trace") + + if debug: + tqdm.write(f"{job['id']} start: {job['start_time']} end: {job['end_time']}") + + return WorkloadData( + jobs=jobs_list, + telemetry_start=start_ts, + telemetry_end=end_ts, + start_date=datetime.fromtimestamp(start_ts, timezone.utc), + ) diff --git a/raps/sim_config.py b/raps/sim_config.py index da3541c596a7e679edb56535f16b6579d8cd9c30..a69c2884716d0199b7c10024ed6ee85223a4bab6 100644 --- a/raps/sim_config.py +++ b/raps/sim_config.py @@ -134,8 +134,8
@@ class SimConfig(RAPSBaseModel, abc.ABC): """ Grab data from live system. """ # Workload arguments (TODO split into separate model) - workload: Literal['random', 'benchmark', 'peak', 'idle', - 'synthetic', 'multitenant', 'replay', 'randomAI'] = "random" + workload: Literal['random', 'benchmark', 'peak', 'idle', 'synthetic', + 'multitenant', 'replay', 'randomAI', 'calculon'] = "random" """ Type of synthetic workload """ multimodal: list[float] = [1.0] diff --git a/raps/utils.py b/raps/utils.py index de565d48d9999ea312e1899f0c5c6c505b1bab19..e232bce5cc9986547340b0944ea85a5f05a34432 100644 --- a/raps/utils.py +++ b/raps/utils.py @@ -634,22 +634,23 @@ def convert_numpy_to_builtin(obj): def get_current_utilization(trace, job: Job): - # Return utilizaiton for a trace at the jobs current running time. - # Note: this should move to a trace.py and a Trace class! - util = 0.0 + """Return utilization for a trace at the job's current running time. + Note: this should move to a trace.py and a Trace class! + """ + if not job.trace_quanta: + raise ValueError("job.trace_quanta is not set; cannot compute utilization.") - if job.trace_quanta: - time_quanta_index = int((job.running_time - job.trace_start_time) // job.trace_quanta) - if time_quanta_index < 0: - time_quanta_index = 0 + time_quanta_index = int((job.running_time - job.trace_start_time) // job.trace_quanta) + if time_quanta_index < 0: + time_quanta_index = 0 - if (isinstance(trace, list) and trace != []) or \ + if (isinstance(trace, list) and trace) or \ (isinstance(trace, np.ndarray) and trace.size != 0): if time_quanta_index < len(trace): util = get_utilization(trace, time_quanta_index) else: util = get_utilization(trace, max(0, len(trace) - 1)) - elif isinstance(trace, float) or isinstance(trace, int): + elif isinstance(trace, (float, int)): util = trace else: util = 0.0 diff --git a/raps/workloads/__init__.py b/raps/workloads/__init__.py index a34261a9996082bedb6f8a5395ebb24d5d4661e3..d61befc50078cee6654b40acf2f70eb5278b31ef 100644 --- a/raps/workloads/__init__.py +++ b/raps/workloads/__init__.py @@ -10,6 +10,7 @@ from raps.sim_config import SingleSimConfig from raps.telemetry import Telemetry from .basic import BasicWorkload +from .calculon import Calculon from .constants import JOB_NAMES, ACCT_NAMES, MAX_PRIORITY from .distribution import DistributionWorkload from .live import continuous_job_generation @@ -51,7 +52,8 @@ class Workload( BaseWorkload, DistributionWorkload, BasicWorkload, - MultitenantWorkload + MultitenantWorkload, + Calculon ): """Final workload class with all workload types.""" pass diff --git a/raps/workloads/calculon.py b/raps/workloads/calculon.py new file mode 100644 index 0000000000000000000000000000000000000000..f843084eafa4e6a547288074922da29a1761165a --- /dev/null +++ b/raps/workloads/calculon.py @@ -0,0 +1,180 @@ +""" +Calculon is an analytical model for estimating LLM training times for given architectures +on particular hardware. It is described in the paper: + + Isaev, Mikhail, et al. "Calculon: a methodology and tool for high-level co-design of + systems and large language models." SC23 Proceedings + https://dl.acm.org/doi/pdf/10.1145/3581784.3607102 + +The code is available at https://github.com/calculon-ai/calculon +which this module assumes is already cloned into the third_party directory.
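+ +A minimal sketch of that assumed setup: + + git clone https://github.com/calculon-ai/calculon.git third_party/calculon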
+ + Calculon requires installing `psutil`, which can be pip installed via: + + pip install psutil + +Since Calculon by default supports A100 GPUs, we are able to use the default files that +are already set up in Calculon, and have therefore added two systems with A100 GPUs: +Selene and Perlmutter. Example run commands: + + python main.py run --system selene -w calculon + python main.py run --system perlmutter -w calculon + +This code is currently set up to generate synthetic traces for four different LLM models: +megatron-22B, gpt3-175B, turing-530B, and megatron-1T. These four tests can take a couple +of **hours** to run. On first run, consider commenting out the last three models to only test +the smallest case, megatron-22B. The parameter `llm_models_test` below defines which tests +are run. + +Finally, the code below is set up to reuse previously cached results, so once the json +files have been generated by Calculon, runs can be repeated very quickly. +The caveat is that if you want to change some Calculon configuration, +you will need to delete the cached json files in the calculon/optimal_executions folder +to force it to regenerate new files. + +""" +import math +import json +import os +import random +import subprocess +from pathlib import Path + +import numpy as np + +from raps.job import Job, job_dict + +from .constants import ACCT_NAMES + + +class Calculon: + """Calculon workload mixin for Workload class.""" + + def __init__(self, *args, **kwargs): + # NOTE: mixins usually accept (sim_config_args, system_config_dict) through Workload + super().__init__(*args, **kwargs) + + def calculon(self, **kwargs): + """Generate workload using Calculon backend + job trace synthesis.""" + jobs = [] + + llm_models_test = [ + ["megatron-22B", 8, 4], + ["gpt3-175B", 64, 64], + ["turing-530B", 280, 280], + ["megatron-1T", 512, 512], + ] + + for llm_model, num_nodes, max_batch_size in llm_models_test: + for partition in self.partitions: + config = self.config_map[partition] + gpu_system = "a100_80g" + data_type = "float16" + output = f"{llm_model}_{gpu_system}_{max_batch_size}_{data_type}_{num_nodes}.json" + + # call Calculon binary/subprocess to get MFU + batch time + mfu, total_batch_time = self._run_calculon( + llm_model, gpu_system, max_batch_size, num_nodes, data_type, output + ) + + # derive job stats + num_iters = 1000000 # realistic number is probably in the millions + trace_quanta = config["TRACE_QUANTA"] + + job_time = total_batch_time * num_iters + num_samples = math.ceil(job_time / trace_quanta) + 1 + end_time = num_samples * trace_quanta # align job to tick grid + + # use random CPU utilizations for now + cpu_util = random.random() * config["CPUS_PER_NODE"] + cpu_trace = np.full(num_samples, cpu_util) # same length + gpu_trace = np.full(num_samples, mfu) # length matches simulation steps + + net_tx, net_rx = [], [] + # assumes the counts in llm_models_test are GPU counts; convert to node count + num_nodes = num_nodes // config["GPUS_PER_NODE"] + + epochs = 1 + wall_time = job_time + for i in range(epochs): + job_info = job_dict( + nodes_required=num_nodes, + scheduled_nodes=[], + name=f"{llm_model} training for {num_iters} iterations", + account=ACCT_NAMES[0], + cpu_trace=cpu_trace, + gpu_trace=gpu_trace, + ntx_trace=net_tx, + nrx_trace=net_rx, + end_state="COMPLETED", + id=None, + priority=100, + partition=partition, + time_limit=job_time + 1, + start_time=0, + end_time=end_time, + expected_run_time=end_time, + trace_quanta=trace_quanta, + trace_time=job_time, + trace_start_time=0, + trace_end_time=job_time, + ) + job = Job(job_info) +
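# collect this epoch's job; wall_time accumulates one job duration per epoch (currently informational only) +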
jobs.append(job) + wall_time += job_time + + return jobs + + def _run_calculon(self, model, system, max_batch_size, num_nodes, data_type, output): + """Internal: run Calculon subprocess and parse result.""" + base_path = Path("third_party/calculon") + output_dir = base_path / "optimal_executions" + output_dir.mkdir(exist_ok=True) + + # expected files + raw_file = output_dir / f"{output.replace('.json', '_raw.json')}" + exec_file = output_dir / f"{output.replace('.json', '_exec.json')}" + stats_file = output_dir / f"{output.replace('.json', '_stats.json')}" + + # if all three exist, skip running + if raw_file.exists() and exec_file.exists() and stats_file.exists(): + print(f"[INFO] Using cached Calculon results for {output}") + with open(raw_file) as f: + data = json.load(f) + first_key = list(data.keys())[0] + stats = data[first_key]["stats"] + mfu = stats.get("sample_rate", 0) # or compute MFU if you want + batch_time = stats.get("block_fw_time", 0) # example placeholder + return mfu, batch_time + + # otherwise, run Calculon + opt_cmd = [ + "./bin/calculon", "llm-optimal-execution", + f"models/{model}.json", + str(num_nodes), + str(max_batch_size), + data_type, + f"systems/{system}.json", + str(raw_file), + ] + + llm_cmd = [ + "./bin/calculon", "llm", + f"models/{model}.json", + str(exec_file), + f"systems/{system}.json", + str(stats_file), + ] + + subprocess.run(opt_cmd, check=True, cwd=base_path, env={**os.environ, "PYTHONPATH": "."}) + subprocess.run(llm_cmd, check=True, cwd=base_path, env={**os.environ, "PYTHONPATH": "."}) + + # parse output + with open(raw_file) as f: + data = json.load(f) + first_key = list(data.keys())[0] + stats = data[first_key]["stats"] + + mfu = stats.get("sample_rate", 0) + batch_time = stats.get("block_fw_time", 0) + + return mfu, batch_time diff --git a/scripts/parse_philly_traces.py b/scripts/parse_philly_traces.py new file mode 100644 index 0000000000000000000000000000000000000000..57ce8c2bed61ef07dd49b4e06fb91fbc7112f31a --- /dev/null +++ b/scripts/parse_philly_traces.py @@ -0,0 +1,73 @@ +""" +See raps/dataloaders/philly.py for how to download the Philly traces. + +Run the following to parse the Philly traces into separate files for each day: + + python /path/to/raps/scripts/parse_philly_traces.py cluster_cpu_util + python /path/to/raps/scripts/parse_philly_traces.py cluster_gpu_util + +This will parse these two files into two directories, cpu_by_day and gpu_by_day, +creating one file for each day and adding that day's lines to the file.
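+ +Expected output layout (trace days span 10/3/2017 through 12/15/2017): + + cpu_by_day/2017-10-03.csv # columns: time,machine_id,cpu_util + gpu_by_day/2017-10-03.csv # columns: time,machine_id,gpu_util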
+""" +import os +import sys +from datetime import datetime +from tqdm import tqdm + +if len(sys.argv) < 2: + print("Usage: python parse_by_day.py ") + sys.exit(1) + +input_file = sys.argv[1] + +with open(input_file) as f: + total_lines = sum(1 for _ in f) - 1 + +with open(input_file) as f: + header = f.readline().strip().split(",") + print("Header:", header) + + # detect file type from header + is_cpu = "cpu_util" in [h.lower() for h in header] + + # pick output dir name based on file type + output_dir = "cpu_by_day" if is_cpu else "gpu_by_day" + os.makedirs(output_dir, exist_ok=True) + + for line in tqdm(f, total=total_lines, desc="Processing lines"): + parts = line.strip().split(",") + + if len(parts) < 3: + continue + + raw_time = parts[0].replace(" PST", "").replace(" PDT", "") + try: + ts = datetime.strptime(raw_time, "%Y-%m-%d %H:%M:%S") + except ValueError: + continue + + machine_id = parts[1] + + if is_cpu: + try: + value = float(parts[2]) + except ValueError: + value = 0.0 + label = "cpu_util" + else: + utils = [] + for v in parts[2:]: + try: + utils.append(float(v)) + except ValueError: + pass + value = sum(utils) / max(1, len([u for u in utils if u > 0])) + label = "gpu_util" + + day_str = ts.strftime("%Y-%m-%d") + out_path = os.path.join(output_dir, f"{day_str}.csv") + + with open(out_path, "a") as out: + if out.tell() == 0: # only write header if file is new + out.write(f"time,machine_id,{label}\n") + out.write(f"{ts},{machine_id},{value:.3f}\n")