Commit badf9df6 authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Fix some issues to get calculon workloads running. Add ability to use previously cached results

parent 49d9afa9
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -38,7 +38,7 @@ scheduler:
  seed: 42
  job_arrival_time: 900
  mtbf: 11
  trace_quanta: 15
  trace_quanta: 10
  min_wall_time: 3600
  max_wall_time: 43200
  ui_update_freq: 900
+1 −1
Original line number Diff line number Diff line
@@ -38,7 +38,7 @@ scheduler:
  seed: 42
  job_arrival_time: 900
  mtbf: 11
  trace_quanta: 15
  trace_quanta: 10
  min_wall_time: 3600
  max_wall_time: 43200
  ui_update_freq: 900
+58 −51
Original line number Diff line number Diff line
@@ -21,10 +21,19 @@ Selene and Perlmutter. Example run commands:
    python main.py run --system perlmutter -w calculon

This code is currently set up to generate synthetic traces for four different LLM models:
megatron-22B, gpt3-175B, turing-530B, and megatron-1T. Adjust these by modifying 
llm_model_tests below.
megatron-22B, gpt3-175B, turing-530B, and megatron-1T. These four tests can take a couple
of **hours** to run. On first run, consider commenting out the last three models to only test
the smallest case, megatron-22B. The parameter `llm_models_tests` below defines which tests
are run.

Finally, the code below is set up to use previously cached results, so once the json
files are generated by Calculon, they can be rerun very quickly again and again.
The caveat is that if you want to change any Calculon configuration, you will need to
delete the cached json files in the calculon/optimal_executions folder to force it to
regenerate new files.

"""
import math
import json
import os
import random
@@ -33,7 +42,7 @@ from pathlib import Path

import numpy as np

from raps.job import job_dict
from raps.job import Job, job_dict

from .constants import ACCT_NAMES

@@ -69,14 +78,17 @@ class Calculon:
                )

                # derive job stats
                num_iters = 3000
                num_iters = 1000000  # realistic number is probably in the millions
                trace_quanta = config["TRACE_QUANTA"]

                job_time = total_batch_time * num_iters
                num_samples = int(job_time // trace_quanta)
                num_samples = math.ceil(job_time / trace_quanta) + 1
                end_time = num_samples * trace_quanta   # align job to tick grid

                system_util = np.full(num_samples, mfu)
                # use random CPU utilizations for now
                cpu_util = random.random() * config["CPUS_PER_NODE"]
                cpu_trace = cpu_util * np.ones(num_iters)
                cpu_trace = np.full(num_samples, cpu_util)  # same length
                gpu_trace = np.full(num_samples, mfu)   # length matches simulation steps

                net_tx, net_rx = [], []
                num_nodes = num_nodes // config["GPUS_PER_NODE"]
@@ -90,7 +102,7 @@ class Calculon:
                        name=f"{llm_model} training for {num_iters} iterations",
                        account=ACCT_NAMES[0],
                        cpu_trace=cpu_trace,
                        gpu_trace=system_util,
                        gpu_trace=gpu_trace,
                        ntx_trace=net_tx,
                        nrx_trace=net_rx,
                        end_state="COMPLETED",
@@ -99,12 +111,15 @@ class Calculon:
                        partition=partition,
                        time_limit=job_time + 1,
                        start_time=0,
                        end_time=job_time,
                        end_time=end_time,
                        expected_run_time=end_time,
                        trace_quanta=trace_quanta,
                        trace_time=job_time,
                        trace_start_time=0,
                        trace_end_time=job_time,
                    )
                    jobs.append(job_info)
                    job = Job(job_info)
                    jobs.append(job)
                    wall_time += job_time

        return jobs
@@ -112,15 +127,26 @@ class Calculon:
    def _run_calculon(self, model, system, max_batch_size, num_nodes, data_type, output):
        """Internal: run Calculon subprocess and parse result."""
        base_path = Path("third_party/calculon")

        # paths
        model_path = base_path / "models" / f"{model}.json"
        system_path = base_path / "systems" / f"{system}.json"
        raw_path   = base_path / "optimal_executions" / output.replace(".json", "_raw.json")
        exec_path  = base_path / "optimal_executions" / output.replace(".json", "_exec.json")
        stats_path = base_path / "optimal_executions" / output.replace(".json", "_stats.json")

        # Run llm-optimal-execution to generate candidate executions
        output_dir = base_path / "optimal_executions"
        output_dir.mkdir(exist_ok=True)

        # expected files
        raw_file = output_dir / f"{output.replace('.json', '_raw.json')}"
        exec_file = output_dir / f"{output.replace('.json', '_exec.json')}"
        stats_file = output_dir / f"{output.replace('.json', '_stats.json')}"

        # if all three exist, skip running
        if raw_file.exists() and exec_file.exists() and stats_file.exists():
            print(f"[INFO] Using cached Calculon results for {output}")
            with open(raw_file) as f:
                data = json.load(f)
            first_key = list(data.keys())[0]
            stats = data[first_key]["stats"]
            mfu = stats.get("sample_rate", 0)   # or compute MFU if you want
            batch_time = stats.get("block_fw_time", 0)  # example placeholder
            return mfu, batch_time

        # otherwise, run Calculon
        opt_cmd = [
            "./bin/calculon", "llm-optimal-execution",
            f"models/{model}.json",
@@ -128,46 +154,27 @@ class Calculon:
            str(max_batch_size),
            data_type,
            f"systems/{system}.json",
            f"optimal_executions/{output.replace('.json', '_raw.json')}",
            str(raw_file),
        ]
        subprocess.run(opt_cmd, check=True, cwd=base_path, env={**os.environ, "PYTHONPATH": "."})

        # Read raw output, pick first/best execution and dump it as exec.json
        with open(raw_path) as f:
            raw_data = json.load(f)

        # get first (or best) key
        first_key = sorted(raw_data.keys(), key=lambda k: float(k))[0]
        best_exec = raw_data[first_key]["execution"]

        with open(exec_path, "w") as f:
            json.dump(best_exec, f, indent=2)

        # Run llm with chosen execution, system, and model → stats.json
        llm_cmd = [
            "./bin/calculon", "llm",
            f"models/{model}.json",
            f"optimal_executions/{output.replace('.json', '_exec.json')}",
            str(exec_file),
            f"systems/{system}.json",
            f"optimal_executions/{output.replace('.json', '_stats.json')}",
            str(stats_file),
        ]
        subprocess.run(llm_cmd, check=True, cwd=base_path, env={**os.environ, "PYTHONPATH": "."})

        # Parse stats.json to extract metrics
        with open(stats_path) as f:
            stats_data = json.load(f)

        stats = stats_data.get("stats", {})
        subprocess.run(opt_cmd, check=True, cwd=base_path, env={**os.environ, "PYTHONPATH": "."})
        subprocess.run(llm_cmd, check=True, cwd=base_path, env={**os.environ, "PYTHONPATH": "."})

        # These keys may vary depending on Calculon version
        mfu = stats.get("model_flops_utilization") \
            or stats.get("sample_rate") \
            or stats.get("best_sample_rate") \
            or 0.0
        # parse output
        with open(raw_file) as f:
            data = json.load(f)
        first_key = list(data.keys())[0]
        stats = data[first_key]["stats"]

        total_batch_time = stats.get("block_fw_time") \
            or stats.get("batch_time") \
            or stats.get("total_time") \
            or 0.0
        mfu = stats.get("sample_rate", 0)
        batch_time = stats.get("block_fw_time", 0)

        return mfu, total_batch_time
        return mfu, batch_time