From 27d2179b038e4cabbf440fec49441902b44dee9b Mon Sep 17 00:00:00 2001
From: Wes Brewer
Date: Tue, 9 Sep 2025 21:15:28 -0400
Subject: [PATCH] Initial work on adding in the RL framework to latest develop

---
 main.py               |   2 +
 pyproject.toml        |   2 +-
 raps/envs/raps_env.py | 375 ++++++++++++++++++++++++++++++++++++++++++
 raps/schedulers/rl.py |  35 ++++
 raps/train_rl.py      |  73 ++++++++
 5 files changed, 486 insertions(+), 1 deletion(-)
 create mode 100644 raps/envs/raps_env.py
 create mode 100644 raps/schedulers/rl.py
 create mode 100644 raps/train_rl.py

diff --git a/main.py b/main.py
index 7c38960..9b36a76 100644
--- a/main.py
+++ b/main.py
@@ -6,6 +6,7 @@ from raps.helpers import check_python_version
 from raps.run_sim import run_sim_add_parser, run_parts_sim_add_parser, show_add_parser
 from raps.workload import run_workload_add_parser
 from raps.telemetry import run_telemetry_add_parser
+from raps.train_rl import train_rl_add_parser
 
 check_python_version()
 
@@ -24,6 +25,7 @@ def main(cli_args: list[str] | None = None):
     show_add_parser(subparsers)
     run_workload_add_parser(subparsers)
     run_telemetry_add_parser(subparsers)
+    train_rl_add_parser(subparsers)
 
     # TODO: move other misc scripts into here
 
diff --git a/pyproject.toml b/pyproject.toml
index f396280..15faff6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
 
 [project]
 name = "raps"
-version = "0.0.1"
+version = "2.0.0.dev0"
 requires-python = ">=3.12"
 description = "RAPS"
 readme = "README.md"
diff --git a/raps/envs/raps_env.py b/raps/envs/raps_env.py
new file mode 100644
index 0000000..503b1c2
--- /dev/null
+++ b/raps/envs/raps_env.py
@@ -0,0 +1,375 @@
+import copy
+import sys
+
+import gym
+import numpy as np
+from gym import spaces
+from stable_baselines3.common.logger import Logger, HumanOutputFormat
+
+from raps.engine import Engine
+from raps.power import PowerManager, compute_node_power
+from raps.flops import FLOPSManager
+from raps.telemetry import Telemetry
+from raps.workload import Workload
+from raps.ui import LayoutManager
+from raps.schedulers.rl import Scheduler
+# from raps.resmgr.default import MultiTenantResourceManager as ResourceManager
+from raps.resmgr.default import ExclusiveNodeResourceManager as ResourceManager
+from raps.stats import get_engine_stats, get_job_stats, get_scheduler_stats, get_network_stats
+
+logger = Logger(
+    folder=None,  # no log file, just stdout
+    output_formats=[HumanOutputFormat(sys.stdout)]
+)
+
+
+def print_stats(stats, step=0):
+    """Print selected engine and job statistics in SB3-style logger output."""
+
+    wanted_keys = {
+        "time simulated": "engine/Time Simulated",
+        "average power": "engine/Average Power",
+        "system power efficiency": "engine/System Power Efficiency",
+        "total energy consumed": "engine/Total Energy Consumed",
+        "carbon emissions": "engine/Carbon Footprint",
+        "jobs completed": "jobs/Jobs Completed",
+        "throughput": "jobs/Throughput",
+        "jobs still running": "jobs/Jobs Still Running",
+    }
+
+    for section in ["engine_stats", "job_stats"]:
+        if section in stats:
+            for k, v in stats[section].items():
+                if k.lower() in wanted_keys:
+                    if k.lower() == "jobs still running" and isinstance(v, list):
+                        v = len(v)
+                    logger.record(wanted_keys[k.lower()], v)
+
+    logger.dump(step=step)
+
+
+class RAPSEnv(gym.Env):
+    """
+    Minimal Gym-compatible wrapper around the RAPS Engine
+    for RL job scheduling experiments.
+    """
+ """ + + metadata = {"render.modes": ["human"]} + + def __init__(self, **kwargs): + super().__init__() + # Store everything in self.args + self.args_dict = kwargs # dict + self.cli_args = kwargs.get("args") # Namespace + self.config = kwargs.get("config") + if self.cli_args is None: + raise ValueError("RAPSEnv requires 'args' (argparse.Namespace) in kwargs") + if self.config is None: + raise ValueError("RAPSEnv requires 'config' in kwargs") + + # --- managers (minimal versions) --- + self.power_manager = PowerManager(compute_node_power, **self.config) + self.flops_manager = FLOPSManager(**self.args_dict) + self.telemetry = Telemetry(**self.args_dict) + + # --- Build initial jobs & time bounds --- + self.jobs, self.timestep_start, self.timestep_end = self._build_jobs() + self.original_jobs = self.jobs # keep pristine version + + self.engine = Engine( + power_manager=self.power_manager, + flops_manager=self.flops_manager, + jobs=self.jobs, + **self.args_dict + ) + + resmgr = ResourceManager( + total_nodes=self.config["TOTAL_NODES"], + down_nodes=self.config.get("DOWN_NODES", []), + config=self.config + ) + + # Plug in RL scheduler + self.scheduler = Scheduler( + config=self.config, + policy="fcfs", # or None if you want no heuristic fallback + resource_manager=resmgr, + env=self + ) + self.engine.scheduler = self.scheduler + + self.layout_manager = LayoutManager( + self.args_dict.get("layout"), engine=self.engine, + debug=self.args_dict.get("debug", False), + total_timesteps=self.args_dict.get("time", 1000), + args_dict=self.args_dict, + **self.config + ) + + self.timestep_start = 0 + self.timestep_end = getattr(self.cli_args, "episode_length") + + self.generator = self.layout_manager.run_stepwise( + self.jobs, + timestep_start=self.timestep_start, + timestep_end=self.timestep_end, + time_delta=self.args_dict.get("time_delta"), + ) + + # --- RL spaces --- + max_jobs = 100 + job_features = 4 # [nodes, runtime, priority, wait_time] + self.observation_space = spaces.Box( + low=0, high=1, shape=(max_jobs, job_features), dtype=np.float32 + ) + self.action_space = spaces.Discrete(max_jobs) + + def _build_jobs(self): + """ + Build a job list either from synthetic workload (--workload) + or from telemetry replay (--replay). 
+
+    def _build_jobs(self):
+        """
+        Build a job list either from a synthetic workload (--workload)
+        or from telemetry replay (--replay).
+
+        Returns: jobs, timestep_start, timestep_end
+        """
+        # --- Case 1: Telemetry replay ---
+        if self.cli_args and getattr(self.cli_args, "replay", None):
+            # result = self.telemetry.load_jobs_times_args_from_files(
+            #     files=self.cli_args.replay,
+            #     args=self.cli_args,
+            #     config=self.config,
+            # )
+
+            result = self.telemetry.load_from_files(self.cli_args.replay)
+            jobs = result.jobs
+            start_time, end_time = result.telemetry_start, result.telemetry_end
+
+            # # Handle 3-tuple vs 4-tuple return
+            # if len(result) == 3:
+            #     jobs, start_time, end_time = result
+            # elif len(result) == 4:
+            #     jobs, start_time, end_time, _ = result
+            # else:
+            #     raise ValueError(f"Unexpected telemetry return format: {len(result)} values")
+            #
+            # # Flatten partitioned jobs if necessary
+            # if jobs and isinstance(jobs[0], list):
+            #     jobs = [job for sublist in jobs for job in sublist]
+
+            return jobs, start_time, end_time
+
+        # --- Case 2: Synthetic workload generation ---
+        elif self.cli_args and getattr(self.cli_args, "workload", None):
+            wl = Workload(self.cli_args, self.config)
+            jobs = wl.generate_jobs()
+
+            # For synthetic jobs, compute timestep_end from submit + run_time
+            timestep_start = 0
+            timestep_end = max(
+                (getattr(job, "end_time", None) or getattr(job, "expected_run_time", 0) + job.submit_time)
+                for job in jobs
+            )
+            return jobs, timestep_start, timestep_end
+
+        # --- Error: neither replay nor workload specified ---
+        else:
+            raise ValueError("RAPSEnv requires either --workload or --replay to build jobs.")
+
+#    def reset(self, seed=None, options=None):
+#        super().reset(seed=seed)
+#
+#        self.jobs = copy.deepcopy(self.original_jobs)  # working copy
+#
+#        # Reset engine
+#        self.engine.current_timestep = 0
+#        # self.engine.reset()  # or clear state manually
+#        power_manager = PowerManager(compute_node_power, **self.config)
+#        flops_manager = FLOPSManager(**self.args_dict)
+#        telemetry = Telemetry(**self.args_dict)
+#        jobs, timestep_start, timestep_end = self._build_jobs()
+#
+#        self.engine = Engine(
+#            power_manager=power_manager,
+#            flops_manager=flops_manager,
+#            jobs=jobs,
+#            **self.args_dict
+#        )
+#
+#        self.engine.timestep_start = timestep_start
+#        self.engine.timestep_end = timestep_end
+#        # self.engine.current_timestep = timestep_start
+#
+#        # Restart generator
+#        self.generator = self.layout_manager.run_stepwise(
+#            self.jobs,
+#            timestep_start=self.timestep_start,
+#            timestep_end=self.timestep_end,
+#            time_delta=self.args_dict.get("time_delta"),
+#        )
+#
+#        return self._get_state(), {}
+
+    def reset(self, seed=None, options=None):
+        completed = [j.id for j in self.jobs if j.current_state.name == "COMPLETED"]
+        print(f"[RESET] Jobs already completed before deepcopy: {len(completed)}")
+
+        super().reset(seed=seed)
+        self.jobs = copy.deepcopy(self.original_jobs)  # working copy
+
+        # self.engine.timestep_start = self.timestep_start
+        # self.engine.timestep_end = self.timestep_end
+        # self.engine.reset(self.jobs, self.timestep_start, self.timestep_end)
+        # self.engine.current_timestep = self.timestep_start
+        # self.engine.jobs = self.jobs  # repoint engine to fresh jobs
+        # self.engine.completed_jobs = []
+        # self.engine.queue.clear()
+        # self.engine.running.clear()
+        # self.engine.power_manager.history.clear()
+        # self.engine.jobs_completed = 0
+
+        self.generator = self.layout_manager.run_stepwise(
+            self.jobs,
+            timestep_start=self.timestep_start,
+            timestep_end=self.timestep_end,
+            time_delta=self.args_dict.get("time_delta", 1),
+        )
+
+        # Gym >= 0.26 expects reset() to return (observation, info)
+        return self._get_state(), {}
+
+    def _compute_reward(self, tick_data):
+        """
+        Reward function for RL scheduling on Frontier-like systems.
+        Balances throughput and carbon footprint, using incremental values.
+        """
+
+        # How many jobs completed *this tick*
+        jobs_done = len(getattr(tick_data, "completed", []) or [])
+
+        # Incremental carbon emitted this tick. NOTE: attribute name assumed;
+        # the original getattr(self.engine, "carbon emissions", 0.0) used a
+        # name with a space, which always falls back to the default.
+        carbon_step = getattr(self.engine, "carbon_emissions", 0.0)
+
+        # Tradeoff weights (tunable hyperparameters)
+        alpha = 10.0  # reward for finishing a job
+        beta = 0.1    # penalty per metric ton CO2
+
+        # Reward = (jobs * alpha) - (carbon * beta)
+        reward = (alpha * jobs_done) - (beta * carbon_step)
+
+        # Small penalty if idle and no jobs complete
+        if jobs_done == 0 and carbon_step == 0:
+            reward -= 0.01
+
+        return reward
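+
+    # Worked example with the default weights (illustrative numbers only):
+    # a tick that completes 2 jobs while emitting 0.5 t CO2 scores
+    # 10.0 * 2 - 0.1 * 0.5 = 19.95, so throughput dominates unless the
+    # per-tick carbon term grows large relative to alpha / beta.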
"priority", 0), + getattr(job, "wait_time", 0), # may need to compute from current_timestep - qdt + ] + state[i, : len(features)] = features + + return state + + def render(self, mode="human"): + print("Timestep:", self.engine.current_timestep, + "Utilization:", self.telemetry.utilization(), + "Power:", self.telemetry.power()) + + def get_stats(self): + return { + "engine_stats": get_engine_stats(self.engine), + "job_stats": get_job_stats(self.engine), + "scheduler_stats": get_scheduler_stats(self.engine), + "network_stats": get_network_stats(self.engine) + } diff --git a/raps/schedulers/rl.py b/raps/schedulers/rl.py new file mode 100644 index 0000000..2272e9d --- /dev/null +++ b/raps/schedulers/rl.py @@ -0,0 +1,35 @@ +from raps.schedulers.default import Scheduler as DefaultScheduler + + +class Scheduler(DefaultScheduler): + """ + Scheduler driven by RL agent actions. + RAPSEnv.step(action) sets env.pending_action, + then RLScheduler.schedule() reads it and acts. + """ + + def __init__(self, config, policy, resource_manager, env=None, *args, **kwargs): + super().__init__(config=config, policy=policy, resource_manager=resource_manager, *args, **kwargs) + self.env = env + self.pending_action = None + + def schedule(self, queue, running, current_time, **kwargs): + if not queue or self.pending_action is None: + return + + action = self.pending_action + if action >= len(queue): + return + + job = queue[action] + + # Check feasibility + if job.nodes_required <= len(self.resource_manager.available_nodes): + self.place_job_and_manage_queues(job, queue, running, current_time) + else: + # Invalid action → skip or log + if self.config.args.get("debug", False): + print(f"[t={current_time}] RL chose invalid job {job.id} (needs {job.nodes_required})") + + # Reset action after use + self.pending_action = None diff --git a/raps/train_rl.py b/raps/train_rl.py new file mode 100644 index 0000000..05ec812 --- /dev/null +++ b/raps/train_rl.py @@ -0,0 +1,73 @@ +from stable_baselines3 import PPO +from raps.envs.raps_env import RAPSEnv +from raps.sim_config import SimConfig + +""" +Example usage: + + python main.py train-rl --system mit_supercloud -f /opt/data/mit_supercloud + +""" + +def train_rl_add_parser(subparsers): + parser = subparsers.add_parser( + "train-rl", + help="Train an RL agent with PPO on RAPS environment", + ) + parser.add_argument( + "--system", + type=str, + required=True, + help="System configuration to use (e.g., mit_supercloud)", + ) + parser.add_argument( + "-f", "--replay", + type=str, + help="Replay file (.npz) or telemetry dataset", + ) + parser.set_defaults(impl=main) + + +def main(args): + # Convert CLI args into SimConfig + sim_config = SimConfig(system=args.system, replay=[args.replay] if args.replay else None) + + # Legacy namespace + args_ns = sim_config.get_legacy_args() + + if args.replay: + args_ns.replay = [args.replay] + + # Build kwargs for RAPSEnv aligned with Engine + env_kwargs = { + "args": args_ns, + "config": sim_config.system_configs[0].get_legacy(), + "sim_config": sim_config, + "system_config": sim_config.system_configs[0], + # optional extras + "replay": sim_config.replay, + "validate": args_ns.validate, + "downscale": sim_config.downscale, + } + + env = RAPSEnv(**env_kwargs) + + model = PPO( + "MlpPolicy", + env, + n_steps=512, + batch_size=128, + n_epochs=10, + gamma=0.99, + learning_rate=3e-4, + ent_coef=0.01, + verbose=1, + tensorboard_log="./ppo_raps_logs/" + ) + + model.learn(total_timesteps=10000, tb_log_name="ppo_raps") + + stats = env.get_stats() + 
print("Training stats:", stats) + + model.save("ppo_raps") -- GitLab