From 6cf9686075e58ea52943059a64ecf32c28d1ea51 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Tue, 26 Aug 2025 17:58:21 -0400 Subject: [PATCH 01/29] Initial implementation of rl framework --- raps/envs/raps_env.py | 149 ++++++++++++++++++++++++++++++++++++++++++ train_rl.py | 17 +++++ 2 files changed, 166 insertions(+) create mode 100644 raps/envs/raps_env.py create mode 100644 train_rl.py diff --git a/raps/envs/raps_env.py b/raps/envs/raps_env.py new file mode 100644 index 0000000..45314dc --- /dev/null +++ b/raps/envs/raps_env.py @@ -0,0 +1,149 @@ +import gym +import numpy as np +from gym import spaces + +from raps.engine import Engine +from raps.power import PowerManager, compute_node_power +from raps.flops import FLOPSManager +from raps.telemetry import Telemetry +from raps.workload import Workload +from raps.ui import LayoutManager + + +class RAPSEnv(gym.Env): + """ + Minimal Gym-compatible wrapper around RAPS Engine + for RL job scheduling experiments. + """ + + metadata = {"render.modes": ["human"]} + + def __init__(self, **kwargs): + super().__init__() + # Store everything in self.args + self.args_dict = kwargs # dict + self.cli_args = kwargs.get("args") # Namespace + self.config = kwargs.get("config") + if self.cli_args is None: + raise ValueError("RAPSEnv requires 'args' (argparse.Namespace) in kwargs") + if self.config is None: + raise ValueError("RAPSEnv requires 'config' in kwargs") + + # --- managers (minimal versions) --- + self.power_manager = PowerManager(compute_node_power, **self.config) + self.flops_manager = FLOPSManager(**self.args_dict) + self.telemetry = Telemetry(**self.args_dict) + + # --- workload (synthetic for now) --- + wl = Workload(self.cli_args, self.config) + jobs = wl.generate_jobs() + #print("***", jobs) + + timestep_start = 0 + #timestep_end = int(max(job.wall_time for job in jobs)) + timestep_end = 100 + + # --- minimal engine instantiation --- + #self.engine = Engine( + # power_manager=self.power_manager, + # 
flops_manager=self.flops_manager, + # telemetry=self.telemetry, + # jobs=jobs, + # timestep_start=timestep_start, + # timestep_end=timestep_end, + # time_delta=self.args.get("time_delta"), + # continuous_workload=None, + # args=self.args, + # config=self.config + #) + + self.engine = Engine( + power_manager=self.power_manager, + flops_manager=self.flops_manager, + jobs=jobs, + **self.args_dict + ) + + self.layout_manager = LayoutManager( + self.args_dict.get("layout"), engine=self.engine, + debug=self.args_dict.get("debug", False), + total_timesteps=self.args_dict.get("time", 1000), + args_dict=self.args_dict, + **self.config + ) + + # --- RL spaces --- + max_jobs = 100 + job_features = 4 # [nodes, runtime, priority, wait_time] + self.observation_space = spaces.Box( + low=0, high=1, shape=(max_jobs, job_features), dtype=np.float32 + ) + self.action_space = spaces.Discrete(max_jobs) + + def reset(self, **kwargs): + """Reset environment (new workload + engine).""" + wl = Workload(self.cli_args, self.config) + jobs = wl.generate_jobs() + + self.engine.jobs = jobs + self.engine.timestep_start = 0 + #self.engine.timestep_end = int(max(job.wall_time for job in jobs)) + self.engine.timestep_end = 100 + self.engine.current_timestep = 0 + + return self._get_state() + + def step(self, action): + """ + Apply scheduling action. + For now: action = index of job in queue to attempt scheduling. 
+ """ + # TODO: hook into Engine to apply scheduling + # Placeholder: random reward for scaffolding + reward = np.random.rand() + done = self.engine.current_timestep >= self.engine.timestep_end + + obs = self._get_state() + info = { + "utilization": self.telemetry.utilization(), + "power": self.telemetry.power(), + "queue_length": self.telemetry.queue_length(), + } + + self.engine.current_timestep += 1 + return obs, reward, done, info + + def _get_state(self): + """Construct simple state representation from engine's job queue.""" + # Example: take waiting jobs + job_queue = [j for j in self.engine.jobs if not j.started] + + max_jobs, job_features = self.observation_space.shape + state = np.zeros((max_jobs, job_features), dtype=np.float32) + + for i, job in enumerate(job_queue[:max_jobs]): + # fill with features of interest; adapt to what Job exposes + features = [ + getattr(job, "nodes_required", 0), + getattr(job, "wall_time", 0), + getattr(job, "priority", 0), + getattr(job, "wait_time", 0), + ] + state[i, :len(features)] = features + + return state + + #def _get_state(self): + # """Very simple state vector: truncate/pad job queue.""" + # jobs = self.telemetry.get_job_queue_features() + # max_jobs, job_features = self.observation_space.shape + # state = np.zeros((max_jobs, job_features), dtype=np.float32) +# +# for i, job in enumerate(jobs[:max_jobs]): +# state[i, : len(job)] = job +# return state + + def render(self, mode="human"): + print("Timestep:", self.engine.current_timestep, + "Utilization:", self.telemetry.utilization(), + "Power:", self.telemetry.power()) diff --git a/train_rl.py b/train_rl.py new file mode 100644 index 0000000..4a997b6 --- /dev/null +++ b/train_rl.py @@ -0,0 +1,17 @@ +import gym +from stable_baselines3 import PPO +from raps.envs.raps_env import RAPSEnv +from raps.system_config import get_system_config +from raps.sim_config import args, args_dict + +config = get_system_config(args.system).get_legacy() +args_dict['config'] = config 
+args_dict['args'] = args + +env = RAPSEnv(**args_dict) + +model = PPO("MlpPolicy", env, verbose=1) +model.learn(total_timesteps=10000) + +# Save trained model +model.save("ppo_raps") -- GitLab From d7caed6109b94ebfd72c5da2ec6e560d37ea0437 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Tue, 26 Aug 2025 18:05:05 -0400 Subject: [PATCH 02/29] Got initial version working --- raps/envs/raps_env.py | 61 ++++++++++++++----------------------------- 1 file changed, 19 insertions(+), 42 deletions(-) diff --git a/raps/envs/raps_env.py b/raps/envs/raps_env.py index 45314dc..ec65c4c 100644 --- a/raps/envs/raps_env.py +++ b/raps/envs/raps_env.py @@ -21,8 +21,8 @@ class RAPSEnv(gym.Env): def __init__(self, **kwargs): super().__init__() # Store everything in self.args - self.args_dict = kwargs # dict - self.cli_args = kwargs.get("args") # Namespace + self.args_dict = kwargs # dict + self.cli_args = kwargs.get("args") # Namespace self.config = kwargs.get("config") if self.cli_args is None: raise ValueError("RAPSEnv requires 'args' (argparse.Namespace) in kwargs") @@ -37,25 +37,6 @@ class RAPSEnv(gym.Env): # --- workload (synthetic for now) --- wl = Workload(self.cli_args, self.config) jobs = wl.generate_jobs() - #print("***", jobs) - - timestep_start = 0 - #timestep_end = int(max(job.wall_time for job in jobs)) - timestep_end = 100 - - # --- minimal engine instantiation --- - #self.engine = Engine( - # power_manager=self.power_manager, - # flops_manager=self.flops_manager, - # telemetry=self.telemetry, - # jobs=jobs, - # timestep_start=timestep_start, - # timestep_end=timestep_end, - # time_delta=self.args.get("time_delta"), - # continuous_workload=None, - # args=self.args, - # config=self.config - #) self.engine = Engine( power_manager=self.power_manager, @@ -87,7 +68,7 @@ class RAPSEnv(gym.Env): self.engine.jobs = jobs self.engine.timestep_start = 0 - #self.engine.timestep_end = int(max(job.wall_time for job in jobs)) + # self.engine.timestep_end = int(max(job.wall_time for job 
in jobs)) self.engine.timestep_end = 100 self.engine.current_timestep = 0 @@ -98,16 +79,23 @@ class RAPSEnv(gym.Env): Apply scheduling action. For now: action = index of job in queue to attempt scheduling. """ - # TODO: hook into Engine to apply scheduling - # Placeholder: random reward for scaffolding + # TODO: integrate action with real scheduling logic reward = np.random.rand() done = self.engine.current_timestep >= self.engine.timestep_end obs = self._get_state() + + # Compute info manually + running_nodes = sum(getattr(j, "nodes_required", 0) + for j in self.engine.jobs + if getattr(j, "start_time", None) is not None) + total_nodes = self.config.get("SC_NODES", 1) + utilization = running_nodes / total_nodes + info = { - "utilization": self.telemetry.utilization(), - "power": self.telemetry.power(), - "queue_length": self.telemetry.queue_length(), + "utilization": utilization, + "power": getattr(self.power_manager, "total_power", 0.0), + "queue_length": len([j for j in self.engine.jobs if getattr(j, "start_time", None) is None]), } self.engine.current_timestep += 1 @@ -115,34 +103,23 @@ class RAPSEnv(gym.Env): def _get_state(self): """Construct simple state representation from engine's job queue.""" - # Example: take waiting jobs - job_queue = [j for j in self.engine.jobs if not j.started] + # Example: take waiting jobs (haven’t started yet) + job_queue = [j for j in self.engine.jobs if getattr(j, "start_time", None) is None] max_jobs, job_features = self.observation_space.shape state = np.zeros((max_jobs, job_features), dtype=np.float32) for i, job in enumerate(job_queue[:max_jobs]): - # fill with features of interest; adapt to what Job exposes features = [ getattr(job, "nodes_required", 0), getattr(job, "wall_time", 0), getattr(job, "priority", 0), - getattr(job, "wait_time", 0), + getattr(job, "wait_time", 0), # may need to compute from current_timestep - qdt ] - state[i, :len(features)] = features + state[i, : len(features)] = features return state - #def 
_get_state(self): - # """Very simple state vector: truncate/pad job queue.""" - # jobs = self.telemetry.get_job_queue_features() - # max_jobs, job_features = self.observation_space.shape - # state = np.zeros((max_jobs, job_features), dtype=np.float32) -# -# for i, job in enumerate(jobs[:max_jobs]): -# state[i, : len(job)] = job -# return state - def render(self, mode="human"): print("Timestep:", self.engine.current_timestep, "Utilization:", self.telemetry.utilization(), -- GitLab From e44917571c5d2eb76bda435689f4086ff98b125e Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Tue, 26 Aug 2025 19:50:16 -0400 Subject: [PATCH 03/29] Add real reward function - previously was using random reward --- raps/envs/raps_env.py | 67 +++++++++++++++++++++++++++++++------------ 1 file changed, 48 insertions(+), 19 deletions(-) diff --git a/raps/envs/raps_env.py b/raps/envs/raps_env.py index ec65c4c..8f7dbe8 100644 --- a/raps/envs/raps_env.py +++ b/raps/envs/raps_env.py @@ -74,31 +74,60 @@ class RAPSEnv(gym.Env): return self._get_state() + def _compute_reward(self, tick_data, alpha=1.0, beta=0.001, gamma=0.1): + completed = getattr(tick_data, "completed", None) + jobs_completed = len(completed) if completed else 0 + power = getattr(tick_data, "power", 0.0) or 0.0 + queue_len = len(self.engine.queue) + + reward = alpha * jobs_completed - beta * power - gamma * queue_len + + if self.args_dict.get("debug", False): + print(f"[t={self.engine.current_timestep}] jobs_completed={jobs_completed}, " + f"power={power}, queue_len={queue_len}, reward={reward}") + + return reward + def step(self, action): - """ - Apply scheduling action. - For now: action = index of job in queue to attempt scheduling. - """ - # TODO: integrate action with real scheduling logic - reward = np.random.rand() - done = self.engine.current_timestep >= self.engine.timestep_end + # 1. 
Jobs waiting in the queue + job_queue = list(self.engine.queue) + chosen_job = None - obs = self._get_state() + if job_queue and action < len(job_queue): + chosen_job = job_queue[action] + + # 2. Let RAPS handle all scheduling logic + self.engine.scheduler.place_job_and_manage_queues( + chosen_job, + self.engine.queue, + self.engine.running, + self.engine.current_timestep, + ) - # Compute info manually - running_nodes = sum(getattr(j, "nodes_required", 0) - for j in self.engine.jobs - if getattr(j, "start_time", None) is not None) - total_nodes = self.config.get("SC_NODES", 1) - utilization = running_nodes / total_nodes + # 3. Advance simulation by one tick + # Update bookkeeping so tick() doesn't crash + if not hasattr(self.engine, "num_active_nodes"): + self.engine.num_active_nodes = 0 + if not hasattr(self.engine, "num_free_nodes"): + self.engine.num_free_nodes = self.config["AVAILABLE_NODES"] + + self.engine.num_active_nodes = sum(len(j.scheduled_nodes) for j in self.engine.running) + self.engine.num_free_nodes = self.config["AVAILABLE_NODES"] - self.engine.num_active_nodes + + tick_data = self.engine.tick() + + # 4. Compute reward (throughput vs. power) + reward = self._compute_reward(tick_data) + + # 5. 
Build next observation + obs = self._get_state() + done = self.engine.current_timestep >= self.engine.timestep_end info = { - "utilization": utilization, - "power": getattr(self.power_manager, "total_power", 0.0), - "queue_length": len([j for j in self.engine.jobs if getattr(j, "start_time", None) is None]), + "scheduled_job": getattr(chosen_job, "id", None), + "power": getattr(tick_data, "power", None), + "completed": getattr(tick_data, "completed", []), } - - self.engine.current_timestep += 1 return obs, reward, done, info def _get_state(self): -- GitLab From 45f13b461225f27e24bcb45c3f24cc320f057d2a Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Tue, 26 Aug 2025 20:00:24 -0400 Subject: [PATCH 04/29] Expose more settings to PPO --- train_rl.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/train_rl.py b/train_rl.py index 4a997b6..9cefbd4 100644 --- a/train_rl.py +++ b/train_rl.py @@ -1,4 +1,3 @@ -import gym from stable_baselines3 import PPO from raps.envs.raps_env import RAPSEnv from raps.system_config import get_system_config @@ -10,7 +9,18 @@ args_dict['args'] = args env = RAPSEnv(**args_dict) -model = PPO("MlpPolicy", env, verbose=1) +model = PPO( + "MlpPolicy", + env, + n_steps=512, # shorter rollouts (quicker feedback loop) + batch_size=128, # must divide n_steps evenly + n_epochs=10, # # of minibatch passes per update + gamma=0.99, # discount (keeps long-term credit) + learning_rate=3e-4, # default Adam lr, can try 1e-4 if unstable + ent_coef=0.01, # encourage exploration + verbose=1, +) + model.learn(total_timesteps=10000) # Save trained model -- GitLab From 90188c2d832331ae8ca5c701b673ed1820ddeb5e Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Tue, 26 Aug 2025 20:53:25 -0400 Subject: [PATCH 05/29] Fix issues with time not advancing and power not computed correctly --- raps/envs/raps_env.py | 41 +++++++++++++++++------------------------ 1 file changed, 17 insertions(+), 24 deletions(-) diff --git a/raps/envs/raps_env.py 
b/raps/envs/raps_env.py index 8f7dbe8..398009c 100644 --- a/raps/envs/raps_env.py +++ b/raps/envs/raps_env.py @@ -53,6 +53,13 @@ class RAPSEnv(gym.Env): **self.config ) + self.generator = self.layout_manager.run_stepwise( + jobs, + timestep_start=0, + timestep_end=self.config.get("SIM_END", 1000), + time_delta=self.args_dict.get("time_delta", 1), + ) + # --- RL spaces --- max_jobs = 100 job_features = 4 # [nodes, runtime, priority, wait_time] @@ -77,7 +84,7 @@ class RAPSEnv(gym.Env): def _compute_reward(self, tick_data, alpha=1.0, beta=0.001, gamma=0.1): completed = getattr(tick_data, "completed", None) jobs_completed = len(completed) if completed else 0 - power = getattr(tick_data, "power", 0.0) or 0.0 + power = self.power_manager.history[-1][1] queue_len = len(self.engine.queue) reward = alpha * jobs_completed - beta * power - gamma * queue_len @@ -89,43 +96,29 @@ class RAPSEnv(gym.Env): return reward def step(self, action): - # 1. Jobs waiting in the queue job_queue = list(self.engine.queue) chosen_job = None if job_queue and action < len(job_queue): chosen_job = job_queue[action] - - # 2. Let RAPS handle all scheduling logic self.engine.scheduler.place_job_and_manage_queues( - chosen_job, - self.engine.queue, - self.engine.running, - self.engine.current_timestep, + chosen_job, self.engine.queue, self.engine.running, self.engine.current_timestep ) - # 3. 
Advance simulation by one tick - # Update bookkeeping so tick() doesn't crash - if not hasattr(self.engine, "num_active_nodes"): - self.engine.num_active_nodes = 0 - if not hasattr(self.engine, "num_free_nodes"): - self.engine.num_free_nodes = self.config["AVAILABLE_NODES"] + # Advance simulation by one step via generator + try: + tick_data = next(self.generator) + except StopIteration: + # Simulation finished + return self._get_state(), 0.0, True, {} - self.engine.num_active_nodes = sum(len(j.scheduled_nodes) for j in self.engine.running) - self.engine.num_free_nodes = self.config["AVAILABLE_NODES"] - self.engine.num_active_nodes - - tick_data = self.engine.tick() - - # 4. Compute reward (throughput vs. power) reward = self._compute_reward(tick_data) - - # 5. Build next observation obs = self._get_state() - done = self.engine.current_timestep >= self.engine.timestep_end + done = self.engine.current_timestep >= min(self.engine.timestep_end, 1000) info = { "scheduled_job": getattr(chosen_job, "id", None), - "power": getattr(tick_data, "power", None), + "power": getattr(tick_data, "power", 0.0), "completed": getattr(tick_data, "completed", []), } return obs, reward, done, info -- GitLab From 414c0f22a661c786f1b87e330b8d39b489ea054e Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Tue, 26 Aug 2025 21:39:07 -0400 Subject: [PATCH 06/29] Add new RL scheduler schedulers/rl.py --- raps/envs/raps_env.py | 31 ++++++++++++++++++++++++------- raps/resmgr/default.py | 3 ++- raps/schedulers/rl.py | 35 +++++++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 8 deletions(-) create mode 100644 raps/schedulers/rl.py diff --git a/raps/envs/raps_env.py b/raps/envs/raps_env.py index 398009c..97fa77e 100644 --- a/raps/envs/raps_env.py +++ b/raps/envs/raps_env.py @@ -8,6 +8,9 @@ from raps.flops import FLOPSManager from raps.telemetry import Telemetry from raps.workload import Workload from raps.ui import LayoutManager +from raps.schedulers.rl import Scheduler +# from 
raps.resmgr.default import MultiTenantResourceManager as ResourceManager +from raps.resmgr.default import ExclusiveNodeResourceManager as ResourceManager class RAPSEnv(gym.Env): @@ -45,6 +48,21 @@ class RAPSEnv(gym.Env): **self.args_dict ) + resmgr = ResourceManager( + total_nodes=self.config["TOTAL_NODES"], + down_nodes=self.config.get("DOWN_NODES", []), + config=self.config + ) + + # Plug in RL scheduler + self.scheduler = Scheduler( + config=self.config, + policy="fcfs", # or None if you want no heuristic fallback + resource_manager=resmgr, + env=self + ) + self.engine.scheduler = self.scheduler + self.layout_manager = LayoutManager( self.args_dict.get("layout"), engine=self.engine, debug=self.args_dict.get("debug", False), @@ -96,15 +114,8 @@ class RAPSEnv(gym.Env): return reward def step(self, action): - job_queue = list(self.engine.queue) chosen_job = None - if job_queue and action < len(job_queue): - chosen_job = job_queue[action] - self.engine.scheduler.place_job_and_manage_queues( - chosen_job, self.engine.queue, self.engine.running, self.engine.current_timestep - ) - # Advance simulation by one step via generator try: tick_data = next(self.generator) @@ -112,7 +123,13 @@ class RAPSEnv(gym.Env): # Simulation finished return self._get_state(), 0.0, True, {} + # Store action for scheduler to pick up + self.scheduler.pending_action = action + + # Advance one step (scheduler.schedule() is called inside generator) + tick_data = next(self.generator) reward = self._compute_reward(tick_data) + obs = self._get_state() done = self.engine.current_timestep >= min(self.engine.timestep_end, 1000) diff --git a/raps/resmgr/default.py b/raps/resmgr/default.py index c7791f5..1429a5f 100644 --- a/raps/resmgr/default.py +++ b/raps/resmgr/default.py @@ -42,7 +42,8 @@ class ExclusiveNodeResourceManager: """Assigns full nodes to a job (replay or count-based).""" # Ensure enough free nodes if len(self.available_nodes) < job.nodes_required: - raise ValueError(f"Not enough 
available nodes to schedule job {job.id}") + raise ValueError(f"Not enough available nodes to schedule job {job.id}", + f"{len(self.available_nodes)} < {job.nodes_required}") if policy == PolicyType.REPLAY and job.scheduled_nodes: # Telemetry replay: use the exact nodes diff --git a/raps/schedulers/rl.py b/raps/schedulers/rl.py new file mode 100644 index 0000000..2272e9d --- /dev/null +++ b/raps/schedulers/rl.py @@ -0,0 +1,35 @@ +from raps.schedulers.default import Scheduler as DefaultScheduler + + +class Scheduler(DefaultScheduler): + """ + Scheduler driven by RL agent actions. + RAPSEnv.step(action) sets env.pending_action, + then RLScheduler.schedule() reads it and acts. + """ + + def __init__(self, config, policy, resource_manager, env=None, *args, **kwargs): + super().__init__(config=config, policy=policy, resource_manager=resource_manager, *args, **kwargs) + self.env = env + self.pending_action = None + + def schedule(self, queue, running, current_time, **kwargs): + if not queue or self.pending_action is None: + return + + action = self.pending_action + if action >= len(queue): + return + + job = queue[action] + + # Check feasibility + if job.nodes_required <= len(self.resource_manager.available_nodes): + self.place_job_and_manage_queues(job, queue, running, current_time) + else: + # Invalid action → skip or log + if self.config.args.get("debug", False): + print(f"[t={current_time}] RL chose invalid job {job.id} (needs {job.nodes_required})") + + # Reset action after use + self.pending_action = None -- GitLab From ba563312a9f37c85304e4d6a99395e326eccae9b Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Wed, 27 Aug 2025 15:39:37 -0400 Subject: [PATCH 07/29] Modify mit_supercloud/loader.py to support single partition simulations --- raps/dataloaders/mit_supercloud/loader.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/raps/dataloaders/mit_supercloud/loader.py b/raps/dataloaders/mit_supercloud/loader.py index 6057210..abe0092 100644 --- 
a/raps/dataloaders/mit_supercloud/loader.py +++ b/raps/dataloaders/mit_supercloud/loader.py @@ -300,6 +300,18 @@ def load_data(local_dataset_path, **kwargs): cpu_only = (part == "part-cpu") mixed = (part == "part-gpu") + # handle single-partition configs (e.g., mit_supercloud.yaml) + if not cpu_only and not mixed: + config = kwargs.get("config") + gpus_per_node = config.get("gpus_per_node") + + if gpus_per_node == 0: + cpu_only = True + part = "part-cpu" + else: + mixed = True + part = "part-gpu" + # create nodelist mapping if cpu_only: with open(os.path.join(NL_PATH, "cpu_nodes.txt")) as f: -- GitLab From 00420f05e2f63b18ac74095e3ec3280c87e4a31b Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Wed, 27 Aug 2025 16:06:00 -0400 Subject: [PATCH 08/29] Fix bug in mit_supercloud/loader.py --- raps/dataloaders/mit_supercloud/loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raps/dataloaders/mit_supercloud/loader.py b/raps/dataloaders/mit_supercloud/loader.py index abe0092..dfa9e44 100644 --- a/raps/dataloaders/mit_supercloud/loader.py +++ b/raps/dataloaders/mit_supercloud/loader.py @@ -602,7 +602,7 @@ def load_data(local_dataset_path, **kwargs): scheduled_nodes=rec.get("scheduled_nodes"), priority=rec.get("priority", 0), submit_time=submit_time, - time_limit=rec.get("time_limit", 0), + time_limit=rec.get("timelimit", 0), start_time=t0 - start_ts, end_time=t1 - start_ts, expected_run_time=max(0, t1-t0), -- GitLab From 456688064518ed0ab30cd34dc8647ad404fb1528 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Wed, 27 Aug 2025 16:11:56 -0400 Subject: [PATCH 09/29] More fixes to mit_supercloud. 
Fix experiments/mit.yaml and README.md to be consistent with recent changes --- README.md | 9 +++++---- experiments/mit.yaml | 3 +-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 665a64a..831ef63 100644 --- a/README.md +++ b/README.md @@ -62,16 +62,17 @@ For MIT Supercloud python -m raps.dataloaders.mit_supercloud.cli download --start 2021-05-21T13:00 --end 2021-05-21T14:00 # Load data and run simulation - will save data as part-cpu.npz and part-gpu.npz files - python multi-part-sim.py -x 'mit_supercloud/*' -f $DPATH --system mit_supercloud \ - --start 2021-05-21T13:00 --end 2021-05-21T14:00 + python multi-part-sim.py -x mit_supercloud -f $DPATH --start 2021-05-21T13:00 --end 2021-05-21T14:00 + # or simply + python multi-part-sim.py experiments/mit.yaml # Note: if no start, end dates provided will default to run 24 hours between # 2021-05-21T00:00 to 2021-05-22T00:00 set by defaults in raps/dataloaders/mit_supercloud/utils.py # Re-run simulation using npz files (much faster load) - python multi-part-sim.py -x mit_supercloud/* -f part-*.npz --system mit_supercloud + python multi-part-sim.py -x mit_supercloud -f part-*.npz # Synthetic tests for verification studies: - python multi-part-sim.py -x 'mit_supercloud/*' -w multitenant + python multi-part-sim.py -x mit_supercloud -w multitenant For Lumi diff --git a/experiments/mit.yaml b/experiments/mit.yaml index bc718e4..83892f9 100644 --- a/experiments/mit.yaml +++ b/experiments/mit.yaml @@ -1,6 +1,5 @@ -system: mit_supercloud partitions: ["mit_supercloud/part-cpu", "mit_supercloud/part-gpu"] replay: - - ~/data/mit/202201 + - /opt/data/mit_supercloud start: 2021-05-21T13:00 end: 2021-05-21T14:00 -- GitLab From d6ddb07acaf721877d494dcb54ac9429ad68476b Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Wed, 27 Aug 2025 16:51:24 -0400 Subject: [PATCH 10/29] Get RL working with mit_supercloud telemetry --- raps/envs/raps_env.py | 83 ++++++++++++++++++++++++++++++++++--------- 1 file 
changed, 66 insertions(+), 17 deletions(-) diff --git a/raps/envs/raps_env.py b/raps/envs/raps_env.py index 97fa77e..31d7aa1 100644 --- a/raps/envs/raps_env.py +++ b/raps/envs/raps_env.py @@ -37,14 +37,13 @@ class RAPSEnv(gym.Env): self.flops_manager = FLOPSManager(**self.args_dict) self.telemetry = Telemetry(**self.args_dict) - # --- workload (synthetic for now) --- - wl = Workload(self.cli_args, self.config) - jobs = wl.generate_jobs() + # --- Build initial jobs & time bounds --- + self.jobs, self.timestep_start, self.timestep_end = self._build_jobs() self.engine = Engine( power_manager=self.power_manager, flops_manager=self.flops_manager, - jobs=jobs, + jobs=self.jobs, **self.args_dict ) @@ -71,11 +70,14 @@ class RAPSEnv(gym.Env): **self.config ) + self.timestep_start = 0 + self.timestep_end = self.config.get("SIM_END", 1000) + self.generator = self.layout_manager.run_stepwise( - jobs, - timestep_start=0, - timestep_end=self.config.get("SIM_END", 1000), - time_delta=self.args_dict.get("time_delta", 1), + self.jobs, + timestep_start=self.timestep_start, + timestep_end=self.timestep_end, + time_delta=self.args_dict.get("time_delta"), ) # --- RL spaces --- @@ -86,16 +88,63 @@ class RAPSEnv(gym.Env): ) self.action_space = spaces.Discrete(max_jobs) + def _build_jobs(self): + """ + Build a job list either from synthetic workload (--workload) + or from telemetry replay (--replay). 
+ Returns: jobs, timestep_start, timestep_end + """ + # --- Case 1: Telemetry replay --- + if self.cli_args and getattr(self.cli_args, "replay"): + result = self.telemetry.load_jobs_times_args_from_files( + files=self.cli_args.replay, + args=self.cli_args, + config=self.config, + ) + + # Handle 3-tuple vs 4-tuple return + if len(result) == 3: + jobs, start_time, end_time = result + elif len(result) == 4: + jobs, start_time, end_time, _ = result + else: + raise ValueError(f"Unexpected telemetry return format: {len(result)} values") + + # Flatten partitioned jobs if necessary + if jobs and isinstance(jobs[0], list): + jobs = [job for sublist in jobs for job in sublist] + + return jobs, start_time, end_time + + # --- Case 2: Synthetic workload generation --- + elif self.cli_args and getattr(self.cli_args, "workload"): + wl = Workload(self.cli_args, self.config) + jobs = wl.generate_jobs() + + # For synthetic jobs, compute timestep_end from submit + run_time + timestep_start = 0 + timestep_end = max( + (getattr(job, "end_time", None) or getattr(job, "expected_run_time", 0) + job.submit_time) + for job in jobs + ) + return jobs, timestep_start, timestep_end + + # --- Error: neither replay nor workload specified --- + else: + raise ValueError("RAPSEnv requires either --workload or --replay to build jobs.") + def reset(self, **kwargs): - """Reset environment (new workload + engine).""" - wl = Workload(self.cli_args, self.config) - jobs = wl.generate_jobs() - - self.engine.jobs = jobs - self.engine.timestep_start = 0 - # self.engine.timestep_end = int(max(job.wall_time for job in jobs)) - self.engine.timestep_end = 100 - self.engine.current_timestep = 0 + self.engine.jobs = self.jobs + self.engine.timestep_start = self.timestep_start + self.engine.timestep_end = self.timestep_end + self.engine.current_timestep = self.timestep_start + + self.generator = self.layout_manager.run_stepwise( + self.jobs, + timestep_start=self.timestep_start, + timestep_end=self.timestep_end, + 
time_delta=self.args_dict.get("time_delta", 1), + ) return self._get_state() -- GitLab From 3ce571eabc1dc6dcaa8fa10bcf143afbb7c38ef3 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Wed, 27 Aug 2025 23:16:28 -0400 Subject: [PATCH 11/29] Output engine stats after train_rl.py training run --- raps/engine.py | 36 +++++++++++++++++++----------------- raps/envs/raps_env.py | 9 +++++++++ raps/stats.py | 5 ++++- train_rl.py | 8 +++++++- 4 files changed, 39 insertions(+), 19 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index f79b140..b4aa713 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -363,23 +363,25 @@ class Engine: return simulation_complete def tick(self, *, time_delta=1, replay=False): - # Tick runs all simulations of interest at the given time delta interval. - # - # The simulations which are needed for simulations consistency at each time step - # (inside: the main simulation loop of run_simulation) are not part of tick. - # - # Tick contains: - # For each running job: - # - CPU utilization - # - GPU utilization - # - Network utilization - # - # From these the systems (across all nodes) - # - System Utilization - # - Power - # - Cooling - # - System Performance - # is simulated. + """ + Tick runs all simulations of interest at the given time delta interval. + + The simulations which are needed for simulations consistency at each time step + (inside: the main simulation loop of run_simulation) are not part of tick. + + Tick contains: + For each running job: + - CPU utilization + - GPU utilization + - Network utilization + + From these the systems (across all nodes) + - System Utilization + - Power + - Cooling + - System Performance + is simulated. 
+ """ scheduled_nodes = [] cpu_utils = [] diff --git a/raps/envs/raps_env.py b/raps/envs/raps_env.py index 31d7aa1..fc13ab3 100644 --- a/raps/envs/raps_env.py +++ b/raps/envs/raps_env.py @@ -11,6 +11,7 @@ from raps.ui import LayoutManager from raps.schedulers.rl import Scheduler # from raps.resmgr.default import MultiTenantResourceManager as ResourceManager from raps.resmgr.default import ExclusiveNodeResourceManager as ResourceManager +from raps.stats import get_engine_stats, get_job_stats, get_scheduler_stats, get_network_stats class RAPSEnv(gym.Env): @@ -212,3 +213,11 @@ class RAPSEnv(gym.Env): print("Timestep:", self.engine.current_timestep, "Utilization:", self.telemetry.utilization(), "Power:", self.telemetry.power()) + + def get_stats(self): + return { + "engine_stats": get_engine_stats(self.engine), + "job_stats": get_job_stats(self.engine), + "scheduler_stats": get_scheduler_stats(self.engine), + "network_stats": get_network_stats(self.engine) + } diff --git a/raps/stats.py b/raps/stats.py index b4cfbfb..23158cd 100644 --- a/raps/stats.py +++ b/raps/stats.py @@ -46,7 +46,10 @@ def get_engine_stats(engine: Engine): # Multitenancy Stats total_jobs_loaded = engine.total_initial_jobs # Assuming this is passed to __init__ stats['total jobs loaded'] = total_jobs_loaded - stats['jobs completed percentage'] = f"{(engine.jobs_completed / total_jobs_loaded * 100):.2f}%" + if total_jobs_loaded > 0: + stats['jobs completed percentage'] = f"{(engine.jobs_completed / total_jobs_loaded * 100):.2f}%" + else: + stats['jobs completed percentage'] = "0%" if engine.node_occupancy_history: # Calculate average concurrent jobs per node (average density across all nodes and timesteps) diff --git a/train_rl.py b/train_rl.py index 9cefbd4..40026a0 100644 --- a/train_rl.py +++ b/train_rl.py @@ -2,6 +2,7 @@ from stable_baselines3 import PPO from raps.envs.raps_env import RAPSEnv from raps.system_config import get_system_config from raps.sim_config import args, args_dict +from 
raps.stats import print_formatted_report config = get_system_config(args.system).get_legacy() args_dict['config'] = config @@ -19,9 +20,14 @@ model = PPO( learning_rate=3e-4, # default Adam lr, can try 1e-4 if unstable ent_coef=0.01, # encourage exploration verbose=1, + tensorboard_log="./ppo_raps_logs/" ) -model.learn(total_timesteps=10000) +model.learn(total_timesteps=10000, tb_log_name="ppo_raps") + +# Output stats +stats = env.get_stats() +print_formatted_report(**stats) # Save trained model model.save("ppo_raps") -- GitLab From 040c248a85fa5db4704331729b5996d018406998 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Thu, 28 Aug 2025 12:09:18 -0400 Subject: [PATCH 12/29] Add stats output after each train rl episode --- raps/envs/raps_env.py | 32 ++++++++++++++++++++++++++++++++ train_rl.py | 2 -- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/raps/envs/raps_env.py b/raps/envs/raps_env.py index fc13ab3..1046d06 100644 --- a/raps/envs/raps_env.py +++ b/raps/envs/raps_env.py @@ -1,5 +1,6 @@ import gym import numpy as np +import os from gym import spaces from raps.engine import Engine @@ -14,6 +15,34 @@ from raps.resmgr.default import ExclusiveNodeResourceManager as ResourceManager from raps.stats import get_engine_stats, get_job_stats, get_scheduler_stats, get_network_stats +def print_stats(stats): + os.system("clear") + wanted_keys = [ + "time simulated", + "average power", + "system power efficiency", + "total energy consumed", + "carbon emissions", + "jobs completed", + "throughput", + "jobs still running" + ] + + # merge just engine_stats + job_stats + combined = {} + for section in ["engine_stats", "job_stats"]: + if section in stats: + for k, v in stats[section].items(): + if k.lower() in wanted_keys: + pretty_key = k.replace("_", " ").title() + combined[pretty_key] = v + + # align only left column, leave right "ragged" + max_key_len = max(len(k) for k in combined.keys()) + for k, v in combined.items(): + print(f"{k.ljust(max_key_len)} | {v}") 
+ + class RAPSEnv(gym.Env): """ Minimal Gym-compatible wrapper around RAPS Engine @@ -182,6 +211,9 @@ class RAPSEnv(gym.Env): obs = self._get_state() done = self.engine.current_timestep >= min(self.engine.timestep_end, 1000) + if done: + stats = self.get_stats() + print_stats(stats) info = { "scheduled_job": getattr(chosen_job, "id", None), diff --git a/train_rl.py b/train_rl.py index 40026a0..e98f7a3 100644 --- a/train_rl.py +++ b/train_rl.py @@ -2,7 +2,6 @@ from stable_baselines3 import PPO from raps.envs.raps_env import RAPSEnv from raps.system_config import get_system_config from raps.sim_config import args, args_dict -from raps.stats import print_formatted_report config = get_system_config(args.system).get_legacy() args_dict['config'] = config @@ -27,7 +26,6 @@ model.learn(total_timesteps=10000, tb_log_name="ppo_raps") # Output stats stats = env.get_stats() -print_formatted_report(**stats) # Save trained model model.save("ppo_raps") -- GitLab From 38c867de2aa55e85096721be7330c97a4f065fce Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Thu, 28 Aug 2025 12:16:33 -0400 Subject: [PATCH 13/29] Move to use SB3-style logger for stats output --- raps/envs/raps_env.py | 46 ++++++++++++++++++++++--------------------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/raps/envs/raps_env.py b/raps/envs/raps_env.py index 1046d06..580b349 100644 --- a/raps/envs/raps_env.py +++ b/raps/envs/raps_env.py @@ -1,6 +1,5 @@ import gym import numpy as np -import os from gym import spaces from raps.engine import Engine @@ -14,33 +13,36 @@ from raps.schedulers.rl import Scheduler from raps.resmgr.default import ExclusiveNodeResourceManager as ResourceManager from raps.stats import get_engine_stats, get_job_stats, get_scheduler_stats, get_network_stats +from stable_baselines3.common.logger import Logger, HumanOutputFormat +import sys + +logger = Logger( + folder=None, # no log file, just stdout + output_formats=[HumanOutputFormat(sys.stdout)] +) + + +def print_stats(stats, 
step=0): + """prints SB3-style stats output""" + + wanted_keys = { + "time simulated": "engine/Time Simulated", + "average power": "engine/Average Power", + "system power efficiency": "engine/System Power Efficiency", + "total energy consumed": "engine/Total Energy Consumed", + "carbon emissions": "engine/Carbon Emissions", + "jobs completed": "jobs/Jobs Completed", + "throughput": "jobs/Throughput", + "jobs still running": "jobs/Jobs Still Running", + } -def print_stats(stats): - os.system("clear") - wanted_keys = [ - "time simulated", - "average power", - "system power efficiency", - "total energy consumed", - "carbon emissions", - "jobs completed", - "throughput", - "jobs still running" - ] - - # merge just engine_stats + job_stats - combined = {} for section in ["engine_stats", "job_stats"]: if section in stats: for k, v in stats[section].items(): if k.lower() in wanted_keys: - pretty_key = k.replace("_", " ").title() - combined[pretty_key] = v + logger.record(wanted_keys[k.lower()], v) - # align only left column, leave right "ragged" - max_key_len = max(len(k) for k in combined.keys()) - for k, v in combined.items(): - print(f"{k.ljust(max_key_len)} | {v}") + logger.dump(step=step) class RAPSEnv(gym.Env): -- GitLab From 5283b6e66e684e0f328c13a7bced40b003633a89 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Thu, 28 Aug 2025 14:55:57 -0400 Subject: [PATCH 14/29] Fix bug in telemetry.py when trying to load npz file with --arrival or --scale --- raps/telemetry.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/raps/telemetry.py b/raps/telemetry.py index f485daa..ee0e9fb 100644 --- a/raps/telemetry.py +++ b/raps/telemetry.py @@ -282,14 +282,14 @@ class Telemetry: if hasattr(args, 'scale') and args.scale: for job in tqdm(jobs, desc=f"Scaling jobs to {args.scale} nodes"): - job['nodes_required'] = random.randint(1, args.scale) - job['scheduled_nodes'] = None # Setting to None triggers scheduler to assign nodes + job.nodes_required = 
random.randint(1, args.scale) + job.scheduled_nodes = None # Setting to None triggers scheduler to assign nodes if hasattr(args, 'arrival') and args.arrival == 'poisson': print("available nodes:", config['AVAILABLE_NODES']) for job in tqdm(jobs, desc="Rescheduling jobs"): - job['scheduled_nodes'] = None - job['submit_time'] = next_arrival_byconfargs(config, args) + job.scheduled_nodes = None + job.submit_time = next_arrival_byconfargs(config, args) else: trigger_custom_dataloader = True break -- GitLab From ca3c30af4cb9253e77fec93fdc6f1a80d70aca06 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Thu, 28 Aug 2025 14:57:37 -0400 Subject: [PATCH 15/29] Point all experiment data paths to /opt/data instead of ~/data --- experiments/frontier.yaml | 4 ++-- experiments/gcloudv2.yaml | 2 +- experiments/lassen.yaml | 2 +- experiments/marconi100.yaml | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/experiments/frontier.yaml b/experiments/frontier.yaml index f865a19..280d95f 100644 --- a/experiments/frontier.yaml +++ b/experiments/frontier.yaml @@ -1,4 +1,4 @@ system: frontier replay: - - ~/data/frontier/slurm/joblive/date=2024-01-18 - - ~/data/frontier/jobprofile/date=2024-01-18 + - /opt/data/frontier/slurm/joblive/date=2024-01-18 + - /opt/data/frontier/jobprofile/date=2024-01-18 diff --git a/experiments/gcloudv2.yaml b/experiments/gcloudv2.yaml index 85a1d6c..db8e218 100644 --- a/experiments/gcloudv2.yaml +++ b/experiments/gcloudv2.yaml @@ -1,4 +1,4 @@ system: gcloudv2 replay: - - ~/data/gcloud/v2/google_cluster_data_2011_sample + - /opt/data/gcloud/v2/google_cluster_data_2011_sample ff: 600 diff --git a/experiments/lassen.yaml b/experiments/lassen.yaml index 5434a1b..7ee04be 100644 --- a/experiments/lassen.yaml +++ b/experiments/lassen.yaml @@ -1,6 +1,6 @@ system: lassen replay: - - ~/data/lassen/Lassen-Supercomputer-Job-Dataset + - /opt/data/lassen/Lassen-Supercomputer-Job-Dataset policy: fcfs backfill: firstfit fastforward: 365d diff --git 
a/experiments/marconi100.yaml b/experiments/marconi100.yaml index 0568157..8592229 100644 --- a/experiments/marconi100.yaml +++ b/experiments/marconi100.yaml @@ -1,3 +1,3 @@ system: marconi100 replay: - - ~/data/marconi100/job_table.parquet + - /opt/data/marconi100/job_table.parquet -- GitLab From 3c8fb136f305d031a2d2acfc4d6a3dac5e353200 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Thu, 28 Aug 2025 16:40:09 -0400 Subject: [PATCH 16/29] Modify mit_supercloud loader.py to use --arrival poisson, and add --episode_length option --- config/mit_supercloud.yaml | 51 ++++++++++++++ experiments/mitrl.yaml | 7 ++ raps/dataloaders/mit_supercloud/loader.py | 32 ++++++--- raps/envs/raps_env.py | 84 +++++++++++++++-------- raps/sim_config.py | 4 ++ raps/stats.py | 6 +- 6 files changed, 144 insertions(+), 40 deletions(-) create mode 100644 config/mit_supercloud.yaml create mode 100644 experiments/mitrl.yaml diff --git a/config/mit_supercloud.yaml b/config/mit_supercloud.yaml new file mode 100644 index 0000000..b780b10 --- /dev/null +++ b/config/mit_supercloud.yaml @@ -0,0 +1,51 @@ +system: + num_cdus: 12 + racks_per_cdu: 1 + nodes_per_rack: 40 + chassis_per_rack: 8 + nodes_per_blade: 1 + switches_per_chassis: 4 + nics_per_node: 4 + rectifiers_per_chassis: 4 + nodes_per_rectifier: 4 + missing_racks: [] + down_nodes: [] + cpus_per_node: 2 + cores_per_cpu: 24 + gpus_per_node: 0 + cpu_peak_flops: 2995200000000.0 + gpu_peak_flops: 0 + cpu_fp_ratio: 0.667 + gpu_fp_ratio: 0.667 +power: + power_gpu_idle: 88 + power_gpu_max: 560 + power_cpu_idle: 1 + power_cpu_max: 6 + power_mem: 74.26 + power_nvme: 30 + power_nic: 20 + power_cdu: 8473.47 + power_switch: 250 + power_update_freq: 15 + rectifier_peak_threshold: 13670 + sivoc_loss_constant: 13 + sivoc_efficiency: 0.98 + rectifier_loss_constant: 17 + rectifier_efficiency: 0.96 + power_cost: 0.094 +scheduler: + multitenant: true + job_arrival_time: 1 + mtbf: 11 + trace_quanta: 10 + min_wall_time: 3600 + max_wall_time: 43200 + 
ui_update_freq: 900 + max_nodes_per_job: 3000 + job_end_probs: + COMPLETED: 0.63 + FAILED: 0.13 + CANCELLED: 0.12 + TIMEOUT: 0.11 + NODE_FAIL: 0.01 diff --git a/experiments/mitrl.yaml b/experiments/mitrl.yaml new file mode 100644 index 0000000..c0adbfe --- /dev/null +++ b/experiments/mitrl.yaml @@ -0,0 +1,7 @@ +system: "mit_supercloud" +replay: + - /opt/data/mit_supercloud +start: 2021-05-21T21:00 +end: 2021-05-21T22:00 +episode_length: 500 +arrival: poisson diff --git a/raps/dataloaders/mit_supercloud/loader.py b/raps/dataloaders/mit_supercloud/loader.py index dfa9e44..ab08f69 100644 --- a/raps/dataloaders/mit_supercloud/loader.py +++ b/raps/dataloaders/mit_supercloud/loader.py @@ -118,7 +118,7 @@ from typing import Dict, Union, Optional from collections import Counter from raps.job import job_dict, Job -from raps.utils import summarize_ranges +from raps.utils import summarize_ranges, next_arrival from .utils import proc_cpu_series, proc_gpu_series, to_epoch from .utils import DEFAULT_START, DEFAULT_END @@ -209,6 +209,8 @@ def load_data(local_dataset_path, **kwargs): jobs_list, sim_start_time, sim_end_time """ debug = kwargs.get("debug") + config = kwargs.get("config") + arrival = kwargs.get("arrival") NL_PATH = os.path.dirname(__file__) skip_counts = Counter() @@ -302,8 +304,7 @@ def load_data(local_dataset_path, **kwargs): # handle single-partition configs (e.g., mit_supercloud.yaml) if not cpu_only and not mixed: - config = kwargs.get("config") - gpus_per_node = config.get("gpus_per_node") + gpus_per_node = config.get("GPUS_PER_NODE") if gpus_per_node == 0: cpu_only = True @@ -528,7 +529,6 @@ def load_data(local_dataset_path, **kwargs): jobs_list = [] # Get CPUS_PER_NODE and GPUS_PER_NODE from config - config = kwargs.get('config', {}) cpus_per_node = config.get('CPUS_PER_NODE') cores_per_cpu = config.get('CORES_PER_CPU') # gpus_per_node = config.get('GPUS_PER_NODE') # Unused @@ -585,7 +585,21 @@ def load_data(local_dataset_path, **kwargs): cpu_peak = 
cpu_cores_req / cores_per_cpu / cpus_per_node # Is this per CPU? cpu_tr = [min(x/cores_per_cpu/cpus_per_node, cpu_peak) for x in cpu_tr] - submit_time = rec.get("time_submit", t0) - start_ts + if arrival == "poisson": + job_arrival_time = config.get("JOB_ARRIVAL_TIME") + submit_time = next_arrival(1 / job_arrival_time) + start_time = submit_time + end_time = None + scheduled_nodes = None + telemetry_start = 0 + telemetry_end = 86640 + else: # replay + start_time = t0 - start_ts + end_time = t1 - start_ts + submit_time = rec.get("time_submit") - start_ts + scheduled_nodes = rec.get("scheduled_nodes") + telemetry_start = int(sl.time_start.min()) + telemetry_end = int(sl.time_end.max()) current_job_dict = job_dict( nodes_required=nr, @@ -599,12 +613,12 @@ def load_data(local_dataset_path, **kwargs): nrx_trace=[], end_state=rec.get("state_end", "unknown"), id=jid, - scheduled_nodes=rec.get("scheduled_nodes"), + scheduled_nodes=scheduled_nodes, priority=rec.get("priority", 0), submit_time=submit_time, time_limit=rec.get("timelimit", 0), - start_time=t0 - start_ts, - end_time=t1 - start_ts, + start_time=start_time, + end_time=end_time, expected_run_time=max(0, t1-t0), trace_time=len(cpu_tr)*quanta, trace_start_time=0, @@ -615,8 +629,6 @@ def load_data(local_dataset_path, **kwargs): jobs_list.append(job) # Calculate min_overall_utime and max_overall_utime - telemetry_start = int(sl.time_start.min()) - telemetry_end = int(sl.time_end.max()) # min_overall_utime = int(sl.time_submit.min()) # max_overall_utime = int(sl.time_submit.max()) diff --git a/raps/envs/raps_env.py b/raps/envs/raps_env.py index 580b349..c5cd2f2 100644 --- a/raps/envs/raps_env.py +++ b/raps/envs/raps_env.py @@ -40,6 +40,8 @@ def print_stats(stats, step=0): if section in stats: for k, v in stats[section].items(): if k.lower() in wanted_keys: + if k.lower() == "jobs still running" and isinstance(v, list): + v = len(v) logger.record(wanted_keys[k.lower()], v) logger.dump(step=step) @@ -103,7 +105,7 @@ 
class RAPSEnv(gym.Env): ) self.timestep_start = 0 - self.timestep_end = self.config.get("SIM_END", 1000) + self.timestep_end = getattr(self.cli_args, "episode_length") self.generator = self.layout_manager.run_stepwise( self.jobs, @@ -180,7 +182,29 @@ class RAPSEnv(gym.Env): return self._get_state() - def _compute_reward(self, tick_data, alpha=1.0, beta=0.001, gamma=0.1): + def _compute_reward(self, tick_data): + """ + Reward function: minimize carbon footprint per job completed. + Encourages the agent to complete jobs while keeping emissions low. + """ + reward = 0.0 + + # Jobs completed this tick + jobs_completed = len(getattr(tick_data, "completed", [])) + + # Carbon emitted so far (metric tons CO2) + carbon_so_far = getattr(self.engine, "carbon emissions", 0.0) + + if jobs_completed > 0: + # Reward is higher when more jobs finish with less carbon + reward = jobs_completed / (carbon_so_far + 1e-6) + else: + # Small penalty if no jobs finished (encourages progress) + reward = -0.01 + + return reward + + def _compute_reward2(self, tick_data, alpha=10.0, beta=1.0, gamma=2.0): completed = getattr(tick_data, "completed", None) jobs_completed = len(completed) if completed else 0 power = self.power_manager.history[-1][1] @@ -188,40 +212,46 @@ class RAPSEnv(gym.Env): reward = alpha * jobs_completed - beta * power - gamma * queue_len - if self.args_dict.get("debug", False): - print(f"[t={self.engine.current_timestep}] jobs_completed={jobs_completed}, " - f"power={power}, queue_len={queue_len}, reward={reward}") + print(f"[t={self.engine.current_timestep}] jobs_completed={jobs_completed}, " + f"power={power}, queue_len={queue_len}, reward={reward}") return reward def step(self, action): - chosen_job = None + queue = self.engine.queue + invalid_action = False - # Advance simulation by one step via generator - try: - tick_data = next(self.generator) - except StopIteration: - # Simulation finished - return self._get_state(), 0.0, True, {} - - # Store action for scheduler to 
pick up - self.scheduler.pending_action = action + # If queue empty or index out of range → invalid + if len(queue) == 0 or action >= len(queue): + invalid_action = True + else: + job = queue[int(action)] + available = len(self.engine.scheduler.resource_manager.available_nodes) + if job.nodes_required <= available: + # Valid scheduling + self.engine.scheduler.place_job_and_manage_queues( + job, queue, self.engine.running, self.engine.current_timestep + ) + else: + invalid_action = True - # Advance one step (scheduler.schedule() is called inside generator) + # advance simulation by one tick tick_data = next(self.generator) - reward = self._compute_reward(tick_data) + + # compute reward + if invalid_action: + reward = -1.0 + else: + reward = self._compute_reward(tick_data) + + # Print stats + stats = self.get_stats() + print_stats(stats) obs = self._get_state() - done = self.engine.current_timestep >= min(self.engine.timestep_end, 1000) - if done: - stats = self.get_stats() - print_stats(stats) - - info = { - "scheduled_job": getattr(chosen_job, "id", None), - "power": getattr(tick_data, "power", 0.0), - "completed": getattr(tick_data, "completed", []), - } + done = self.engine.current_timestep >= self.engine.timestep_end + info = {} + return obs, reward, done, info def _get_state(self): diff --git a/raps/sim_config.py b/raps/sim_config.py index 127cec3..e0b68e2 100644 --- a/raps/sim_config.py +++ b/raps/sim_config.py @@ -208,6 +208,10 @@ class SimConfig(BaseModel): maxqueue: int = 50 """ Specify the max queue length for continuous job generation """ + # Reinforcment Learning + episode_length: int = 1000 + """ Number of timesteps per RL episode (default 1000) """ + @model_validator(mode="before") def _parse_times(cls, data): time_fields = [ diff --git a/raps/stats.py b/raps/stats.py index 23158cd..aa8610e 100644 --- a/raps/stats.py +++ b/raps/stats.py @@ -32,13 +32,13 @@ def get_engine_stats(engine: Engine): stats = { 'time simulated': time_simulated, 'num_samples': 
num_samples, - 'average power': f'{average_power_mw:.2f} MW', - 'min loss': f'{min_loss_mw:.2f} MW', + 'average power': f'{average_power_mw:.4f} MW', + 'min loss': f'{min_loss_mw:.4f} MW', 'average loss': f'{average_loss_mw:.2f} MW', 'max loss': f'{max_loss_mw:.2f} MW', 'system power efficiency': f'{efficiency * 100:.2f}%', 'total energy consumed': f'{total_energy_consumed:.2f} MW-hr', - 'carbon emissions': f'{emissions:.2f} metric tons CO2', + 'carbon emissions': f'{emissions:.4f} metric tons CO2', 'total cost': f'${total_cost:.2f}' } -- GitLab From c80d121d67e3ccf8ebd3c2a4301a06146d50f683 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Tue, 9 Sep 2025 21:13:08 -0400 Subject: [PATCH 17/29] Quite a few changes to get the final RL results for the IEEE HPEC paper --- config/mit_supercloud.yaml | 52 +--- config/mit_supercloud/part-cpu.yaml | 2 +- experiments/mit.yaml | 4 +- experiments/mitrl.yaml | 4 +- raps/dataloaders/frontier.py | 32 ++- raps/dataloaders/mit_supercloud/loader.py | 2 +- raps/engine.py | 4 +- raps/envs/raps_env.py | 122 +++++++-- raps/resmgr/default.py | 3 +- raps/workload.py | 290 +++++++++++----------- train_rl.py | 4 + 11 files changed, 276 insertions(+), 243 deletions(-) mode change 100644 => 120000 config/mit_supercloud.yaml diff --git a/config/mit_supercloud.yaml b/config/mit_supercloud.yaml deleted file mode 100644 index b780b10..0000000 --- a/config/mit_supercloud.yaml +++ /dev/null @@ -1,51 +0,0 @@ -system: - num_cdus: 12 - racks_per_cdu: 1 - nodes_per_rack: 40 - chassis_per_rack: 8 - nodes_per_blade: 1 - switches_per_chassis: 4 - nics_per_node: 4 - rectifiers_per_chassis: 4 - nodes_per_rectifier: 4 - missing_racks: [] - down_nodes: [] - cpus_per_node: 2 - cores_per_cpu: 24 - gpus_per_node: 0 - cpu_peak_flops: 2995200000000.0 - gpu_peak_flops: 0 - cpu_fp_ratio: 0.667 - gpu_fp_ratio: 0.667 -power: - power_gpu_idle: 88 - power_gpu_max: 560 - power_cpu_idle: 1 - power_cpu_max: 6 - power_mem: 74.26 - power_nvme: 30 - power_nic: 20 - power_cdu: 
8473.47 - power_switch: 250 - power_update_freq: 15 - rectifier_peak_threshold: 13670 - sivoc_loss_constant: 13 - sivoc_efficiency: 0.98 - rectifier_loss_constant: 17 - rectifier_efficiency: 0.96 - power_cost: 0.094 -scheduler: - multitenant: true - job_arrival_time: 1 - mtbf: 11 - trace_quanta: 10 - min_wall_time: 3600 - max_wall_time: 43200 - ui_update_freq: 900 - max_nodes_per_job: 3000 - job_end_probs: - COMPLETED: 0.63 - FAILED: 0.13 - CANCELLED: 0.12 - TIMEOUT: 0.11 - NODE_FAIL: 0.01 diff --git a/config/mit_supercloud.yaml b/config/mit_supercloud.yaml new file mode 120000 index 0000000..2167597 --- /dev/null +++ b/config/mit_supercloud.yaml @@ -0,0 +1 @@ +mit_supercloud/part-gpu.yaml \ No newline at end of file diff --git a/config/mit_supercloud/part-cpu.yaml b/config/mit_supercloud/part-cpu.yaml index 111882d..b780b10 100644 --- a/config/mit_supercloud/part-cpu.yaml +++ b/config/mit_supercloud/part-cpu.yaml @@ -36,7 +36,7 @@ power: power_cost: 0.094 scheduler: multitenant: true - job_arrival_time: 900 + job_arrival_time: 1 mtbf: 11 trace_quanta: 10 min_wall_time: 3600 diff --git a/experiments/mit.yaml b/experiments/mit.yaml index 83892f9..3dcd692 100644 --- a/experiments/mit.yaml +++ b/experiments/mit.yaml @@ -1,5 +1,5 @@ partitions: ["mit_supercloud/part-cpu", "mit_supercloud/part-gpu"] replay: - /opt/data/mit_supercloud -start: 2021-05-21T13:00 -end: 2021-05-21T14:00 +start: 2021-05-21T00:00 +end: 2021-05-22T00:00 diff --git a/experiments/mitrl.yaml b/experiments/mitrl.yaml index c0adbfe..3bbd988 100644 --- a/experiments/mitrl.yaml +++ b/experiments/mitrl.yaml @@ -1,7 +1,7 @@ system: "mit_supercloud" replay: - /opt/data/mit_supercloud -start: 2021-05-21T21:00 -end: 2021-05-21T22:00 +start: 2021-05-21T00:00 +end: 2021-05-22T00:00 episode_length: 500 arrival: poisson diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py index 8491617..e749fa8 100644 --- a/raps/dataloaders/frontier.py +++ b/raps/dataloaders/frontier.py @@ -118,7 +118,7 @@ 
def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar - end_time # Maybe Null - expected_run_time (end_time - start_time) # Maybe Null - current_run_time (How long did the job run already, when loading) # Maybe zero - - trace_time (lenght of each trace in seconds) # Maybe Null + - trace_time (length of each trace in seconds) # Maybe Null - trace_start_time (time offset in seconds after which the trace starts) # Maybe Null - trace_end_time (time offset in seconds after which the trace ends) # Maybe Null - trace_quanta (job's associated trace quanta, to correctly replay with different trace quanta) # Maybe Null @@ -269,8 +269,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar if arrival == 'poisson': # Modify the arrival times of the jobs according to Poisson distribution scheduled_nodes = None submit_time = next_arrival_byconfkwargs(config, kwargs) - start_time = None # ? - end_time = None # ? + end_time = submit_time + end_time - start_time + start_time = submit_time priority = aging_boost(nodes_required) else: # Prescribed replay @@ -281,24 +281,20 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar indices = xname_to_index(xname, config) scheduled_nodes.append(indices) + if end_time < telemetry_start: + print("Job ends before first recorded telemetry entry:", job_id, "start:", + start_time, "end:", end_time, " Telemetry: ", len(gpu_trace), "entries.") + continue # skip + + if start_time > telemetry_end: + print("Job starts after last recorded telemetry entry:", job_id, "start:", + start_time, "end:", end_time, " Telemetry: ", len(gpu_trace), "entries.") + continue # skip + # Throw out jobs that are not valid! if gpu_trace.size == 0: print("ignoring job b/c zero trace:", jidx, submit_time, start_time, nodes_required) - continue # SKIP! 
- if end_time < telemetry_start: - # raise ValueError("Job ends before frist recorded telemetry entry:", - # job_id, "start:", start_time,"end:",end_time, - # " Telemetry: ", len(gpu_trace), "entries.") - print("Job ends before frist recorded telemetry entry:", job_id, "start:", - start_time, "end:", end_time, " Telemetry: ", len(gpu_trace), "entries.") - continue # SKIP! - if start_time > telemetry_end: - # raise ValueError("Job starts after last recorded telemetry entry:", - # job_id, "start:", start_time,"end:",end_time, - # " Telemetry: ", len(gpu_trace), "entries.") - print("Job starts after last recorded telemetry entry:", job_id, "start:", - start_time, "end:", end_time, " Telemetry: ", len(gpu_trace), "entries.") - continue # SKIP! + continue # skip if gpu_trace.size > 0 and (jid == job_id or jid == '*'): # and time_submit >= 0: job_info = job_dict( diff --git a/raps/dataloaders/mit_supercloud/loader.py b/raps/dataloaders/mit_supercloud/loader.py index ab08f69..d0e32f4 100644 --- a/raps/dataloaders/mit_supercloud/loader.py +++ b/raps/dataloaders/mit_supercloud/loader.py @@ -616,7 +616,7 @@ def load_data(local_dataset_path, **kwargs): scheduled_nodes=scheduled_nodes, priority=rec.get("priority", 0), submit_time=submit_time, - time_limit=rec.get("timelimit", 0), + time_limit=rec.get("timelimit") * 60, start_time=start_time, end_time=end_time, expected_run_time=max(0, t1-t0), diff --git a/raps/engine.py b/raps/engine.py index b4aa713..89fc400 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -402,7 +402,7 @@ class Engine: job.running_time = self.current_timestep - job.start_time if job.current_state != JobState.RUNNING: - raise ValueError(f"Job is in running list, but state is not RUNNING: job.state == {job.currentstate}") + raise ValueError(f"Job is in running list, but state is not RUNNING: job.state == {job.current_state}") else: # if job.state == JobState.RUNNING: # Error checks if job.running_time > job.time_limit: @@ -609,7 +609,7 @@ class Engine: # 
listener_thread = threading.Thread(target=keyboard_listener, args=(sim_state,), daemon=True) # listener_thread.start() - while self.current_timestep < self.timestep_end: # Runs every seconds! + while self.current_timestep < self.timestep_end: # Runs every second if sim_state.is_paused(): time.sleep(0.1) diff --git a/raps/envs/raps_env.py b/raps/envs/raps_env.py index c5cd2f2..bf3a161 100644 --- a/raps/envs/raps_env.py +++ b/raps/envs/raps_env.py @@ -1,6 +1,7 @@ +import copy import gym -import numpy as np from gym import spaces +import numpy as np from raps.engine import Engine from raps.power import PowerManager, compute_node_power @@ -30,7 +31,7 @@ def print_stats(stats, step=0): "average power": "engine/Average Power", "system power efficiency": "engine/System Power Efficiency", "total energy consumed": "engine/Total Energy Consumed", - "carbon emissions": "engine/Carbon Emissions", + "carbon emissions": "engine/Carbon Footprint", "jobs completed": "jobs/Jobs Completed", "throughput": "jobs/Throughput", "jobs still running": "jobs/Jobs Still Running", @@ -73,6 +74,7 @@ class RAPSEnv(gym.Env): # --- Build initial jobs & time bounds --- self.jobs, self.timestep_start, self.timestep_end = self._build_jobs() + self.original_jobs = self.jobs # keep pristine version self.engine = Engine( power_manager=self.power_manager, @@ -167,11 +169,60 @@ class RAPSEnv(gym.Env): else: raise ValueError("RAPSEnv requires either --workload or --replay to build jobs.") +# def reset(self, seed=None, options=None): +# super().reset(seed=seed) +# +# self.jobs = copy.deepcopy(self.original_jobs) # working copy +# +# # Reset engine +# self.engine.current_timestep = 0 +# #self.engine.reset() # or clear state manually +# power_manager = PowerManager(compute_node_power, **self.config) +# flops_manager = FLOPSManager(**self.args_dict) +# telemetry = Telemetry(**self.args_dict) +# jobs, timestep_start, timestep_end = self._build_jobs() +# +# self.engine = Engine( +# power_manager=power_manager, 
+# flops_manager=flops_manager, +# jobs=jobs, +# **self.args_dict +# ) +# +# self.engine.timestep_start = timestep_start +# self.engine.timestep_end = timestep_end +# #self.engine.current_timestep = timestep_start +# +# # Restart generator +# self.generator = self.layout_manager.run_stepwise( +# self.jobs, +# timestep_start=self.timestep_start, +# timestep_end=self.timestep_end, +# time_delta=self.args_dict.get("time_delta"), +# ) +# +# return self._get_state(), {} + def reset(self, **kwargs): - self.engine.jobs = self.jobs - self.engine.timestep_start = self.timestep_start - self.engine.timestep_end = self.timestep_end - self.engine.current_timestep = self.timestep_start + completed = [j.id for j in self.jobs if j.current_state.name == "COMPLETED"] + print(f"[RESET] Jobs already completed before deepcopy: {len(completed)}") + + super().reset(seed=42) + # self.engine.jobs = self.jobs + self.jobs = copy.deepcopy(self.original_jobs) # working copy + + # self.engine.timestep_start = self.timestep_start + # self.engine.timestep_end = self.timestep_end + # self.engine.reset(self.jobs, self.timestep_start, self.timestep_end) + + # self.engine.current_timestep = self.timestep_start + + # self.engine.jobs = self.jobs # repoint engine to fresh jobs + # self.engine.completed_jobs = [] + # self.engine.queue.clear() + # self.engine.running.clear() + # self.engine.power_manager.history.clear() + # self.engine.jobs_completed = 0 self.generator = self.layout_manager.run_stepwise( self.jobs, @@ -184,26 +235,51 @@ class RAPSEnv(gym.Env): def _compute_reward(self, tick_data): """ - Reward function: minimize carbon footprint per job completed. - Encourages the agent to complete jobs while keeping emissions low. + Reward function for RL scheduling on Frontier-like systems. + Balances throughput and carbon footprint, using incremental values. 
""" - reward = 0.0 - # Jobs completed this tick - jobs_completed = len(getattr(tick_data, "completed", [])) + # How many jobs completed *this tick* + jobs_done = len(getattr(tick_data, "completed", [])) - # Carbon emitted so far (metric tons CO2) - carbon_so_far = getattr(self.engine, "carbon emissions", 0.0) + # Incremental carbon emitted this tick + carbon_step = getattr(self.engine, "carbon emissions", 0.0) - if jobs_completed > 0: - # Reward is higher when more jobs finish with less carbon - reward = jobs_completed / (carbon_so_far + 1e-6) - else: - # Small penalty if no jobs finished (encourages progress) - reward = -0.01 + # Tradeoff weights (tunable hyperparameters) + alpha = 10.0 # reward for finishing a job + beta = 0.1 # penalty per metric ton CO2 + + # Reward = (jobs * alpha) - (carbon * beta) + reward = (alpha * jobs_done) - (beta * carbon_step) + + # Small penalty if idle and no jobs complete + if jobs_done == 0 and carbon_step == 0: + reward -= 0.01 return reward +# def _compute_reward(self, tick_data): +# """ +# Reward function: minimize carbon footprint per job completed. +# Encourages the agent to complete jobs while keeping emissions low. 
+# """ +# reward = 0.0 +# +# # Jobs completed this tick +# jobs_completed = len(getattr(tick_data, "completed", [])) +# +# # Carbon emitted so far (metric tons CO2) +# carbon_so_far = getattr(self.engine, "carbon emissions", 0.0) +# +# if jobs_completed > 0: +# # Reward is higher when more jobs finish with less carbon +# reward = jobs_completed / (carbon_so_far + 1e-6) +# else: +# # Small penalty if no jobs finished (encourages progress) +# reward = -0.01 +# +# return reward + def _compute_reward2(self, tick_data, alpha=10.0, beta=1.0, gamma=2.0): completed = getattr(tick_data, "completed", None) jobs_completed = len(completed) if completed else 0 @@ -252,12 +328,18 @@ class RAPSEnv(gym.Env): done = self.engine.current_timestep >= self.engine.timestep_end info = {} + print(f"t={self.engine.current_timestep}, " + f"queue={len(self.engine.queue)}, " + f"running={len(self.engine.running)}, " + f"completed={self.engine.jobs_completed}", + f"action={action}") + return obs, reward, done, info def _get_state(self): """Construct simple state representation from engine's job queue.""" # Example: take waiting jobs (haven’t started yet) - job_queue = [j for j in self.engine.jobs if getattr(j, "start_time", None) is None] + job_queue = [j for j in self.jobs if getattr(j, "start_time", None) is None] max_jobs, job_features = self.observation_space.shape state = np.zeros((max_jobs, job_features), dtype=np.float32) diff --git a/raps/resmgr/default.py b/raps/resmgr/default.py index 1429a5f..339f0ed 100644 --- a/raps/resmgr/default.py +++ b/raps/resmgr/default.py @@ -66,7 +66,8 @@ class ExclusiveNodeResourceManager: if n not in self.available_nodes: self.available_nodes.append(n) else: - raise KeyError(f"node was free but already in available nodes: {n.id}") + # Already free — log instead of raising + print(f"[WARN] Tried to free node {n}, but it was already available") self.available_nodes = sorted(self.available_nodes) def update_system_utilization(self, current_time, 
running_jobs): diff --git a/raps/workload.py b/raps/workload.py index 151e2c3..f02649c 100644 --- a/raps/workload.py +++ b/raps/workload.py @@ -642,6 +642,151 @@ class Workload: return jobs + def multitenant(self, **kwargs): + """ + Generate deterministic jobs to validate multitenant scheduling & power. + + Parameters + ---------- + mode : str + One of: + - 'ONE_JOB_PER_NODE_ALL_CORES' + - 'TWO_JOBS_PER_NODE_SPLIT' + - 'STAGGERED_JOBS_PER_NODE' + wall_time : int + Duration (seconds) of each job (default: 3600) + trace_quanta : int + Sampling interval for traces; defaults to config['TRACE_QUANTA'] + + Returns + ------- + list[dict] + List of job_dict entries. + """ + mode = kwargs.get('mode', 'TWO_JOBS_PER_NODE_SPLIT') + wall_time = kwargs.get('wall_time', 3600) + + jobs = [] + + for partition in self.partitions: + cfg = self.config_map[partition] + trace_quanta = kwargs.get('trace_quanta', cfg['TRACE_QUANTA']) + + cores_per_cpu = cfg.get('CORES_PER_CPU', 1) + cpus_per_node = cfg.get('CPUS_PER_NODE', 1) + cores_per_node = cores_per_cpu * cpus_per_node + gpus_per_node = cfg.get('GPUS_PER_NODE', 0) + + n_nodes = cfg['AVAILABLE_NODES'] + + def make_trace(cpu_util, gpu_util): + return self.compute_traces(cpu_util, gpu_util, wall_time, trace_quanta) + + job_id_ctr = 0 + + if mode == 'ONE_JOB_PER_NODE_ALL_CORES': + # Each node runs one job that consumes all cores/GPUs + for nid in range(n_nodes): + cpu_trace, gpu_trace = make_trace(cores_per_node, gpus_per_node) + jobs.append(Job(job_dict( + nodes_required=1, + cpu_cores_required=cores_per_node, + gpu_units_required=gpus_per_node, + name=f"MT_full_node_{partition}_{nid}", + account=random.choice(ACCT_NAMES), + cpu_trace=cpu_trace, + gpu_trace=gpu_trace, + ntx_trace=[], nrx_trace=[], + end_state='COMPLETED', + id=job_id_ctr, + priority=random.randint(0, MAX_PRIORITY), + partition=partition, + submit_time=0, + time_limit=wall_time, + start_time=0, + end_time=wall_time, + expected_run_time=wall_time, + trace_time=wall_time, 
+ trace_start_time=0, + trace_end_time=wall_time, + trace_quanta=cfg['TRACE_QUANTA'] + ))) + job_id_ctr += 1 + + elif mode == 'TWO_JOBS_PER_NODE_SPLIT': + # Two jobs per node: split CPU/GPU roughly in half + for nid in range(n_nodes): + cpu_a = cores_per_node // 2 + cpu_b = cores_per_node - cpu_a + gpu_a = gpus_per_node // 2 + gpu_b = gpus_per_node - gpu_a + + for idx, (c_req, g_req, tag) in enumerate([(cpu_a, gpu_a, 'A'), + (cpu_b, gpu_b, 'B')]): + cpu_trace, gpu_trace = make_trace(c_req, g_req) + jobs.append(Job(job_dict( + nodes_required=1, # still one node; multitenant RM packs cores + cpu_cores_required=c_req, + gpu_units_required=g_req, + name=f"MT_split_node_{partition}_{nid}_{tag}", + account=random.choice(ACCT_NAMES), + cpu_trace=cpu_trace, + gpu_trace=gpu_trace, + ntx_trace=[], nrx_trace=[], + end_state='COMPLETED', + id=job_id_ctr, + priority=random.randint(0, MAX_PRIORITY), + partition=partition, + submit_time=0, + time_limit=wall_time, + start_time=0, + end_time=wall_time, + expected_run_time=wall_time, + trace_time=wall_time, + trace_start_time=0, + trace_end_time=wall_time, + trace_quanta=cfg['TRACE_QUANTA'] + ))) + job_id_ctr += 1 + + elif mode == 'STAGGERED_JOBS_PER_NODE': + # Three jobs per node, staggered starts: 0, wall_time/3, 2*wall_time/3 + offsets = [0, wall_time // 3, 2 * wall_time // 3] + cpu_each = cores_per_node // 3 or 1 + gpu_each = max(1, gpus_per_node // 3) if gpus_per_node else 0 + + for nid in range(n_nodes): + for k, offset in enumerate(offsets): + cpu_trace, gpu_trace = make_trace(cpu_each, gpu_each) + jobs.append(Job(job_dict( + nodes_required=1, + cpu_cores_required=cpu_each, + gpu_units_required=gpu_each, + name=f"MT_stagger_node_{partition}_{nid}_{k}", + account=random.choice(ACCT_NAMES), + cpu_trace=cpu_trace, + gpu_trace=gpu_trace, + ntx_trace=[], nrx_trace=[], + end_state='COMPLETED', + id=job_id_ctr, + priority=random.randint(0, MAX_PRIORITY), + partition=partition, + submit_time=offset, + time_limit=wall_time, + 
start_time=offset, + end_time=offset + wall_time, + expected_run_time=wall_time, + trace_time=wall_time, + trace_start_time=0, + trace_end_time=wall_time, + trace_quanta=cfg['TRACE_QUANTA'] + ))) + job_id_ctr += 1 + else: + raise ValueError(f"Unknown multitenant mode: {mode}") + + return jobs + def plot_job_hist(jobs, config=None, dist_split=None, gantt_nodes=False): # put args.multimodal in dist_split! @@ -818,151 +963,6 @@ def run_workload(): np.savez_compressed(filename, jobs=jobs, timestep_start=timestep_start, timestep_end=timestep_end, args=args) print(filename + ".npz") # To std-out to show which npz was created. - def multitenant(self, **kwargs): - """ - Generate deterministic jobs to validate multitenant scheduling & power. - - Parameters - ---------- - mode : str - One of: - - 'ONE_JOB_PER_NODE_ALL_CORES' - - 'TWO_JOBS_PER_NODE_SPLIT' - - 'STAGGERED_JOBS_PER_NODE' - wall_time : int - Duration (seconds) of each job (default: 3600) - trace_quanta : int - Sampling interval for traces; defaults to config['TRACE_QUANTA'] - - Returns - ------- - list[dict] - List of job_dict entries. 
- """ - mode = kwargs.get('mode', 'TWO_JOBS_PER_NODE_SPLIT') - wall_time = kwargs.get('wall_time', 3600) - - jobs = [] - - for partition in self.partitions: - cfg = self.config_map[partition] - trace_quanta = kwargs.get('trace_quanta', cfg['TRACE_QUANTA']) - - cores_per_cpu = cfg.get('CORES_PER_CPU', 1) - cpus_per_node = cfg.get('CPUS_PER_NODE', 1) - cores_per_node = cores_per_cpu * cpus_per_node - gpus_per_node = cfg.get('GPUS_PER_NODE', 0) - - n_nodes = cfg['AVAILABLE_NODES'] - - def make_trace(cpu_util, gpu_util): - return self.compute_traces(cpu_util, gpu_util, wall_time, trace_quanta) - - job_id_ctr = 0 - - if mode == 'ONE_JOB_PER_NODE_ALL_CORES': - # Each node runs one job that consumes all cores/GPUs - for nid in range(n_nodes): - cpu_trace, gpu_trace = make_trace(cores_per_node, gpus_per_node) - jobs.append(job_dict( - nodes_required=1, - cpu_cores_required=cores_per_node, - gpu_units_required=gpus_per_node, - name=f"MT_full_node_{partition}_{nid}", - account=random.choice(ACCT_NAMES), - cpu_trace=cpu_trace, - gpu_trace=gpu_trace, - ntx_trace=[], nrx_trace=[], - end_state='COMPLETED', - id=job_id_ctr, - priority=random.randint(0, MAX_PRIORITY), - partition=partition, - submit_time=0, - time_limit=wall_time, - start_time=0, - end_time=wall_time, - expected_run_time=wall_time, - trace_time=wall_time, - trace_start_time=0, - trace_end_time=wall_time, - trace_quanta=config['TRACE_QUANTA'] - )) - job_id_ctr += 1 - - elif mode == 'TWO_JOBS_PER_NODE_SPLIT': - # Two jobs per node: split CPU/GPU roughly in half - for nid in range(n_nodes): - cpu_a = cores_per_node // 2 - cpu_b = cores_per_node - cpu_a - gpu_a = gpus_per_node // 2 - gpu_b = gpus_per_node - gpu_a - - for idx, (c_req, g_req, tag) in enumerate([(cpu_a, gpu_a, 'A'), - (cpu_b, gpu_b, 'B')]): - cpu_trace, gpu_trace = make_trace(c_req, g_req) - jobs.append(job_dict( - nodes_required=1, # still one node; multitenant RM packs cores - cpu_cores_required=c_req, - gpu_units_required=g_req, - 
name=f"MT_split_node_{partition}_{nid}_{tag}", - account=random.choice(ACCT_NAMES), - cpu_trace=cpu_trace, - gpu_trace=gpu_trace, - ntx_trace=[], nrx_trace=[], - end_state='COMPLETED', - id=job_id_ctr, - priority=random.randint(0, MAX_PRIORITY), - partition=partition, - submit_time=0, - time_limit=wall_time, - start_time=0, - end_time=wall_time, - expected_run_time=wall_time, - trace_time=wall_time, - trace_start_time=0, - trace_end_time=wall_time, - trace_quanta=config['TRACE_QUANTA'] - )) - job_id_ctr += 1 - - elif mode == 'STAGGERED_JOBS_PER_NODE': - # Three jobs per node, staggered starts: 0, wall_time/3, 2*wall_time/3 - offsets = [0, wall_time // 3, 2 * wall_time // 3] - cpu_each = cores_per_node // 3 or 1 - gpu_each = max(1, gpus_per_node // 3) if gpus_per_node else 0 - - for nid in range(n_nodes): - for k, offset in enumerate(offsets): - cpu_trace, gpu_trace = make_trace(cpu_each, gpu_each) - jobs.append(job_dict( - nodes_required=1, - cpu_cores_required=cpu_each, - gpu_units_required=gpu_each, - name=f"MT_stagger_node_{partition}_{nid}_{k}", - account=random.choice(ACCT_NAMES), - cpu_trace=cpu_trace, - gpu_trace=gpu_trace, - ntx_trace=[], nrx_trace=[], - end_state='COMPLETED', - id=job_id_ctr, - priority=random.randint(0, MAX_PRIORITY), - partition=partition, - submit_time=offset, - time_limit=wall_time, - start_time=offset, - end_time=offset + wall_time, - expected_run_time=wall_time, - trace_time=wall_time, - trace_start_time=0, - trace_end_time=wall_time, - trace_quanta=config['TRACE_QUANTA'] - )) - job_id_ctr += 1 - else: - raise ValueError(f"Unknown multitenant mode: {mode}") - - return jobs - def continuous_job_generation(*, engine, timestep, jobs): # print("if len(engine.queue) <= engine.continuous_workload.args.maxqueue:") diff --git a/train_rl.py b/train_rl.py index e98f7a3..73d3ed7 100644 --- a/train_rl.py +++ b/train_rl.py @@ -1,3 +1,7 @@ +""" +Example usage: + python train_rl.py --system mit_supercloud -f /opt/data/mit_supercloud/202201 +""" 
from stable_baselines3 import PPO from raps.envs.raps_env import RAPSEnv from raps.system_config import get_system_config -- GitLab From 2d112decfe0490cd3b8e501fa037a5b463b5c4c8 Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Wed, 10 Sep 2025 11:07:22 -0400 Subject: [PATCH 18/29] Remove symlink config This will cause issues with the multi-partition lookup --- config/mit_supercloud.yaml | 1 - 1 file changed, 1 deletion(-) delete mode 120000 config/mit_supercloud.yaml diff --git a/config/mit_supercloud.yaml b/config/mit_supercloud.yaml deleted file mode 120000 index 2167597..0000000 --- a/config/mit_supercloud.yaml +++ /dev/null @@ -1 +0,0 @@ -mit_supercloud/part-gpu.yaml \ No newline at end of file -- GitLab From 57653313bb6c5432b2f3c6dc6afd2814b5a59387 Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Wed, 10 Sep 2025 11:12:32 -0400 Subject: [PATCH 19/29] Update pyproject.toml --- pyproject.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index f396280..c009d2a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "raps" -version = "0.0.1" +version = "2.0.0.dev0" requires-python = ">=3.12" description = "RAPS" readme = "README.md" @@ -30,6 +30,8 @@ dependencies = [ "pyyaml>=6.0.2", "pydantic>=2.11.7", "pydantic-settings>=2.10.1", + "stable-baselines3==2.7.0", + "gym==0.26.2", "pre-commit" ] -- GitLab From 6699940d748855f1c508fdb3849352d3737dd0e9 Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Wed, 10 Sep 2025 12:04:55 -0400 Subject: [PATCH 20/29] Move sim shortcuts --- raps/run_sim.py | 25 ++++--------------------- raps/sim_config.py | 17 +++++++++++++++++ raps/workload.py | 4 ++-- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/raps/run_sim.py b/raps/run_sim.py index ce89529..5525d7f 100644 --- a/raps/run_sim.py +++ b/raps/run_sim.py @@ -22,7 +22,7 @@ from raps.stats import ( print_formatted_report ) -from raps.sim_config import
SingleSimConfig, MultiPartSimConfig +from raps.sim_config import SingleSimConfig, MultiPartSimConfig, SIM_SHORTCUTS def read_yaml(config_file: str): @@ -34,23 +34,6 @@ def read_yaml(config_file: str): return {} -shortcuts = { - "partitions": "x", - "cooling": "c", - "simulate-network": "net", - "fastforward": "ff", - "time": "t", - "debug": "d", - "numjobs": "n", - "verbose": "v", - "output": "o", - "uncertainties": "u", - "plot": "p", - "replay": "f", - "workload": "w", -} - - def run_sim_add_parser(subparsers: SubParsers): parser = subparsers.add_parser("run", description=""" Run single-partition (homogeneous) systems. Supports synthetic workload generation or @@ -63,7 +46,7 @@ def run_sim_add_parser(subparsers: SubParsers): flags. Pass "-" to read from stdin. """) model_validate = pydantic_add_args(parser, SingleSimConfig, model_config={ - "cli_shortcuts": shortcuts, + "cli_shortcuts": SIM_SHORTCUTS, }) parser.set_defaults( impl=lambda args: run_sim(model_validate(args, read_yaml(args.config_file))) @@ -238,7 +221,7 @@ def run_parts_sim_add_parser(subparsers: SubParsers): flags. Pass "-" to read from stdin. 
""") model_validate = pydantic_add_args(parser, MultiPartSimConfig, model_config={ - "cli_shortcuts": shortcuts, + "cli_shortcuts": SIM_SHORTCUTS, }) parser.set_defaults( impl=lambda args: run_parts_sim(model_validate(args, read_yaml(args.config_file))) @@ -325,7 +308,7 @@ def show_add_parser(subparsers: SubParsers): If true, include defaults in the output YAML """) model_validate = pydantic_add_args(parser, SingleSimConfig, model_config={ - "cli_shortcuts": shortcuts, + "cli_shortcuts": SIM_SHORTCUTS, }) def impl(args): diff --git a/raps/sim_config.py b/raps/sim_config.py index a73cd3e..05c078c 100644 --- a/raps/sim_config.py +++ b/raps/sim_config.py @@ -391,3 +391,20 @@ class MultiPartSimConfig(SimConfig): @cached_property def _multi_partition_system_config(self): return get_partition_configs(self.partitions) + + +SIM_SHORTCUTS = { + "partitions": "x", + "cooling": "c", + "simulate-network": "net", + "fastforward": "ff", + "time": "t", + "debug": "d", + "numjobs": "n", + "verbose": "v", + "output": "o", + "uncertainties": "u", + "plot": "p", + "replay": "f", + "workload": "w", +} diff --git a/raps/workload.py b/raps/workload.py index ddd4377..2d57227 100644 --- a/raps/workload.py +++ b/raps/workload.py @@ -952,7 +952,7 @@ def plot_job_hist(jobs, config=None, dist_split=None, gantt_nodes=False): def run_workload_add_parser(subparsers: SubParsers): - from raps.run_sim import shortcuts + from raps.sim_config import SIM_SHORTCUTS # TODO: Separate the arguments for this command parser = subparsers.add_parser("workload", description=""" Saves workload as a snapshot. @@ -962,7 +962,7 @@ def run_workload_add_parser(subparsers: SubParsers): flags. Pass "-" to read from stdin. 
""") model_validate = pydantic_add_args(parser, SingleSimConfig, model_config={ - "cli_shortcuts": shortcuts, + "cli_shortcuts": SIM_SHORTCUTS, }) parser.set_defaults(impl=lambda args: run_workload(model_validate(args, {}))) -- GitLab From 68cfb20ed9081fccbb89cb1da7cfb7891178400e Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Wed, 10 Sep 2025 12:07:00 -0400 Subject: [PATCH 21/29] Move read_yaml to utils so we can reuse it --- raps/run_sim.py | 11 +---------- raps/utils.py | 10 ++++++++++ 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/raps/run_sim.py b/raps/run_sim.py index 5525d7f..4a3f9b3 100644 --- a/raps/run_sim.py +++ b/raps/run_sim.py @@ -13,7 +13,7 @@ from raps.ui import LayoutManager from raps.plotting import Plotter from raps.engine import Engine from raps.multi_part_engine import MultiPartEngine -from raps.utils import write_dict_to_file, pydantic_add_args, SubParsers, yaml_dump +from raps.utils import write_dict_to_file, pydantic_add_args, SubParsers, yaml_dump, read_yaml from raps.stats import ( get_engine_stats, get_job_stats, @@ -25,15 +25,6 @@ from raps.stats import ( from raps.sim_config import SingleSimConfig, MultiPartSimConfig, SIM_SHORTCUTS -def read_yaml(config_file: str): - if config_file == "-": - return yaml.safe_load(sys.stdin.read()) - elif config_file: - return yaml.safe_load(Path(config_file).read_text()) - else: - return {} - - def run_sim_add_parser(subparsers: SubParsers): parser = subparsers.add_parser("run", description=""" Run single-partition (homogeneous) systems. Supports synthetic workload generation or diff --git a/raps/utils.py b/raps/utils.py index ab02a2a..aeb04c5 100644 --- a/raps/utils.py +++ b/raps/utils.py @@ -750,6 +750,16 @@ def yaml_dump(data): ) +def read_yaml(config_file: str): + """ Parses yaml file. 
Pass "-" to read from stdin """ + if config_file == "-": + return yaml.safe_load(sys.stdin.read()) + elif config_file: + return yaml.safe_load(Path(config_file).read_text()) + else: + return {} + + class WorkloadData(RAPSBaseModel): """ Represents a workload, a list of jobs with some metadata. Returned by dataloaders load_data() -- GitLab From 05527c712695fc321fdf45cc1b7d45797110dec5 Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Wed, 10 Sep 2025 12:16:29 -0400 Subject: [PATCH 22/29] Add train-rl subcommand --- main.py | 2 ++ raps/train_rl.py | 58 ++++++++++++++++++++++++++++++++++++++++++++++++ train_rl.py | 35 ----------------------------- 3 files changed, 60 insertions(+), 35 deletions(-) create mode 100644 raps/train_rl.py delete mode 100644 train_rl.py diff --git a/main.py b/main.py index 7c38960..9b36a76 100644 --- a/main.py +++ b/main.py @@ -6,6 +6,7 @@ from raps.helpers import check_python_version from raps.run_sim import run_sim_add_parser, run_parts_sim_add_parser, show_add_parser from raps.workload import run_workload_add_parser from raps.telemetry import run_telemetry_add_parser +from raps.train_rl import train_rl_add_parser check_python_version() @@ -24,6 +25,7 @@ def main(cli_args: list[str] | None = None): show_add_parser(subparsers) run_workload_add_parser(subparsers) run_telemetry_add_parser(subparsers) + train_rl_add_parser(subparsers) # TODO: move other misc scripts into here diff --git a/raps/train_rl.py b/raps/train_rl.py new file mode 100644 index 0000000..f854e18 --- /dev/null +++ b/raps/train_rl.py @@ -0,0 +1,58 @@ +from raps.sim_config import SingleSimConfig, SIM_SHORTCUTS +from raps.utils import SubParsers, pydantic_add_args, read_yaml + + +class RLConfig(SingleSimConfig): + # Reinforcement Learning + episode_length: int = 1000 + """ Number of timesteps per RL episode (default 1000) """ + + +def train_rl_add_parser(subparsers: SubParsers): + parser = subparsers.add_parser("train-rl", description=""" + Example usage: + raps train-rl 
--system mit_supercloud/part-gpu -f /opt/data/mit_supercloud/202201 + """) + parser.add_argument("config_file", nargs="?", default=None, help=""" + YAML sim config file, can be used to configure an experiment instead of using CLI + flags. Pass "-" to read from stdin. + """) + model_validate = pydantic_add_args(parser, RLConfig, model_config={ + "cli_shortcuts": SIM_SHORTCUTS, + }) + parser.set_defaults( + impl=lambda args: train_rl(model_validate(args, read_yaml(args.config_file))) + ) + + +def train_rl(rl_config: RLConfig): + from stable_baselines3 import PPO + from raps.envs.raps_env import RAPSEnv + + args_dict = rl_config.get_legacy_args_dict() + config = rl_config.system_configs[0].get_legacy() + args_dict['config'] = config + args_dict['args'] = rl_config.get_legacy_args() + + env = RAPSEnv(**args_dict) + + model = PPO( + "MlpPolicy", + env, + n_steps=512, # shorter rollouts (quicker feedback loop) + batch_size=128, # must divide n_steps evenly + n_epochs=10, # # of minibatch passes per update + gamma=0.99, # discount (keeps long-term credit) + learning_rate=3e-4, # default Adam lr, can try 1e-4 if unstable + ent_coef=0.01, # encourage exploration + verbose=1, + tensorboard_log="./ppo_raps_logs/" + ) + + model.learn(total_timesteps=10000, tb_log_name="ppo_raps") + + # Output stats + stats = env.get_stats() + + # Save trained model + model.save("ppo_raps") diff --git a/train_rl.py b/train_rl.py deleted file mode 100644 index 73d3ed7..0000000 --- a/train_rl.py +++ /dev/null @@ -1,35 +0,0 @@ -""" -Example usage: - python train_rl.py --system mit_supercloud -f /opt/data/mit_supercloud/202201 -""" -from stable_baselines3 import PPO -from raps.envs.raps_env import RAPSEnv -from raps.system_config import get_system_config -from raps.sim_config import args, args_dict - -config = get_system_config(args.system).get_legacy() -args_dict['config'] = config -args_dict['args'] = args - -env = RAPSEnv(**args_dict) - -model = PPO( - "MlpPolicy", - env, - n_steps=512, # 
shorter rollouts (quicker feedback loop) - batch_size=128, # must divide n_steps evenly - n_epochs=10, # # of minibatch passes per update - gamma=0.99, # discount (keeps long-term credit) - learning_rate=3e-4, # default Adam lr, can try 1e-4 if unstable - ent_coef=0.01, # encourage exploration - verbose=1, - tensorboard_log="./ppo_raps_logs/" -) - -model.learn(total_timesteps=10000, tb_log_name="ppo_raps") - -# Output stats -stats = env.get_stats() - -# Save trained model -model.save("ppo_raps") -- GitLab From a546f9938154a6d5fcbb8ae287dd0ccbdee1daf7 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Wed, 10 Sep 2025 18:10:33 -0400 Subject: [PATCH 23/29] A number of simplifications to raps_env.py --- raps/envs/raps_env.py | 82 ++++++++----------------------------------- raps/train_rl.py | 24 ++++++------- 2 files changed, 26 insertions(+), 80 deletions(-) diff --git a/raps/envs/raps_env.py b/raps/envs/raps_env.py index bf3a161..e786013 100644 --- a/raps/envs/raps_env.py +++ b/raps/envs/raps_env.py @@ -4,23 +4,14 @@ from gym import spaces import numpy as np from raps.engine import Engine -from raps.power import PowerManager, compute_node_power -from raps.flops import FLOPSManager -from raps.telemetry import Telemetry from raps.workload import Workload -from raps.ui import LayoutManager -from raps.schedulers.rl import Scheduler # from raps.resmgr.default import MultiTenantResourceManager as ResourceManager -from raps.resmgr.default import ExclusiveNodeResourceManager as ResourceManager from raps.stats import get_engine_stats, get_job_stats, get_scheduler_stats, get_network_stats from stable_baselines3.common.logger import Logger, HumanOutputFormat import sys -logger = Logger( - folder=None, # no log file, just stdout - output_formats=[HumanOutputFormat(sys.stdout)] -) +logger = Logger(folder=None, output_formats=[HumanOutputFormat(sys.stdout)]) def print_stats(stats, step=0): @@ -56,65 +47,11 @@ class RAPSEnv(gym.Env): metadata = {"render.modes": ["human"]} - def 
__init__(self, **kwargs): + def __init__(self, sim_config): super().__init__() # Store everything in self.args - self.args_dict = kwargs # dict - self.cli_args = kwargs.get("args") # Namespace - self.config = kwargs.get("config") - if self.cli_args is None: - raise ValueError("RAPSEnv requires 'args' (argparse.Namespace) in kwargs") - if self.config is None: - raise ValueError("RAPSEnv requires 'config' in kwargs") - - # --- managers (minimal versions) --- - self.power_manager = PowerManager(compute_node_power, **self.config) - self.flops_manager = FLOPSManager(**self.args_dict) - self.telemetry = Telemetry(**self.args_dict) - - # --- Build initial jobs & time bounds --- - self.jobs, self.timestep_start, self.timestep_end = self._build_jobs() - self.original_jobs = self.jobs # keep pristine version - - self.engine = Engine( - power_manager=self.power_manager, - flops_manager=self.flops_manager, - jobs=self.jobs, - **self.args_dict - ) - - resmgr = ResourceManager( - total_nodes=self.config["TOTAL_NODES"], - down_nodes=self.config.get("DOWN_NODES", []), - config=self.config - ) - - # Plug in RL scheduler - self.scheduler = Scheduler( - config=self.config, - policy="fcfs", # or None if you want no heuristic fallback - resource_manager=resmgr, - env=self - ) - self.engine.scheduler = self.scheduler - - self.layout_manager = LayoutManager( - self.args_dict.get("layout"), engine=self.engine, - debug=self.args_dict.get("debug", False), - total_timesteps=self.args_dict.get("time", 1000), - args_dict=self.args_dict, - **self.config - ) - - self.timestep_start = 0 - self.timestep_end = getattr(self.cli_args, "episode_length") - - self.generator = self.layout_manager.run_stepwise( - self.jobs, - timestep_start=self.timestep_start, - timestep_end=self.timestep_end, - time_delta=self.args_dict.get("time_delta"), - ) + self.sim_config = sim_config + self.engine = self._create_engine() # --- RL spaces --- max_jobs = 100 @@ -124,6 +61,14 @@ class RAPSEnv(gym.Env): ) 
self.action_space = spaces.Discrete(max_jobs) + def _create_engine(self): + self.engine, workload_data, time_delta = Engine.from_sim_config(self.sim_config) + self.engine.scheduler.env = self + jobs = workload_data.jobs + timestep_start = workload_data.telemetry_start + timestep_end = workload_data.telemetry_end + self.generator = self.engine.run_simulation(jobs, timestep_start, timestep_end, time_delta) + def _build_jobs(self): """ Build a job list either from synthetic workload (--workload) @@ -204,6 +149,9 @@ class RAPSEnv(gym.Env): # return self._get_state(), {} def reset(self, **kwargs): + self.engine = self._create_engine() + + def reset2(self, **kwargs): completed = [j.id for j in self.jobs if j.current_state.name == "COMPLETED"] print(f"[RESET] Jobs already completed before deepcopy: {len(completed)}") diff --git a/raps/train_rl.py b/raps/train_rl.py index f854e18..eac4172 100644 --- a/raps/train_rl.py +++ b/raps/train_rl.py @@ -2,12 +2,6 @@ from raps.sim_config import SingleSimConfig, SIM_SHORTCUTS from raps.utils import SubParsers, pydantic_add_args, read_yaml -class RLConfig(SingleSimConfig): - # Reinforcement Learning - episode_length: int = 1000 - """ Number of timesteps per RL episode (default 1000) """ - - def train_rl_add_parser(subparsers: SubParsers): parser = subparsers.add_parser("train-rl", description=""" Example usage: @@ -17,15 +11,18 @@ def train_rl_add_parser(subparsers: SubParsers): YAML sim config file, can be used to configure an experiment instead of using CLI flags. Pass "-" to read from stdin. 
""") - model_validate = pydantic_add_args(parser, RLConfig, model_config={ + model_validate = pydantic_add_args(parser, SingleSimConfig, model_config={ "cli_shortcuts": SIM_SHORTCUTS, }) - parser.set_defaults( - impl=lambda args: train_rl(model_validate(args, read_yaml(args.config_file))) - ) + + def impl(args): + model = model_validate(args, read_yaml(args.config_file)) + model.scheduler = "rl" + train_rl(model) + parser.set_defaults(impl=impl) -def train_rl(rl_config: RLConfig): +def train_rl(rl_config: SingleSimConfig): from stable_baselines3 import PPO from raps.envs.raps_env import RAPSEnv @@ -34,14 +31,14 @@ def train_rl(rl_config: RLConfig): args_dict['config'] = config args_dict['args'] = rl_config.get_legacy_args() - env = RAPSEnv(**args_dict) + env = RAPSEnv(rl_config) model = PPO( "MlpPolicy", env, n_steps=512, # shorter rollouts (quicker feedback loop) batch_size=128, # must divide n_steps evenly - n_epochs=10, # # of minibatch passes per update + n_epochs=10, # of minibatch passes per update gamma=0.99, # discount (keeps long-term credit) learning_rate=3e-4, # default Adam lr, can try 1e-4 if unstable ent_coef=0.01, # encourage exploration @@ -53,6 +50,7 @@ def train_rl(rl_config: RLConfig): # Output stats stats = env.get_stats() + print(stats) # Save trained model model.save("ppo_raps") -- GitLab From cfac2e289baad4c20ae018799ea36c76a9d697ea Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Thu, 11 Sep 2025 11:25:43 -0400 Subject: [PATCH 24/29] Fix running single partition --- raps/dataloaders/mit_supercloud/loader.py | 2 +- raps/telemetry.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/raps/dataloaders/mit_supercloud/loader.py b/raps/dataloaders/mit_supercloud/loader.py index 97deb4e..a622965 100644 --- a/raps/dataloaders/mit_supercloud/loader.py +++ b/raps/dataloaders/mit_supercloud/loader.py @@ -298,7 +298,7 @@ def load_data(local_dataset_path, **kwargs): ]) # partition mode - part = kwargs.get("partition", 
"").split("/")[-1].lower() + part = (kwargs.get("partition") or "").split("/")[-1].lower() cpu_only = (part == "part-cpu") mixed = (part == "part-gpu") diff --git a/raps/telemetry.py b/raps/telemetry.py index 63ee158..7ab911a 100644 --- a/raps/telemetry.py +++ b/raps/telemetry.py @@ -77,11 +77,12 @@ class Telemetry: def __init__(self, **kwargs): self.kwargs = kwargs - self.system = kwargs.get('system') + self.system = kwargs['system'] self.config = kwargs.get('config') try: - self.dataloader = importlib.import_module(f"raps.dataloaders.{self.system}", package=__package__) + module = self.system.split("/")[0] + self.dataloader = importlib.import_module(f"raps.dataloaders.{module}", package=__package__) except ImportError as e: print(f"WARNING: Failed to load dataloader: {e}") self.dataloader = None -- GitLab From af3a91710885d2917c65b5052c7bc414d11a004d Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Thu, 11 Sep 2025 12:29:38 -0400 Subject: [PATCH 25/29] Clean up raps_env.py. Add in check_env. Add sample command to README.md. Not working yet. --- README.md | 5 +- raps/envs/raps_env.py | 110 +++--------------------------------------- raps/train_rl.py | 2 + 3 files changed, 14 insertions(+), 103 deletions(-) diff --git a/README.md b/README.md index a3ccb54..eb39cc7 100644 --- a/README.md +++ b/README.md @@ -74,6 +74,9 @@ For MIT Supercloud # Synthetic tests for verification studies: raps run-parts -x mit_supercloud -w multitenant + # Reinforcement learning test case + python main.py train-rl --system mit_supercloud/part-cpu -f /opt/data/mit_supercloud/202201 + For Lumi # Synthetic test for Lumi: @@ -170,7 +173,7 @@ See instructions in [dashboard/README.md](https://code.ornl.gov/exadigit/simulat ## Running Tests -RAPS uses [pytest](https://docs.pytest.org/) for its test suite. +RAPS uses [pytest](https://docs.pytest.org/) for its test suite. 
Before running tests, ensure that you have a valid data directory available (e.g., `/opt/data`) and set the environment variable `RAPS_DATA_DIR` to point to it. ### Run all tests diff --git a/raps/envs/raps_env.py b/raps/envs/raps_env.py index e786013..2dbb032 100644 --- a/raps/envs/raps_env.py +++ b/raps/envs/raps_env.py @@ -1,4 +1,3 @@ -import copy import gym from gym import spaces import numpy as np @@ -64,10 +63,10 @@ class RAPSEnv(gym.Env): def _create_engine(self): self.engine, workload_data, time_delta = Engine.from_sim_config(self.sim_config) self.engine.scheduler.env = self - jobs = workload_data.jobs + self.jobs = workload_data.jobs timestep_start = workload_data.telemetry_start timestep_end = workload_data.telemetry_end - self.generator = self.engine.run_simulation(jobs, timestep_start, timestep_end, time_delta) + self.generator = self.engine.run_simulation(self.jobs, timestep_start, timestep_end, time_delta) def _build_jobs(self): """ @@ -114,72 +113,11 @@ class RAPSEnv(gym.Env): else: raise ValueError("RAPSEnv requires either --workload or --replay to build jobs.") -# def reset(self, seed=None, options=None): -# super().reset(seed=seed) -# -# self.jobs = copy.deepcopy(self.original_jobs) # working copy -# -# # Reset engine -# self.engine.current_timestep = 0 -# #self.engine.reset() # or clear state manually -# power_manager = PowerManager(compute_node_power, **self.config) -# flops_manager = FLOPSManager(**self.args_dict) -# telemetry = Telemetry(**self.args_dict) -# jobs, timestep_start, timestep_end = self._build_jobs() -# -# self.engine = Engine( -# power_manager=power_manager, -# flops_manager=flops_manager, -# jobs=jobs, -# **self.args_dict -# ) -# -# self.engine.timestep_start = timestep_start -# self.engine.timestep_end = timestep_end -# #self.engine.current_timestep = timestep_start -# -# # Restart generator -# self.generator = self.layout_manager.run_stepwise( -# self.jobs, -# timestep_start=self.timestep_start, -# 
timestep_end=self.timestep_end, -# time_delta=self.args_dict.get("time_delta"), -# ) -# -# return self._get_state(), {} - def reset(self, **kwargs): self.engine = self._create_engine() - - def reset2(self, **kwargs): - completed = [j.id for j in self.jobs if j.current_state.name == "COMPLETED"] - print(f"[RESET] Jobs already completed before deepcopy: {len(completed)}") - - super().reset(seed=42) - # self.engine.jobs = self.jobs - self.jobs = copy.deepcopy(self.original_jobs) # working copy - - # self.engine.timestep_start = self.timestep_start - # self.engine.timestep_end = self.timestep_end - # self.engine.reset(self.jobs, self.timestep_start, self.timestep_end) - - # self.engine.current_timestep = self.timestep_start - - # self.engine.jobs = self.jobs # repoint engine to fresh jobs - # self.engine.completed_jobs = [] - # self.engine.queue.clear() - # self.engine.running.clear() - # self.engine.power_manager.history.clear() - # self.engine.jobs_completed = 0 - - self.generator = self.layout_manager.run_stepwise( - self.jobs, - timestep_start=self.timestep_start, - timestep_end=self.timestep_end, - time_delta=self.args_dict.get("time_delta", 1), - ) - - return self._get_state() + obs = self._get_state() + info = {} + return obs, info def _compute_reward(self, tick_data): """ @@ -206,41 +144,6 @@ class RAPSEnv(gym.Env): return reward -# def _compute_reward(self, tick_data): -# """ -# Reward function: minimize carbon footprint per job completed. -# Encourages the agent to complete jobs while keeping emissions low. 
-# """ -# reward = 0.0 -# -# # Jobs completed this tick -# jobs_completed = len(getattr(tick_data, "completed", [])) -# -# # Carbon emitted so far (metric tons CO2) -# carbon_so_far = getattr(self.engine, "carbon emissions", 0.0) -# -# if jobs_completed > 0: -# # Reward is higher when more jobs finish with less carbon -# reward = jobs_completed / (carbon_so_far + 1e-6) -# else: -# # Small penalty if no jobs finished (encourages progress) -# reward = -0.01 -# -# return reward - - def _compute_reward2(self, tick_data, alpha=10.0, beta=1.0, gamma=2.0): - completed = getattr(tick_data, "completed", None) - jobs_completed = len(completed) if completed else 0 - power = self.power_manager.history[-1][1] - queue_len = len(self.engine.queue) - - reward = alpha * jobs_completed - beta * power - gamma * queue_len - - print(f"[t={self.engine.current_timestep}] jobs_completed={jobs_completed}, " - f"power={power}, queue_len={queue_len}, reward={reward}") - - return reward - def step(self, action): queue = self.engine.queue invalid_action = False @@ -268,6 +171,9 @@ class RAPSEnv(gym.Env): else: reward = self._compute_reward(tick_data) + # clip reward + reward = np.clip(reward, -10.0, 10.0) + # Print stats stats = self.get_stats() print_stats(stats) diff --git a/raps/train_rl.py b/raps/train_rl.py index eac4172..35edce1 100644 --- a/raps/train_rl.py +++ b/raps/train_rl.py @@ -24,6 +24,7 @@ def train_rl_add_parser(subparsers: SubParsers): def train_rl(rl_config: SingleSimConfig): from stable_baselines3 import PPO + from stable_baselines3.common.env_checker import check_env from raps.envs.raps_env import RAPSEnv args_dict = rl_config.get_legacy_args_dict() @@ -32,6 +33,7 @@ def train_rl(rl_config: SingleSimConfig): args_dict['args'] = rl_config.get_legacy_args() env = RAPSEnv(rl_config) + check_env(RAPSEnv(env)) model = PPO( "MlpPolicy", -- GitLab From 82f8925738c7aedbed9c5a3db6d28df77cf9cc69 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Thu, 11 Sep 2025 12:47:56 -0400 Subject: 
[PATCH 26/29] Get RL working again... --- raps/envs/raps_env.py | 27 ++++++++++++++++++--------- raps/train_rl.py | 2 -- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/raps/envs/raps_env.py b/raps/envs/raps_env.py index 2dbb032..d395066 100644 --- a/raps/envs/raps_env.py +++ b/raps/envs/raps_env.py @@ -61,12 +61,13 @@ class RAPSEnv(gym.Env): self.action_space = spaces.Discrete(max_jobs) def _create_engine(self): - self.engine, workload_data, time_delta = Engine.from_sim_config(self.sim_config) - self.engine.scheduler.env = self + engine, workload_data, time_delta = Engine.from_sim_config(self.sim_config) + engine.scheduler.env = self self.jobs = workload_data.jobs timestep_start = workload_data.telemetry_start timestep_end = workload_data.telemetry_end - self.generator = self.engine.run_simulation(self.jobs, timestep_start, timestep_end, time_delta) + self.generator = engine.run_simulation(self.jobs, timestep_start, timestep_end, time_delta) + return engine def _build_jobs(self): """ @@ -116,8 +117,7 @@ class RAPSEnv(gym.Env): def reset(self, **kwargs): self.engine = self._create_engine() obs = self._get_state() - info = {} - return obs, info + return obs def _compute_reward(self, tick_data): """ @@ -145,6 +145,9 @@ class RAPSEnv(gym.Env): return reward def step(self, action): + if self.engine is None: + raise RuntimeError("Engine not initialized. 
Did you forget to call reset()?") + queue = self.engine.queue invalid_action = False @@ -153,11 +156,17 @@ class RAPSEnv(gym.Env): invalid_action = True else: job = queue[int(action)] - available = len(self.engine.scheduler.resource_manager.available_nodes) - if job.nodes_required <= available: - # Valid scheduling + available_nodes = self.engine.scheduler.resource_manager.available_nodes + + if job.nodes_required <= len(available_nodes): + # Just pick the first available node (simplest placement policy) + node_id = available_nodes[0] self.engine.scheduler.place_job_and_manage_queues( - job, queue, self.engine.running, self.engine.current_timestep + job, + queue, + self.engine.running, + self.engine.current_timestep, + node_id, ) else: invalid_action = True diff --git a/raps/train_rl.py b/raps/train_rl.py index 35edce1..eac4172 100644 --- a/raps/train_rl.py +++ b/raps/train_rl.py @@ -24,7 +24,6 @@ def train_rl_add_parser(subparsers: SubParsers): def train_rl(rl_config: SingleSimConfig): from stable_baselines3 import PPO - from stable_baselines3.common.env_checker import check_env from raps.envs.raps_env import RAPSEnv args_dict = rl_config.get_legacy_args_dict() @@ -33,7 +32,6 @@ def train_rl(rl_config: SingleSimConfig): args_dict['args'] = rl_config.get_legacy_args() env = RAPSEnv(rl_config) - check_env(RAPSEnv(env)) model = PPO( "MlpPolicy", -- GitLab From edc8b43e00b995203592cb60189b5e8f169a9685 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Thu, 11 Sep 2025 13:02:04 -0400 Subject: [PATCH 27/29] Remove unused _build_jobs method --- raps/envs/raps_env.py | 47 ------------------------------------------- 1 file changed, 47 deletions(-) diff --git a/raps/envs/raps_env.py b/raps/envs/raps_env.py index d395066..e27a6d2 100644 --- a/raps/envs/raps_env.py +++ b/raps/envs/raps_env.py @@ -3,8 +3,6 @@ from gym import spaces import numpy as np from raps.engine import Engine -from raps.workload import Workload -# from raps.resmgr.default import MultiTenantResourceManager as 
ResourceManager from raps.stats import get_engine_stats, get_job_stats, get_scheduler_stats, get_network_stats from stable_baselines3.common.logger import Logger, HumanOutputFormat @@ -69,51 +67,6 @@ class RAPSEnv(gym.Env): self.generator = engine.run_simulation(self.jobs, timestep_start, timestep_end, time_delta) return engine - def _build_jobs(self): - """ - Build a job list either from synthetic workload (--workload) - or from telemetry replay (--replay). - Returns: jobs, timestep_start, timestep_end - """ - # --- Case 1: Telemetry replay --- - if self.cli_args and getattr(self.cli_args, "replay"): - result = self.telemetry.load_jobs_times_args_from_files( - files=self.cli_args.replay, - args=self.cli_args, - config=self.config, - ) - - # Handle 3-tuple vs 4-tuple return - if len(result) == 3: - jobs, start_time, end_time = result - elif len(result) == 4: - jobs, start_time, end_time, _ = result - else: - raise ValueError(f"Unexpected telemetry return format: {len(result)} values") - - # Flatten partitioned jobs if necessary - if jobs and isinstance(jobs[0], list): - jobs = [job for sublist in jobs for job in sublist] - - return jobs, start_time, end_time - - # --- Case 2: Synthetic workload generation --- - elif self.cli_args and getattr(self.cli_args, "workload"): - wl = Workload(self.cli_args, self.config) - jobs = wl.generate_jobs() - - # For synthetic jobs, compute timestep_end from submit + run_time - timestep_start = 0 - timestep_end = max( - (getattr(job, "end_time", None) or getattr(job, "expected_run_time", 0) + job.submit_time) - for job in jobs - ) - return jobs, timestep_start, timestep_end - - # --- Error: neither replay nor workload specified --- - else: - raise ValueError("RAPSEnv requires either --workload or --replay to build jobs.") - def reset(self, **kwargs): self.engine = self._create_engine() obs = self._get_state() -- GitLab From 219d243c09a0fd1d6b7d6829df174c6c95e91b3b Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Mon, 15 Sep 2025 
11:02:30 -0400 Subject: [PATCH 28/29] Fix next_arrival_byconfargs error --- raps/utils.py | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/raps/utils.py b/raps/utils.py index aeb04c5..c3c541f 100644 --- a/raps/utils.py +++ b/raps/utils.py @@ -54,6 +54,19 @@ def deep_subtract_dicts(a: dict, b: dict): return a +def to_dict(arg): + """ + Normalizes arg to a dictionary if necessary. Used to convert between legacy argparse.Namespace + objects and dictionaries. + """ + if isinstance(arg, dict): + return arg + elif isinstance(arg, argparse.Namespace): + return vars(arg) + else: + raise ValueError(f"Cannot convert {arg} to dict") + + def sum_values(values): return sum(x[1] for x in values) if values else 0 @@ -456,14 +469,15 @@ def create_dir_indexed(dir: str, path: str = None) -> str: def next_arrival_byconfargs(config, args, reset=False): + args = to_dict(args) arrival_rate = 1 arrival_time = config['JOB_ARRIVAL_TIME'] - downscale = args.downscale + downscale = args['downscale'] - if args.job_arrival_rate: - arrival_rate = args.job_arrival_rate - if args.job_arrival_time: - arrival_time = args.job_arrival_time + if args['job_arrival_rate']: + arrival_rate = args['job_arrival_rate'] + if args['job_arrival_time']: + arrival_time = args['job_arrival_time'] return next_arrival(arrival_rate / (arrival_time * downscale), reset) -- GitLab From c41b265e162f97f2f1fc94e7b9fec44f2dde5544 Mon Sep 17 00:00:00 2001 From: Jesse Hines Date: Tue, 16 Sep 2025 15:59:15 -0400 Subject: [PATCH 29/29] Add TODO comment --- raps/telemetry.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/raps/telemetry.py b/raps/telemetry.py index 7ab911a..6d5aa19 100644 --- a/raps/telemetry.py +++ b/raps/telemetry.py @@ -229,6 +229,8 @@ class Telemetry: job.scheduled_nodes = None # Setting to None triggers scheduler to assign nodes if self.kwargs['arrival'] == "poisson": + # TODO: --arrival poisson distribution throws errors about start_time in some scenarios + # 
e.g. `python main.py run-parts experiments/mit-replay-24hrs.yaml --arrival poisson` for job in jobs: job.scheduled_nodes = None job.submit_time = next_arrival_byconfargs(self.config, self.kwargs) -- GitLab