Commit 3c8fb136 authored by Brewer, Wes

Modify mit_supercloud loader.py to use --arrival poisson, and add --episode_length option

parent ca3c30af
+51 −0
system:
  num_cdus: 12
  racks_per_cdu: 1
  nodes_per_rack: 40
  chassis_per_rack: 8
  nodes_per_blade: 1
  switches_per_chassis: 4
  nics_per_node: 4
  rectifiers_per_chassis: 4
  nodes_per_rectifier: 4
  missing_racks: []
  down_nodes: []
  cpus_per_node: 2
  cores_per_cpu: 24
  gpus_per_node: 0
  cpu_peak_flops: 2995200000000.0
  gpu_peak_flops: 0
  cpu_fp_ratio: 0.667
  gpu_fp_ratio: 0.667
power:
  power_gpu_idle: 88
  power_gpu_max: 560
  power_cpu_idle: 1
  power_cpu_max: 6
  power_mem: 74.26
  power_nvme: 30
  power_nic: 20
  power_cdu: 8473.47
  power_switch: 250
  power_update_freq: 15
  rectifier_peak_threshold: 13670
  sivoc_loss_constant: 13
  sivoc_efficiency: 0.98
  rectifier_loss_constant: 17
  rectifier_efficiency: 0.96
  power_cost: 0.094
scheduler:
  multitenant: true
  job_arrival_time: 1
  mtbf: 11
  trace_quanta: 10
  min_wall_time: 3600
  max_wall_time: 43200
  ui_update_freq: 900
  max_nodes_per_job: 3000
  job_end_probs:
    COMPLETED: 0.63
    FAILED: 0.13
    CANCELLED: 0.12
    TIMEOUT: 0.11
    NODE_FAIL: 0.01
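
As a sanity check on the topology above: 12 CDUs × 1 rack × 40 nodes gives 480 nodes, and if cpu_peak_flops is the per-node figure (an assumption this diff does not confirm), the system peak works out to roughly 1.44 PFLOPS:

# Sketch: cluster size implied by the YAML above.
# Assumes cpu_peak_flops is per node -- not confirmed by this diff.
num_cdus, racks_per_cdu, nodes_per_rack = 12, 1, 40
cpu_peak_flops = 2_995_200_000_000.0  # ~3 TFLOPS

total_nodes = num_cdus * racks_per_cdu * nodes_per_rack  # 480
system_peak = total_nodes * cpu_peak_flops  # ~1.44e15 FLOPS
print(f"{total_nodes} nodes, {system_peak / 1e15:.2f} PFLOPS peak")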

experiments/mitrl.yaml

0 → 100644
+7 −0
system: "mit_supercloud"
replay:
  - /opt/data/mit_supercloud
start: 2021-05-21T21:00
end: 2021-05-21T22:00
episode_length: 500
arrival: poisson
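
Note how this experiment file exercises both changes in this commit: arrival: poisson routes the loader below into its synthetic-submit-time branch, and episode_length: 500 overrides the SimConfig default of 1000 added at the end of this commit.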
+22 −10
@@ -118,7 +118,7 @@ from typing import Dict, Union, Optional
from collections import Counter

from raps.job import job_dict, Job
from raps.utils import summarize_ranges
from raps.utils import summarize_ranges, next_arrival
from .utils import proc_cpu_series, proc_gpu_series, to_epoch
from .utils import DEFAULT_START, DEFAULT_END

@@ -209,6 +209,8 @@ def load_data(local_dataset_path, **kwargs):
       jobs_list, sim_start_time, sim_end_time
    """
    debug = kwargs.get("debug")
    config = kwargs.get("config")
    arrival = kwargs.get("arrival")
    NL_PATH = os.path.dirname(__file__)

    skip_counts = Counter()
@@ -302,8 +304,7 @@ def load_data(local_dataset_path, **kwargs):

    # handle single-partition configs (e.g., mit_supercloud.yaml)
    if not cpu_only and not mixed:
        config = kwargs.get("config")
        gpus_per_node = config.get("gpus_per_node")
        gpus_per_node = config.get("GPUS_PER_NODE")

        if gpus_per_node == 0:
            cpu_only = True
@@ -528,7 +529,6 @@ def load_data(local_dataset_path, **kwargs):
    jobs_list = []

    # Get CPUS_PER_NODE and GPUS_PER_NODE from config
    config = kwargs.get('config', {})
    cpus_per_node = config.get('CPUS_PER_NODE')
    cores_per_cpu = config.get('CORES_PER_CPU')
    # gpus_per_node = config.get('GPUS_PER_NODE')  # Unused
@@ -585,7 +585,21 @@ def load_data(local_dataset_path, **kwargs):
        cpu_peak = cpu_cores_req / cores_per_cpu / cpus_per_node  # Is this per CPU?
        cpu_tr = [min(x/cores_per_cpu/cpus_per_node, cpu_peak) for x in cpu_tr]

        submit_time = rec.get("time_submit", t0) - start_ts
        if arrival == "poisson":
            job_arrival_time = config.get("JOB_ARRIVAL_TIME")
            submit_time = next_arrival(1 / job_arrival_time)
            start_time = submit_time
            end_time = None
            scheduled_nodes = None
            telemetry_start = 0
            telemetry_end = 86640
        else:  # replay
            start_time = t0 - start_ts
            end_time = t1 - start_ts
            submit_time = rec.get("time_submit") - start_ts
            scheduled_nodes = rec.get("scheduled_nodes")
            telemetry_start = int(sl.time_start.min())
            telemetry_end = int(sl.time_end.max())

        current_job_dict = job_dict(
            nodes_required=nr,
@@ -599,12 +613,12 @@ def load_data(local_dataset_path, **kwargs):
            nrx_trace=[],
            end_state=rec.get("state_end", "unknown"),
            id=jid,
            scheduled_nodes=rec.get("scheduled_nodes"),
            scheduled_nodes=scheduled_nodes,
            priority=rec.get("priority", 0),
            submit_time=submit_time,
            time_limit=rec.get("timelimit", 0),
            start_time=t0 - start_ts,
            end_time=t1 - start_ts,
            start_time=start_time,
            end_time=end_time,
            expected_run_time=max(0, t1-t0),
            trace_time=len(cpu_tr)*quanta,
            trace_start_time=0,
@@ -615,8 +629,6 @@ def load_data(local_dataset_path, **kwargs):
        jobs_list.append(job)

    # Calculate min_overall_utime and max_overall_utime
    telemetry_start = int(sl.time_start.min())
    telemetry_end = int(sl.time_end.max())
    # min_overall_utime = int(sl.time_submit.min())
    # max_overall_utime = int(sl.time_submit.max())
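
The new poisson branch relies on next_arrival, which is imported from raps.utils above but not shown in this diff. A minimal sketch of the kind of helper it presumably is (the stateful running clock is an assumption; the actual implementation may differ):

import random

_clock = 0.0  # hypothetical module-level state, not confirmed by this diff

def next_arrival(rate):
    """Advance a running clock by an Exponential(rate) gap and return it.

    Inter-arrival gaps of a Poisson process with the given rate are
    exponentially distributed, so successive calls yield increasing
    arrival times.
    """
    global _clock
    _clock += random.expovariate(rate)
    return _clock

With job_arrival_time: 1 from the scheduler config, the call next_arrival(1 / job_arrival_time) draws gaps with a mean of one second between synthetic job submissions.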

+57 −27
@@ -40,6 +40,8 @@ def print_stats(stats, step=0):
        if section in stats:
            for k, v in stats[section].items():
                if k.lower() in wanted_keys:
                    if k.lower() == "jobs still running" and isinstance(v, list):
                        v = len(v)
                    logger.record(wanted_keys[k.lower()], v)

    logger.dump(step=step)
@@ -103,7 +105,7 @@ class RAPSEnv(gym.Env):
        )

        self.timestep_start = 0
        self.timestep_end = self.config.get("SIM_END", 1000)
        self.timestep_end = getattr(self.cli_args, "episode_length")

        self.generator = self.layout_manager.run_stepwise(
            self.jobs,
@@ -180,7 +182,29 @@ class RAPSEnv(gym.Env):

        return self._get_state()

    def _compute_reward(self, tick_data, alpha=1.0, beta=0.001, gamma=0.1):
    def _compute_reward(self, tick_data):
        """
        Reward function: minimize carbon footprint per job completed.
        Encourages the agent to complete jobs while keeping emissions low.
        """
        reward = 0.0

        # Jobs completed this tick
        jobs_completed = len(getattr(tick_data, "completed", []))

        # Carbon emitted so far (metric tons CO2)
        carbon_so_far = getattr(self.engine, "carbon_emissions", 0.0)

        if jobs_completed > 0:
            # Reward is higher when more jobs finish with less carbon
            reward = jobs_completed / (carbon_so_far + 1e-6)
        else:
            # Small penalty if no jobs finished (encourages progress)
            reward = -0.01

        return reward

    def _compute_reward2(self, tick_data, alpha=10.0, beta=1.0, gamma=2.0):
        completed = getattr(tick_data, "completed", None)
        jobs_completed = len(completed) if completed else 0
        power = self.power_manager.history[-1][1]
@@ -188,40 +212,46 @@ class RAPSEnv(gym.Env):

        reward = alpha * jobs_completed - beta * power - gamma * queue_len

        if self.args_dict.get("debug", False):
            print(f"[t={self.engine.current_timestep}] jobs_completed={jobs_completed}, "
                  f"power={power}, queue_len={queue_len}, reward={reward}")

        return reward

    def step(self, action):
        chosen_job = None

        # Advance simulation by one step via generator
        try:
            tick_data = next(self.generator)
        except StopIteration:
            # Simulation finished
            return self._get_state(), 0.0, True, {}
        queue = self.engine.queue
        invalid_action = False

        # Store action for scheduler to pick up
        self.scheduler.pending_action = action
        # If queue empty or index out of range → invalid
        if len(queue) == 0 or action >= len(queue):
            invalid_action = True
        else:
            job = queue[int(action)]
            available = len(self.engine.scheduler.resource_manager.available_nodes)
            if job.nodes_required <= available:
                # Valid scheduling
                self.engine.scheduler.place_job_and_manage_queues(
                    job, queue, self.engine.running, self.engine.current_timestep
                )
            else:
                invalid_action = True

        # Advance one step (scheduler.schedule() is called inside generator)
        # advance simulation by one tick
        tick_data = next(self.generator)

        # compute reward
        if invalid_action:
            reward = -1.0
        else:
            reward = self._compute_reward(tick_data)

        obs = self._get_state()
        done = self.engine.current_timestep >= min(self.engine.timestep_end, 1000)
        if done:
        # Print stats
        stats = self.get_stats()
        print_stats(stats)

        info = {
            "scheduled_job": getattr(chosen_job, "id", None),
            "power": getattr(tick_data, "power", 0.0),
            "completed": getattr(tick_data, "completed", []),
        }
        obs = self._get_state()
        done = self.engine.current_timestep >= self.engine.timestep_end
        info = {}

        return obs, reward, done, info

    def _get_state(self):
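
Taken together, the reworked step() treats the action as an index into the engine's job queue, hands infeasible picks a flat -1.0 penalty, and otherwise rewards completions per unit of accumulated carbon (for example, 3 jobs finished with 0.5 tCO2 emitted so far yields roughly 3 / 0.5 = 6.0). A minimal random-agent loop for driving it (a sketch; RAPSEnv's constructor arguments are not shown in this diff, so construction is elided):

import random

# env = RAPSEnv(...)  # constructor arguments not shown in this diff
obs = env.reset()
done = False
while not done:
    # The action indexes the job queue; an empty queue or out-of-range
    # pick earns the -1.0 invalid-action penalty above.
    action = random.randrange(max(1, len(env.engine.queue)))
    obs, reward, done, info = env.step(action)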
+4 −0
@@ -208,6 +208,10 @@ class SimConfig(BaseModel):
    maxqueue: int = 50
    """ Specify the max queue length for continuous job generation """

    # Reinforcement Learning
    episode_length: int = 1000
    """ Number of timesteps per RL episode (default 1000) """

    @model_validator(mode="before")
    def _parse_times(cls, data):
        time_fields = [
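
A quick check of the new field through the model (a sketch; it assumes SimConfig's remaining fields have defaults, which this diff does not show):

cfg = SimConfig(episode_length=500)  # the value experiments/mitrl.yaml uses
assert cfg.episode_length == 500
assert SimConfig().episode_length == 1000  # the documented default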