diff --git a/README.md b/README.md index bccaec147779664d9910348fab2ebbb23dd33b2f..eb39cc718c5ad7e4b86ea20d08cc2741af94f867 100644 --- a/README.md +++ b/README.md @@ -62,16 +62,21 @@ For MIT Supercloud python -m raps.dataloaders.mit_supercloud.cli download --start 2021-05-21T13:00 --end 2021-05-21T14:00 # Load data and run simulation - will save data as part-cpu.npz and part-gpu.npz files - raps run-parts -x mit_supercloud -f $DPATH --system mit_supercloud --start 2021-05-21T13:00 --end 2021-05-21T14:00 + raps run-parts -x mit_supercloud -f $DPATH --start 2021-05-21T13:00 --end 2021-05-21T14:00 + # or simply + raps run-parts experiments/mit-replay-25hrs.yaml # Note: if no start, end dates provided will default to run 24 hours between # 2021-05-21T00:00 to 2021-05-22T00:00 set by defaults in raps/dataloaders/mit_supercloud/utils.py # Re-run simulation using npz files (much faster load) - raps run-parts -x mit_supercloud -f part-*.npz --system mit_supercloud + raps run-parts -x mit_supercloud -f part-*.npz # Synthetic tests for verification studies: raps run-parts -x mit_supercloud -w multitenant + # Reinforcement learning test case + python main.py train-rl --system mit_supercloud/part-cpu -f /opt/data/mit_supercloud/202201 + For Lumi # Synthetic test for Lumi: @@ -168,7 +173,7 @@ See instructions in [dashboard/README.md](https://code.ornl.gov/exadigit/simulat ## Running Tests -RAPS uses [pytest](https://docs.pytest.org/) for its test suite. +RAPS uses [pytest](https://docs.pytest.org/) for its test suite. Before running tests, ensure that you have a valid data directory available (e.g., `/opt/data`) and set the environment variable `RAPS_DATA_DIR` to point to it. 
### Run all tests diff --git a/config/mit_supercloud/part-cpu.yaml b/config/mit_supercloud/part-cpu.yaml index 111882dd85186148d380096164fa85ffe258c482..b780b10f6ea51d395189e8c3fdf44118a77e1d38 100644 --- a/config/mit_supercloud/part-cpu.yaml +++ b/config/mit_supercloud/part-cpu.yaml @@ -36,7 +36,7 @@ power: power_cost: 0.094 scheduler: multitenant: true - job_arrival_time: 900 + job_arrival_time: 1 mtbf: 11 trace_quanta: 10 min_wall_time: 3600 diff --git a/experiments/frontier.yaml b/experiments/frontier.yaml index f865a197a61a5199714aa93579d112ec929be7c9..280d95f8ae8e68329814e2f0d043cc2c3f20db84 100644 --- a/experiments/frontier.yaml +++ b/experiments/frontier.yaml @@ -1,4 +1,4 @@ system: frontier replay: - - ~/data/frontier/slurm/joblive/date=2024-01-18 - - ~/data/frontier/jobprofile/date=2024-01-18 + - /opt/data/frontier/slurm/joblive/date=2024-01-18 + - /opt/data/frontier/jobprofile/date=2024-01-18 diff --git a/experiments/gcloudv2.yaml b/experiments/gcloudv2.yaml index 85a1d6ce68bbdb5a0de88d592511fab8e7cac64c..db8e21839ce05c788208fd66e839713f0e967e17 100644 --- a/experiments/gcloudv2.yaml +++ b/experiments/gcloudv2.yaml @@ -1,4 +1,4 @@ system: gcloudv2 replay: - - ~/data/gcloud/v2/google_cluster_data_2011_sample + - /opt/data/gcloud/v2/google_cluster_data_2011_sample ff: 600 diff --git a/experiments/marconi100.yaml b/experiments/marconi100.yaml index 05681571789cdb97e9a9f6c8918cfb127e701c13..859222975067c11a03bf83395101572d87500463 100644 --- a/experiments/marconi100.yaml +++ b/experiments/marconi100.yaml @@ -1,3 +1,3 @@ system: marconi100 replay: - - ~/data/marconi100/job_table.parquet + - /opt/data/marconi100/job_table.parquet diff --git a/experiments/mitrl.yaml b/experiments/mitrl.yaml new file mode 100644 index 0000000000000000000000000000000000000000..3bbd988d0d4005821e2b588304eb2bedf251e6fb --- /dev/null +++ b/experiments/mitrl.yaml @@ -0,0 +1,7 @@ +system: "mit_supercloud" +replay: + - /opt/data/mit_supercloud +start: 2021-05-21T00:00 +end: 
2021-05-22T00:00 +episode_length: 500 +arrival: poisson diff --git a/main.py b/main.py index 7c38960d4ae576c2bbb1abcf1c59b2fb7b4d6c44..9b36a76ec0a0218fa94eecc09c7971af59f25d77 100644 --- a/main.py +++ b/main.py @@ -6,6 +6,7 @@ from raps.helpers import check_python_version from raps.run_sim import run_sim_add_parser, run_parts_sim_add_parser, show_add_parser from raps.workload import run_workload_add_parser from raps.telemetry import run_telemetry_add_parser +from raps.train_rl import train_rl_add_parser check_python_version() @@ -24,6 +25,7 @@ def main(cli_args: list[str] | None = None): show_add_parser(subparsers) run_workload_add_parser(subparsers) run_telemetry_add_parser(subparsers) + train_rl_add_parser(subparsers) # TODO: move other misc scripts into here diff --git a/pyproject.toml b/pyproject.toml index f396280caab9911fba348a2eca34713804312fcf..c009d2aa23d3d1cdb5c45119bdcf1ada91eb6416 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "raps" -version = "0.0.1" +version = "2.0.0.dev0" requires-python = ">=3.12" description = "RAPS" readme = "README.md" @@ -30,6 +30,8 @@ dependencies = [ "pyyaml>=6.0.2", "pydantic>=2.11.7", "pydantic-settings>=2.10.1", + "stable-baselines3==2.7.0", + "gym==0.26.2", "pre-commit" ] diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py index 0e1bdcde2acb39f395fe794d14ba9195da38a162..3849731d76ed041aa14672be7796aa6a60fbae28 100644 --- a/raps/dataloaders/frontier.py +++ b/raps/dataloaders/frontier.py @@ -119,7 +119,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar - end_time # Maybe Null - expected_run_time (end_time - start_time) # Maybe Null - current_run_time (How long did the job run already, when loading) # Maybe zero - - trace_time (lenght of each trace in seconds) # Maybe Null + - trace_time (length of each trace in seconds) # Maybe Null - trace_start_time (time offset in seconds after which the trace 
starts) # Maybe Null - trace_end_time (time offset in seconds after which the trace ends) # Maybe Null - trace_quanta (job's associated trace quanta, to correctly replay with different trace quanta) # Maybe Null @@ -273,23 +273,19 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar indices = xname_to_index(xname, config) scheduled_nodes.append(indices) - # Throw out jobs that are not valid! - if gpu_trace.size == 0: - print("ignoring job b/c zero trace:", jidx, submit_time, start_time, nodes_required) - continue # SKIP! if end_time < telemetry_start: - # raise ValueError("Job ends before frist recorded telemetry entry:", - # job_id, "start:", start_time,"end:",end_time, - # " Telemetry: ", len(gpu_trace), "entries.") - print("Job ends before frist recorded telemetry entry:", job_id, "start:", - start_time, "end:", end_time, " Telemetry: ", len(gpu_trace), "entries.") - continue # SKIP! + print("Job ends before first recorded telemetry entry:", job_id, "start:", + start_time, "end:", end_time, " Telemetry: ", len(gpu_trace), "entries.") + continue # skip + if start_time > telemetry_end: - # raise ValueError("Job starts after last recorded telemetry entry:", - # job_id, "start:", start_time,"end:",end_time, - # " Telemetry: ", len(gpu_trace), "entries.") print("Job starts after last recorded telemetry entry:", job_id, "start:", - start_time, "end:", end_time, " Telemetry: ", len(gpu_trace), "entries.") + start_time, "end:", end_time, " Telemetry: ", len(gpu_trace), "entries.") + continue # skip + + # Throw out jobs that are not valid! + if gpu_trace.size == 0: + print("ignoring job b/c zero trace:", jidx, submit_time, start_time, nodes_required) continue # SKIP! 
if gpu_trace.size > 0 and (jid == job_id or jid == '*'): # and time_submit >= 0: diff --git a/raps/dataloaders/mit_supercloud/loader.py b/raps/dataloaders/mit_supercloud/loader.py index ab68eb7c941bd4677eddd95a98575948e39a1693..a6229657ea1aaaf23e43993a3e7697d09265ca3b 100644 --- a/raps/dataloaders/mit_supercloud/loader.py +++ b/raps/dataloaders/mit_supercloud/loader.py @@ -119,7 +119,7 @@ from collections import Counter from datetime import datetime, timezone from raps.job import job_dict, Job -from raps.utils import summarize_ranges, WorkloadData +from raps.utils import summarize_ranges, next_arrival, WorkloadData from .utils import proc_cpu_series, proc_gpu_series, to_epoch from .utils import DEFAULT_START, DEFAULT_END @@ -298,7 +298,7 @@ def load_data(local_dataset_path, **kwargs): ]) # partition mode - part = kwargs.get("partition", "").split("/")[-1].lower() + part = (kwargs.get("partition") or "").split("/")[-1].lower() cpu_only = (part == "part-cpu") mixed = (part == "part-gpu") diff --git a/raps/engine.py b/raps/engine.py index c569bd0d7903f7d974e783a8d7657762e40c653f..419f1eb6c9e3e676ec73e1a53203c27adc1b8e67 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -488,23 +488,25 @@ class Engine: return simulation_complete def tick(self, *, time_delta=1, replay=False): - # Tick runs all simulations of interest at the given time delta interval. - # - # The simulations which are needed for simulations consistency at each time step - # (inside: the main simulation loop of run_simulation) are not part of tick. - # - # Tick contains: - # For each running job: - # - CPU utilization - # - GPU utilization - # - Network utilization - # - # From these the systems (across all nodes) - # - System Utilization - # - Power - # - Cooling - # - System Performance - # is simulated. + """ + Tick runs all simulations of interest at the given time delta interval. 
+ + The simulations which are needed for simulations consistency at each time step + (inside: the main simulation loop of run_simulation) are not part of tick. + + Tick contains: + For each running job: + - CPU utilization + - GPU utilization + - Network utilization + + From these the systems (across all nodes) + - System Utilization + - Power + - Cooling + - System Performance + is simulated. + """ scheduled_nodes = [] cpu_utils = [] @@ -523,7 +525,7 @@ class Engine: if job.current_state != JobState.RUNNING: raise ValueError( f"Job {job.id} is in running list, " + - "but state is not RUNNING: job.state == {job.current_state}" + f"but state is not RUNNING: job.state == {job.current_state}" ) else: # if job.state == JobState.RUNNING: # Error checks @@ -734,7 +736,7 @@ class Engine: # listener_thread = threading.Thread(target=keyboard_listener, args=(sim_state,), daemon=True) # listener_thread.start() - while self.current_timestep < self.timestep_end: # Runs every seconds! + while self.current_timestep < self.timestep_end: # Runs every second if sim_state.is_paused(): time.sleep(0.1) diff --git a/raps/envs/raps_env.py b/raps/envs/raps_env.py new file mode 100644 index 0000000000000000000000000000000000000000..e27a6d254b7cd5035bda0fc490170e19bfcd08ad --- /dev/null +++ b/raps/envs/raps_env.py @@ -0,0 +1,185 @@ +import gym +from gym import spaces +import numpy as np + +from raps.engine import Engine +from raps.stats import get_engine_stats, get_job_stats, get_scheduler_stats, get_network_stats + +from stable_baselines3.common.logger import Logger, HumanOutputFormat +import sys + +logger = Logger(folder=None, output_formats=[HumanOutputFormat(sys.stdout)]) + + +def print_stats(stats, step=0): + """prints SB3-style stats output""" + + wanted_keys = { + "time simulated": "engine/Time Simulated", + "average power": "engine/Average Power", + "system power efficiency": "engine/System Power Efficiency", + "total energy consumed": "engine/Total Energy Consumed", + "carbon 
emissions": "engine/Carbon Footprint", + "jobs completed": "jobs/Jobs Completed", + "throughput": "jobs/Throughput", + "jobs still running": "jobs/Jobs Still Running", + } + + for section in ["engine_stats", "job_stats"]: + if section in stats: + for k, v in stats[section].items(): + if k.lower() in wanted_keys: + if k.lower() == "jobs still running" and isinstance(v, list): + v = len(v) + logger.record(wanted_keys[k.lower()], v) + + logger.dump(step=step) + + +class RAPSEnv(gym.Env): + """ + Minimal Gym-compatible wrapper around RAPS Engine + for RL job scheduling experiments. + """ + + metadata = {"render.modes": ["human"]} + + def __init__(self, sim_config): + super().__init__() + # Store everything in self.args + self.sim_config = sim_config + self.engine = self._create_engine() + + # --- RL spaces --- + max_jobs = 100 + job_features = 4 # [nodes, runtime, priority, wait_time] + self.observation_space = spaces.Box( + low=0, high=1, shape=(max_jobs, job_features), dtype=np.float32 + ) + self.action_space = spaces.Discrete(max_jobs) + + def _create_engine(self): + engine, workload_data, time_delta = Engine.from_sim_config(self.sim_config) + engine.scheduler.env = self + self.jobs = workload_data.jobs + timestep_start = workload_data.telemetry_start + timestep_end = workload_data.telemetry_end + self.generator = engine.run_simulation(self.jobs, timestep_start, timestep_end, time_delta) + return engine + + def reset(self, **kwargs): + self.engine = self._create_engine() + obs = self._get_state() + return obs + + def _compute_reward(self, tick_data): + """ + Reward function for RL scheduling on Frontier-like systems. + Balances throughput and carbon footprint, using incremental values. 
+ """ + + # How many jobs completed *this tick* + jobs_done = len(getattr(tick_data, "completed", [])) + + # Incremental carbon emitted this tick + carbon_step = getattr(self.engine, "carbon emissions", 0.0) + + # Tradeoff weights (tunable hyperparameters) + alpha = 10.0 # reward for finishing a job + beta = 0.1 # penalty per metric ton CO2 + + # Reward = (jobs * alpha) - (carbon * beta) + reward = (alpha * jobs_done) - (beta * carbon_step) + + # Small penalty if idle and no jobs complete + if jobs_done == 0 and carbon_step == 0: + reward -= 0.01 + + return reward + + def step(self, action): + if self.engine is None: + raise RuntimeError("Engine not initialized. Did you forget to call reset()?") + + queue = self.engine.queue + invalid_action = False + + # If queue empty or index out of range → invalid + if len(queue) == 0 or action >= len(queue): + invalid_action = True + else: + job = queue[int(action)] + available_nodes = self.engine.scheduler.resource_manager.available_nodes + + if job.nodes_required <= len(available_nodes): + # Just pick the first available node (simplest placement policy) + node_id = available_nodes[0] + self.engine.scheduler.place_job_and_manage_queues( + job, + queue, + self.engine.running, + self.engine.current_timestep, + node_id, + ) + else: + invalid_action = True + + # advance simulation by one tick + tick_data = next(self.generator) + + # compute reward + if invalid_action: + reward = -1.0 + else: + reward = self._compute_reward(tick_data) + + # clip reward + reward = np.clip(reward, -10.0, 10.0) + + # Print stats + stats = self.get_stats() + print_stats(stats) + + obs = self._get_state() + done = self.engine.current_timestep >= self.engine.timestep_end + info = {} + + print(f"t={self.engine.current_timestep}, " + f"queue={len(self.engine.queue)}, " + f"running={len(self.engine.running)}, " + f"completed={self.engine.jobs_completed}", + f"action={action}") + + return obs, reward, done, info + + def _get_state(self): + """Construct 
simple state representation from engine's job queue.""" + # Example: take waiting jobs (haven’t started yet) + job_queue = [j for j in self.jobs if getattr(j, "start_time", None) is None] + + max_jobs, job_features = self.observation_space.shape + state = np.zeros((max_jobs, job_features), dtype=np.float32) + + for i, job in enumerate(job_queue[:max_jobs]): + features = [ + getattr(job, "nodes_required", 0), + getattr(job, "wall_time", 0), + getattr(job, "priority", 0), + getattr(job, "wait_time", 0), # may need to compute from current_timestep - qdt + ] + state[i, : len(features)] = features + + return state + + def render(self, mode="human"): + print("Timestep:", self.engine.current_timestep, + "Utilization:", self.telemetry.utilization(), + "Power:", self.telemetry.power()) + + def get_stats(self): + return { + "engine_stats": get_engine_stats(self.engine), + "job_stats": get_job_stats(self.engine), + "scheduler_stats": get_scheduler_stats(self.engine), + "network_stats": get_network_stats(self.engine) + } diff --git a/raps/resmgr/default.py b/raps/resmgr/default.py index ab1e7746a24eef87c5015118defec29a9d5689d0..198fce774042b7dcd84bd6b20d3f43f0fd2d9796 100644 --- a/raps/resmgr/default.py +++ b/raps/resmgr/default.py @@ -42,7 +42,8 @@ class ExclusiveNodeResourceManager: """Assigns full nodes to a job (replay or count-based).""" # Ensure enough free nodes if len(self.available_nodes) < job.nodes_required: - raise ValueError(f"Not enough available nodes to schedule job {job.id}") + raise ValueError(f"Not enough available nodes to schedule job {job.id}", + f"{len(self.available_nodes)} < {job.nodes_required}") if policy == PolicyType.REPLAY and job.scheduled_nodes: # Telemetry replay: use the exact nodes @@ -65,8 +66,10 @@ class ExclusiveNodeResourceManager: if n not in self.available_nodes: self.available_nodes.append(n) else: - raise KeyError((f"Atempting to free node {n} after completion of job {job.id}. 
" + - "Node is already free (in available nodes)!")) + # Already free — log instead of raising + print(f"[WARN] Tried to free node {n}, but it was already available") + print(f"Attempting to free node {n} after completion of job {job.id}. " + + "Node is already free (in available nodes)!") self.available_nodes = sorted(self.available_nodes) def update_system_utilization(self, current_time, running_jobs): diff --git a/raps/run_sim.py b/raps/run_sim.py index ce89529a7fc925f240a4cc4c513927aa25ecfc8c..4a3f9b385ed65e57e2e52aa3f7718d8ed436bc0b 100644 --- a/raps/run_sim.py +++ b/raps/run_sim.py @@ -13,7 +13,7 @@ from raps.ui import LayoutManager from raps.plotting import Plotter from raps.engine import Engine from raps.multi_part_engine import MultiPartEngine -from raps.utils import write_dict_to_file, pydantic_add_args, SubParsers, yaml_dump +from raps.utils import write_dict_to_file, pydantic_add_args, SubParsers, yaml_dump, read_yaml from raps.stats import ( get_engine_stats, get_job_stats, @@ -22,33 +22,7 @@ from raps.stats import ( print_formatted_report ) -from raps.sim_config import SingleSimConfig, MultiPartSimConfig - - -def read_yaml(config_file: str): - if config_file == "-": - return yaml.safe_load(sys.stdin.read()) - elif config_file: - return yaml.safe_load(Path(config_file).read_text()) - else: - return {} - - -shortcuts = { - "partitions": "x", - "cooling": "c", - "simulate-network": "net", - "fastforward": "ff", - "time": "t", - "debug": "d", - "numjobs": "n", - "verbose": "v", - "output": "o", - "uncertainties": "u", - "plot": "p", - "replay": "f", - "workload": "w", -} +from raps.sim_config import SingleSimConfig, MultiPartSimConfig, SIM_SHORTCUTS def run_sim_add_parser(subparsers: SubParsers): @@ -63,7 +37,7 @@ def run_sim_add_parser(subparsers: SubParsers): flags. Pass "-" to read from stdin. 
""") model_validate = pydantic_add_args(parser, SingleSimConfig, model_config={ - "cli_shortcuts": shortcuts, + "cli_shortcuts": SIM_SHORTCUTS, }) parser.set_defaults( impl=lambda args: run_sim(model_validate(args, read_yaml(args.config_file))) @@ -238,7 +212,7 @@ def run_parts_sim_add_parser(subparsers: SubParsers): flags. Pass "-" to read from stdin. """) model_validate = pydantic_add_args(parser, MultiPartSimConfig, model_config={ - "cli_shortcuts": shortcuts, + "cli_shortcuts": SIM_SHORTCUTS, }) parser.set_defaults( impl=lambda args: run_parts_sim(model_validate(args, read_yaml(args.config_file))) @@ -325,7 +299,7 @@ def show_add_parser(subparsers: SubParsers): If true, include defaults in the output YAML """) model_validate = pydantic_add_args(parser, SingleSimConfig, model_config={ - "cli_shortcuts": shortcuts, + "cli_shortcuts": SIM_SHORTCUTS, }) def impl(args): diff --git a/raps/schedulers/rl.py b/raps/schedulers/rl.py new file mode 100644 index 0000000000000000000000000000000000000000..2272e9d7eb7cb6beca0b12142c1c9a4c71a8f9d1 --- /dev/null +++ b/raps/schedulers/rl.py @@ -0,0 +1,35 @@ +from raps.schedulers.default import Scheduler as DefaultScheduler + + +class Scheduler(DefaultScheduler): + """ + Scheduler driven by RL agent actions. + RAPSEnv.step(action) sets env.pending_action, + then RLScheduler.schedule() reads it and acts. 
+ """ + + def __init__(self, config, policy, resource_manager, env=None, *args, **kwargs): + super().__init__(config=config, policy=policy, resource_manager=resource_manager, *args, **kwargs) + self.env = env + self.pending_action = None + + def schedule(self, queue, running, current_time, **kwargs): + if not queue or self.pending_action is None: + return + + action = self.pending_action + if action >= len(queue): + return + + job = queue[action] + + # Check feasibility + if job.nodes_required <= len(self.resource_manager.available_nodes): + self.place_job_and_manage_queues(job, queue, running, current_time) + else: + # Invalid action → skip or log + if self.config.args.get("debug", False): + print(f"[t={current_time}] RL chose invalid job {job.id} (needs {job.nodes_required})") + + # Reset action after use + self.pending_action = None diff --git a/raps/sim_config.py b/raps/sim_config.py index a73cd3e29a577e8828d63157a02bf209aff335fd..05c078c257ebd5075121458f3b2b791218cc26ab 100644 --- a/raps/sim_config.py +++ b/raps/sim_config.py @@ -391,3 +391,20 @@ class MultiPartSimConfig(SimConfig): @cached_property def _multi_partition_system_config(self): return get_partition_configs(self.partitions) + + +SIM_SHORTCUTS = { + "partitions": "x", + "cooling": "c", + "simulate-network": "net", + "fastforward": "ff", + "time": "t", + "debug": "d", + "numjobs": "n", + "verbose": "v", + "output": "o", + "uncertainties": "u", + "plot": "p", + "replay": "f", + "workload": "w", +} diff --git a/raps/stats.py b/raps/stats.py index b4cfbfba4a36224c05db827e076e68883a47f17f..aa8610ed2cbba7a9460a7cc7b5bd7980b2c1ddab 100644 --- a/raps/stats.py +++ b/raps/stats.py @@ -32,13 +32,13 @@ def get_engine_stats(engine: Engine): stats = { 'time simulated': time_simulated, 'num_samples': num_samples, - 'average power': f'{average_power_mw:.2f} MW', - 'min loss': f'{min_loss_mw:.2f} MW', + 'average power': f'{average_power_mw:.4f} MW', + 'min loss': f'{min_loss_mw:.4f} MW', 'average loss': 
f'{average_loss_mw:.2f} MW', 'max loss': f'{max_loss_mw:.2f} MW', 'system power efficiency': f'{efficiency * 100:.2f}%', 'total energy consumed': f'{total_energy_consumed:.2f} MW-hr', - 'carbon emissions': f'{emissions:.2f} metric tons CO2', + 'carbon emissions': f'{emissions:.4f} metric tons CO2', 'total cost': f'${total_cost:.2f}' } @@ -46,7 +46,10 @@ def get_engine_stats(engine: Engine): # Multitenancy Stats total_jobs_loaded = engine.total_initial_jobs # Assuming this is passed to __init__ stats['total jobs loaded'] = total_jobs_loaded - stats['jobs completed percentage'] = f"{(engine.jobs_completed / total_jobs_loaded * 100):.2f}%" + if total_jobs_loaded > 0: + stats['jobs completed percentage'] = f"{(engine.jobs_completed / total_jobs_loaded * 100):.2f}%" + else: + stats['jobs completed percentage'] = "0%" if engine.node_occupancy_history: # Calculate average concurrent jobs per node (average density across all nodes and timesteps) diff --git a/raps/telemetry.py b/raps/telemetry.py index 63ee158f0a69f658fe9af03c6ea1f31e3a83c34a..6d5aa19a25bac3c91abca7c6bc66bcb91e683a3d 100644 --- a/raps/telemetry.py +++ b/raps/telemetry.py @@ -77,11 +77,12 @@ class Telemetry: def __init__(self, **kwargs): self.kwargs = kwargs - self.system = kwargs.get('system') + self.system = kwargs['system'] self.config = kwargs.get('config') try: - self.dataloader = importlib.import_module(f"raps.dataloaders.{self.system}", package=__package__) + module = self.system.split("/")[0] + self.dataloader = importlib.import_module(f"raps.dataloaders.{module}", package=__package__) except ImportError as e: print(f"WARNING: Failed to load dataloader: {e}") self.dataloader = None @@ -228,6 +229,8 @@ class Telemetry: job.scheduled_nodes = None # Setting to None triggers scheduler to assign nodes if self.kwargs['arrival'] == "poisson": + # TODO: --arrival poisson distribution throws errors about start_time in some scenarios + # e.g. 
`python main.py run-parts experiments/mit-replay-24hrs.yaml --arrival poisson` for job in jobs: job.scheduled_nodes = None job.submit_time = next_arrival_byconfargs(self.config, self.kwargs) diff --git a/raps/train_rl.py b/raps/train_rl.py new file mode 100644 index 0000000000000000000000000000000000000000..eac41724610ddd021c4c1650cf22ffaf42b4287e --- /dev/null +++ b/raps/train_rl.py @@ -0,0 +1,56 @@ +from raps.sim_config import SingleSimConfig, SIM_SHORTCUTS +from raps.utils import SubParsers, pydantic_add_args, read_yaml + + +def train_rl_add_parser(subparsers: SubParsers): + parser = subparsers.add_parser("train-rl", description=""" + Example usage: + raps train-rl --system mit_supercloud/part-gpu -f /opt/data/mit_supercloud/202201 + """) + parser.add_argument("config_file", nargs="?", default=None, help=""" + YAML sim config file, can be used to configure an experiment instead of using CLI + flags. Pass "-" to read from stdin. + """) + model_validate = pydantic_add_args(parser, SingleSimConfig, model_config={ + "cli_shortcuts": SIM_SHORTCUTS, + }) + + def impl(args): + model = model_validate(args, read_yaml(args.config_file)) + model.scheduler = "rl" + train_rl(model) + parser.set_defaults(impl=impl) + + +def train_rl(rl_config: SingleSimConfig): + from stable_baselines3 import PPO + from raps.envs.raps_env import RAPSEnv + + args_dict = rl_config.get_legacy_args_dict() + config = rl_config.system_configs[0].get_legacy() + args_dict['config'] = config + args_dict['args'] = rl_config.get_legacy_args() + + env = RAPSEnv(rl_config) + + model = PPO( + "MlpPolicy", + env, + n_steps=512, # shorter rollouts (quicker feedback loop) + batch_size=128, # must divide n_steps evenly + n_epochs=10, # of minibatch passes per update + gamma=0.99, # discount (keeps long-term credit) + learning_rate=3e-4, # default Adam lr, can try 1e-4 if unstable + ent_coef=0.01, # encourage exploration + verbose=1, + tensorboard_log="./ppo_raps_logs/" + ) + + 
model.learn(total_timesteps=10000, tb_log_name="ppo_raps") + + # Output stats + stats = env.get_stats() + print(stats) + + # Save trained model + model.save("ppo_raps") diff --git a/raps/utils.py b/raps/utils.py index ab02a2ae1691fee09cf16ffb15c1e6429d46a711..c3c541f6f817f9480bc5ec0914800c8a548f3ace 100644 --- a/raps/utils.py +++ b/raps/utils.py @@ -54,6 +54,19 @@ def deep_subtract_dicts(a: dict, b: dict): return a +def to_dict(arg): + """ + Normalizes arg to a dictionary if necessary. Used to convert between legacy argparse.Namespace + objects and dictionaries. + """ + if isinstance(arg, dict): + return arg + elif isinstance(arg, argparse.Namespace): + return vars(arg) + else: + raise ValueError(f"Cannot convert {arg} to dict") + + def sum_values(values): return sum(x[1] for x in values) if values else 0 @@ -456,14 +469,15 @@ def create_dir_indexed(dir: str, path: str = None) -> str: def next_arrival_byconfargs(config, args, reset=False): + args = to_dict(args) arrival_rate = 1 arrival_time = config['JOB_ARRIVAL_TIME'] - downscale = args.downscale + downscale = args['downscale'] - if args.job_arrival_rate: - arrival_rate = args.job_arrival_rate - if args.job_arrival_time: - arrival_time = args.job_arrival_time + if args['job_arrival_rate']: + arrival_rate = args['job_arrival_rate'] + if args['job_arrival_time']: + arrival_time = args['job_arrival_time'] return next_arrival(arrival_rate / (arrival_time * downscale), reset) @@ -750,6 +764,16 @@ def yaml_dump(data): ) +def read_yaml(config_file: str): + """ Parses yaml file. Pass "-" to read from stdin """ + if config_file == "-": + return yaml.safe_load(sys.stdin.read()) + elif config_file: + return yaml.safe_load(Path(config_file).read_text()) + else: + return {} + + class WorkloadData(RAPSBaseModel): """ Represents a workload, a list of jobs with some metadata. 
Returned by dataloaders load_data() diff --git a/raps/workload.py b/raps/workload.py index 2a630b28ce07520313d599151cc15490bf38ce1d..2d572274aca602fe80d46e522d484fb52d91033a 100644 --- a/raps/workload.py +++ b/raps/workload.py @@ -653,10 +653,6 @@ class Workload: """ Generate deterministic jobs to validate multitenant scheduling & power. - usage example: - - python main.py run-multi-part -x mit_supercloud -w multitenant - Parameters ---------- mode : str @@ -956,7 +952,7 @@ def plot_job_hist(jobs, config=None, dist_split=None, gantt_nodes=False): def run_workload_add_parser(subparsers: SubParsers): - from raps.run_sim import shortcuts + from raps.sim_config import SIM_SHORTCUTS # TODO: Separate the arguments for this command parser = subparsers.add_parser("workload", description=""" Saves workload as a snapshot. @@ -966,7 +962,7 @@ def run_workload_add_parser(subparsers: SubParsers): flags. Pass "-" to read from stdin. """) model_validate = pydantic_add_args(parser, SingleSimConfig, model_config={ - "cli_shortcuts": shortcuts, + "cli_shortcuts": SIM_SHORTCUTS, }) parser.set_defaults(impl=lambda args: run_workload(model_validate(args, {})))