Loading config/kestrel.yaml 0 → 100644 +53 −0 Original line number Diff line number Diff line system: num_cdus: 6 racks_per_cdu: 6 nodes_per_rack: 80 rectifiers_per_rack: 6 chassis_per_rack: 1 nodes_per_blade: 1 switches_per_chassis: 5 nics_per_node: 2 rectifiers_per_chassis: 5 nodes_per_rectifier: 4 missing_racks: [] down_nodes: [] cpus_per_node: 1 gpus_per_node: 4 cpu_peak_flops: 396800000000.0 gpu_peak_flops: 7800000000000.0 cpu_fp_ratio: 0.69 gpu_fp_ratio: 0.69 power: power_gpu_idle: 75 power_gpu_max: 300 power_cpu_idle: 100 power_cpu_max: 800 power_mem: 74.26 power_nic: 21 power_nvme: 45 power_switch: 250 power_cdu: 0 power_update_freq: 20 rectifier_peak_threshold: 13670 sivoc_loss_constant: 0 sivoc_efficiency: 1 rectifier_loss_constant: 0 rectifier_efficiency: 1 power_cost: 0.094 scheduler: seed: 42 job_arrival_time: 20 mtbf: 11 trace_quanta: 20 min_wall_time: 3600 max_wall_time: 43200 ui_update_freq: 3600 max_nodes_per_job: 3000 job_end_probs: COMPLETED: 0.63 FAILED: 0.13 CANCELLED: 0.12 TIMEOUT: 0.11 NODE_FAIL: 0.01 No newline at end of file raps/dataloaders/kestrel.py 0 → 100644 +170 −0 Original line number Diff line number Diff line """ Load data for NREL's Kestrel cluster. """ import uuid import pandas as pd from tqdm import tqdm from ..job import job_dict, Job from ..utils import power_to_utilization, next_arrival def load_data(jobs_path, **kwargs): """ Reads job and job profile data from parquet files and parses them. Parameters ---------- jobs_path : str The path to the jobs parquet file. Returns ------- list The list of parsed jobs. """ jobs_df = pd.read_parquet(jobs_path, engine='pyarrow') return load_data_from_df(jobs_df, **kwargs) def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): """ Reads job and job profile data from parquet files and parses them. 
Requires the following fields in the DataFrame: - start_time (timestamp): Time execution begins (actual or expected) - job_id (int): Job ID - node_power_consumption (List[int]): Power consumption of the job, recorded at Node level - nodes_required (int): Number of nodes allocated to the job - cpu_power_consumption (List[int]): Power consumption of the job, recorded at CPU level (don't have this) - mem_power_consumption (List[int]): Power consumption of the job, recorded at Memory level (don't have this) - priority (int): Relative priority of the job, 0=held, 1=required nodes DOWN/DRAINED - job_state (string): State of the job, see enum job_states for possible values - wall_time (int): Actual runtime of job, in seconds - nodes (string): List of nodes allocated to job Returns ------- list The list of parsed jobs. """ config = kwargs.get('config') min_time = kwargs.get('min_time', None) reschedule = kwargs.get('reschedule') fastforward = kwargs.get('fastforward') validate = kwargs.get('validate') jid = kwargs.get('jid', '*') if fastforward: print(f"fast-forwarding {fastforward} seconds") # Sort jobs dataframe based on values in time_start column, adjust indices after sorting jobs_df = jobs_df.sort_values(by='submit_time') jobs_df = jobs_df[(jobs_df.start_time.between(pd.to_datetime('2024-09-01T00:00:00'), pd.to_datetime('2024-09-16T00:00:00'), inclusive='right') | jobs_df.end_time.between(pd.to_datetime('2024-09-01T00:00:00'), pd.to_datetime('2024-09-16T00:00:00'), inclusive='right') )].copy() jobs_df = jobs_df.reset_index(drop=True) telemetry_start_timestamp = jobs_df['start_time'].min() telemetry_end_timestamp = jobs_df['end_time'].max() telemetry_start = 0 telemetry_end = int((telemetry_end_timestamp - telemetry_start_timestamp).total_seconds()) # Take earliest time as baseline reference # We can use the start time of the first job. 
if min_time: time_zero = min_time else: time_zero = jobs_df['submit_time'].min() num_jobs = len(jobs_df) print("time_zero:", time_zero, "num_jobs", num_jobs) jobs = [] # Map dataframe to job state. Add results to jobs list for jidx in tqdm(range(num_jobs - 1), total=num_jobs, desc="Processing Kestrel Jobs"): job_id = jobs_df.loc[jidx, 'job_id'] account = jobs_df.loc[jidx, 'account'] if not jid == '*': if int(jid) == int(job_id): print(f'Extracting {job_id} profile') else: continue nodes_required = jobs_df.loc[jidx, 'nodes_required'] name = str(uuid.uuid4())[:6] if validate: cpu_power = jobs_df.loc[jidx, 'power_per_node'] cpu_trace = cpu_power else: cpu_power = jobs_df.loc[jidx, 'power_per_node'] cpu_power_array = [600] if (pd.isna(cpu_power) or cpu_power == 0) else cpu_power.tolist() cpu_min_power = nodes_required * config['POWER_CPU_IDLE'] * config['CPUS_PER_NODE'] cpu_max_power = nodes_required * config['POWER_CPU_MAX'] * config['CPUS_PER_NODE'] cpu_util = power_to_utilization(cpu_power_array, cpu_min_power, cpu_max_power) cpu_trace = cpu_util * config['CPUS_PER_NODE'] gpu_trace = 0 # Priority sorting doesn't seem to be implemented at the moment priority = 0 wall_time = jobs_df.loc[jidx, 'wall_time'] end_state = jobs_df.loc[jidx, 'job_state'] time_submit = jobs_df.loc[jidx+1, 'submit_time'] diff = time_submit - time_zero if jid == '*': time_offset = max(diff.total_seconds(), 0) else: # When extracting out a single job, run one iteration past the end of the job time_offset = config['UI_UPDATE_FREQ'] if fastforward: time_offset -= fastforward if reschedule: # Let the scheduler reschedule the jobs scheduled_nodes = None time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME']) else: # Prescribed replay scheduled_nodes = None time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME']) trace_quanta = config['TRACE_QUANTA'] if cpu_trace.size > 0 and time_offset >= 0: job_info = job_dict(nodes_required = nodes_required, name = name, account = account, cpu_trace = cpu_trace, 
gpu_trace = gpu_trace, ntx_trace = [], nrx_trace = [], end_state = end_state, scheduled_nodes = scheduled_nodes, id = job_id, priority = priority, submit_time = time_offset, time_limit = wall_time, trace_quanta=trace_quanta) jobs.append(Job(job_info)) return jobs, telemetry_start, telemetry_end def node_index_to_name(index: int, config: dict): """ Converts an index value back to an name string based on system configuration. """ return f"node{index:04d}" def cdu_index_to_name(index: int, config: dict): return f"cdu{index:02d}" def cdu_pos(index: int, config: dict) -> tuple[int, int]: """ Return (row, col) tuple for a cdu index """ return (0, index) # TODO No newline at end of file raps/engine.py +19 −9 Original line number Diff line number Diff line Loading @@ -42,6 +42,7 @@ from raps.weather import Weather from raps.sim_config import SimConfig from raps.system_config import SystemConfig from bisect import bisect_right @dataclasses.dataclass class TickData: Loading Loading @@ -413,7 +414,6 @@ class Engine: self.power_manager.set_idle(job.scheduled_nodes) job.current_state = JobState.COMPLETED job.end_time = self.current_timestep self.running.remove(job) self.jobs_completed += 1 job_stats = job.statistics() Loading @@ -424,7 +424,7 @@ class Engine: self.resource_manager.free_nodes_from_job(job) killed_jobs = [job for job in self.running if job.start_time + job.time_limit <= self.current_timestep] job.end_time is not None and job.start_time + job.time_limit <= self.current_timestep] for job in killed_jobs: self.power_manager.set_idle(job.scheduled_nodes) Loading Loading @@ -489,7 +489,8 @@ class Engine: actively_considered_jobs: List, all_jobs: List, replay: bool, autoshutdown: bool): autoshutdown: bool, cursor: int): # 1 update running time of all running jobs # 2 update the current_timestep of the engine (this serves as reference for most computations) # 3 Check if simulation should shutdown Loading @@ -504,7 +505,7 @@ class Engine: len(self.queue) == 0 and \ 
len(self.running) == 0 and \ not replay and \ len(all_jobs) == 0 and \ len(all_jobs) == cursor and \ len(actively_considered_jobs) == 0: if self.debug: print(f"Simulaiton completed early: {self.config['system_name']} - " Loading Loading @@ -555,10 +556,10 @@ class Engine: job.running_time = self.current_timestep - job.start_time if job.current_state != JobState.RUNNING: raise ValueError(f"Job is in running list, but state is not RUNNING: job.state == {job.currentstate}") raise ValueError(f"Job {job.id} is in running list, but state is not RUNNING: job.state == {job.current_state}") else: # if job.state == JobState.RUNNING: # Error checks if job.running_time > job.time_limit: if job.running_time > job.time_limit and job.end_time is not None: raise Exception(f"Job exceded time limit! " f"{job.running_time} > {job.time_limit}" f"\n{job}" Loading Loading @@ -754,6 +755,9 @@ class Engine: # Process jobs in batches for better performance of timestep loop all_jobs = jobs.copy() submit_times = [j.submit_time for j in all_jobs] cursor = 0 jobs = [] # Batch Jobs into 6h windows based on submit_time or twice the time_delta if larger batch_window = max(60 * 60 * 6, 2 * time_delta) # at least 6h Loading @@ -772,8 +776,13 @@ class Engine: if (self.current_timestep % batch_window == 0) or (self.current_timestep == timestep_start): # Add jobs that are within the batching window and remove them from all jobs jobs += [job for job in all_jobs if job.submit_time <= self.current_timestep + batch_window] all_jobs[:] = [job for job in all_jobs if job.submit_time > self.current_timestep + batch_window] # jobs += [job for job in all_jobs if job.submit_time <= self.current_timestep + batch_window] # all_jobs[:] = [job for job in all_jobs if job.submit_time > self.current_timestep + batch_window] cutoff = self.current_timestep + batch_window r = bisect_right(submit_times, cutoff, lo=cursor) if r > cursor: jobs.extend(all_jobs[cursor:r]) cursor = r # 1. 
Prepare Timestep: completed_jobs, killed_jobs, newly_downed_nodes, need_reschedule = \ Loading Loading @@ -812,7 +821,8 @@ class Engine: simulation_done = self.complete_timestep(actively_considered_jobs=jobs, all_jobs=all_jobs, replay=replay, autoshutdown=autoshutdown) autoshutdown=autoshutdown, cursor=cursor) if simulation_done: break yield tick_data Loading raps/resmgr/default.py +2 −1 Original line number Diff line number Diff line Loading @@ -65,7 +65,8 @@ class ExclusiveNodeResourceManager: if n not in self.available_nodes: self.available_nodes.append(n) else: raise KeyError(f"node was free but already in available nodes: {n.id}") raise KeyError((f"Atempting to free node {n} after completion of job {job.id}. " + "Node is already free (in available nodes)!")) self.available_nodes = sorted(self.available_nodes) def update_system_utilization(self, current_time, running_jobs): Loading raps/schedulers/fastsim.py 0 → 100644 +163 −0 Original line number Diff line number Diff line import pandas as pd import sys import os import zmq from ..policy import PolicyType, BackfillType from raps.telemetry import Telemetry from ..job import JobState from raps.sim_config import args from raps.system_config import get_system_config # Run with this command: # python main.py --system kestrel -f ../data/fastsim_jobs_output.parquet --scheduler fastsim --policy priority --start 2024-09-01T00:00 --end 2024-09-15T00:00 class Scheduler(): """ FastSim-backed scheduler (strict lockstep via ZeroMQ). 
Protocol (server side is FastSim --serve): - INIT -> { init_time } - GET { t } -> { t, running_ids } (server acks t after reply) - END (on shutdown) -> { ok: true } Semantics at engine second t: - R_t := authoritative running IDs from FastSim for t - started = R_t - prev_R -> stamp start_time=t (once), assign nodes once, mark RUNNING - finished = prev_R - R_t -> stamp end_time=t (engine will finalize next tick in prepare_timestep) running list for this tick = R_t & finished (so those finishing at t remain visible for one more scheduler call; engine completes them on next second). """ def __init__(self, config, resource_manager, **kwargs): self.config = config self.policy = PolicyType(kwargs.get('policy')) self.bfpolicy = BackfillType(kwargs.get('backfill')) self.debug = bool(kwargs.get('debug', False)) # ---- ZeroMQ client ---- self.endpoint = kwargs.get('plugin_endpoint', 'ipc:///tmp/fastsim.sock') self._ctx = zmq.Context.instance() self._sock = self._ctx.socket(zmq.REQ) self._sock.setsockopt(zmq.LINGER, 0) self._sock.connect(self.endpoint) # INIT handshake: fetch FastSim's init_time (ISO string). 
self.init_time_iso = self._rpc('INIT').get('init_time') self.resource_manager = resource_manager # Job metadata: id -> Job self.jobids_to_jobs = {} self.allocated_jobs = set() # job_ids we have assigned nodes for self.prev_running_ids = set() # R_{t-1} # Build the Job objects from RAPS Telemetry (needed so ExaDigiT subsystems have objects) args_dict = vars(args) config = get_system_config(args.system).get_legacy() args_dict['config'] = config td = Telemetry(**args_dict) print("...Now loading jobs to FastSim scheduler.") jobs, _, _ = td.load_data(args.replay) for job in jobs: self.jobids_to_jobs[job.id] = job if self.debug: print(f"[RAPS-FastSim] Connected to {self.endpoint}; init_time={self.init_time_iso}", file=sys.stderr) def _rpc(self, op, **payload): """Send a JSON request and return the JSON reply (dict).""" try: msg = {'op': op} msg.update(payload) self._sock.send_json(msg) rep = self._sock.recv_json() except Exception as e: raise RuntimeError(f"[RAPS-FastSim] RPC {op} failed: {e}") from e if isinstance(rep, dict) and 'error' in rep: raise RuntimeError(f"[RAPS-FastSim] RPC {op} error: {rep['error']}") return rep def _fastsim_running_ids(self, t: int): """Blocking call: get authoritative running job IDs for second t.""" rep = self._rpc('GET', t=int(t)) rids = rep.get('running_ids', []) return set(rids) def schedule(self, queue=None, running=None, current_time=None, accounts=None, sorted=False): """ Called by Engine when RAPS detects an event. 
""" running = running if running is not None else [] t = int(current_time) # Get authoritative running set for second t (blocks until available) R_t = self._fastsim_running_ids(t) # Diff vs previous second started_ids = R_t - self.prev_running_ids finished_ids = self.prev_running_ids - R_t # these end at t; engine finalizes next tick # Handle starts: stamp start_time, assign nodes, mark RUNNING for jid in started_ids: job = self.jobids_to_jobs.get(jid) if job is None: if self.debug: print(f"[RAPS-FastSim][WARN] Unknown job id from FastSim: {jid}", file=sys.stderr) continue # Assign nodes exactly once if jid not in self.allocated_jobs: self.resource_manager.assign_nodes_to_job(job, t, self.policy) self.allocated_jobs.add(jid) # FastSim is authoritative job.start_time = t # IMPORTANT: prevent premature completion by RM’s default behavior job.end_time = None # Prevents RAPS from removing job job.state = JobState.RUNNING # Handle finishes: stamp end_time=t (engine.prepare_timestep next tick completes) running.clear() for jid in finished_ids: job = self.jobids_to_jobs.get(jid) if job is not None: # overwrite any prior value; FastSim is the source of truth # job.end_time = t if job.start_time is not None: observed = t - job.start_time if (job.time_limit is None) or (job.time_limit < observed): # This is necessary since RAPS is handling finishing jobs, but schedule is not always # called at every tick, even though the job may have finished in FastSim during that tick. # TODO: Deal with this, because it messes up the end time of some jobs. # print(f"Extending {job.id} runtime {job.time_limit} to match observed {observed} at finish.") job.time_limit = observed # print((f"Job {job.id} is finished, start time: {job.start_time}, wall time: {job.time_limit}," # f"end time: {job.end_time}, at time {t}. 
With nodes {job.scheduled_nodes}.")) job.end_time = t job.time_limit = t - job.start_time running.append(job) # Running list reflects exactly FastSim’s R_t for jid in R_t: job = self.jobids_to_jobs.get(jid) if job is not None: # defensively ensure state isn’t stuck at COMPLETED if job.state != JobState.RUNNING: job.state = JobState.RUNNING running.append(job) # Update prev self.prev_running_ids = R_t def end_sim(self): # Ask server to stop try: self._rpc('END') except Exception: pass No newline at end of file Loading
config/kestrel.yaml 0 → 100644 +53 −0

system:
  num_cdus: 6
  racks_per_cdu: 6
  nodes_per_rack: 80
  rectifiers_per_rack: 6
  chassis_per_rack: 1
  nodes_per_blade: 1
  switches_per_chassis: 5
  nics_per_node: 2
  rectifiers_per_chassis: 5
  nodes_per_rectifier: 4
  missing_racks: []
  down_nodes: []
  cpus_per_node: 1
  gpus_per_node: 4
  cpu_peak_flops: 396800000000.0
  gpu_peak_flops: 7800000000000.0
  cpu_fp_ratio: 0.69
  gpu_fp_ratio: 0.69

power:
  power_gpu_idle: 75
  power_gpu_max: 300
  power_cpu_idle: 100
  power_cpu_max: 800
  power_mem: 74.26
  power_nic: 21
  power_nvme: 45
  power_switch: 250
  power_cdu: 0
  power_update_freq: 20
  rectifier_peak_threshold: 13670
  sivoc_loss_constant: 0
  sivoc_efficiency: 1
  rectifier_loss_constant: 0
  rectifier_efficiency: 1
  power_cost: 0.094

scheduler:
  seed: 42
  job_arrival_time: 20
  mtbf: 11
  trace_quanta: 20
  min_wall_time: 3600
  max_wall_time: 43200
  ui_update_freq: 3600
  max_nodes_per_job: 3000
  job_end_probs:
    COMPLETED: 0.63
    FAILED: 0.13
    CANCELLED: 0.12
    TIMEOUT: 0.11
    NODE_FAIL: 0.01
"""
Load data for NREL's Kestrel cluster.
"""
import uuid

import pandas as pd
from tqdm import tqdm

from ..job import job_dict, Job
from ..utils import power_to_utilization, next_arrival


def load_data(jobs_path, **kwargs):
    """
    Read job data from a parquet file and parse it into Job objects.

    Parameters
    ----------
    jobs_path : str
        The path to the jobs parquet file.

    Returns
    -------
    tuple
        (jobs, telemetry_start, telemetry_end) as produced by
        ``load_data_from_df``.
    """
    jobs_df = pd.read_parquet(jobs_path, engine='pyarrow')
    return load_data_from_df(jobs_df, **kwargs)


def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
    """
    Parse job and job profile data from a DataFrame into Job objects.

    Requires the following fields in the DataFrame:

    - start_time (timestamp): Time execution begins (actual or expected)
    - job_id (int): Job ID
    - node_power_consumption (List[int]): Power consumption of the job, recorded at Node level
    - nodes_required (int): Number of nodes allocated to the job
    - cpu_power_consumption (List[int]): Power consumption of the job, recorded at CPU level (don't have this)
    - mem_power_consumption (List[int]): Power consumption of the job, recorded at Memory level (don't have this)
    - priority (int): Relative priority of the job, 0=held, 1=required nodes DOWN/DRAINED
    - job_state (string): State of the job, see enum job_states for possible values
    - wall_time (int): Actual runtime of job, in seconds
    - nodes (string): List of nodes allocated to job

    Keyword Arguments
    -----------------
    config : dict
        Legacy system configuration (POWER_CPU_IDLE, CPUS_PER_NODE, ...).
    min_time : timestamp, optional
        Baseline reference time; defaults to the earliest submit_time.
    reschedule, fastforward, validate, jid :
        Simulation options forwarded from the CLI.
    window_start, window_end : str or timestamp, optional
        Telemetry window used to filter jobs. The defaults preserve the
        previously hard-coded 2024-09-01 .. 2024-09-16 range.

    Returns
    -------
    tuple
        (jobs, telemetry_start, telemetry_end)
    """
    config = kwargs.get('config')
    min_time = kwargs.get('min_time', None)
    reschedule = kwargs.get('reschedule')
    fastforward = kwargs.get('fastforward')
    validate = kwargs.get('validate')
    jid = kwargs.get('jid', '*')
    # Generalized: the telemetry window used to be hard-coded; these defaults
    # keep the original behavior while allowing callers to override it.
    window_start = pd.to_datetime(kwargs.get('window_start', '2024-09-01T00:00:00'))
    window_end = pd.to_datetime(kwargs.get('window_end', '2024-09-16T00:00:00'))

    if fastforward:
        print(f"fast-forwarding {fastforward} seconds")

    # Sort by submit_time, keep only jobs whose start or end falls inside the
    # telemetry window, then re-index so positional .loc access lines up.
    jobs_df = jobs_df.sort_values(by='submit_time')
    jobs_df = jobs_df[(jobs_df.start_time.between(window_start, window_end, inclusive='right')
                       | jobs_df.end_time.between(window_start, window_end, inclusive='right')
                       )].copy()
    jobs_df = jobs_df.reset_index(drop=True)

    telemetry_start_timestamp = jobs_df['start_time'].min()
    telemetry_end_timestamp = jobs_df['end_time'].max()
    telemetry_start = 0
    telemetry_end = int((telemetry_end_timestamp - telemetry_start_timestamp).total_seconds())

    # Take earliest time as baseline reference.
    # We can use the start time of the first job.
    if min_time:
        time_zero = min_time
    else:
        time_zero = jobs_df['submit_time'].min()

    num_jobs = len(jobs_df)
    print("time_zero:", time_zero, "num_jobs", num_jobs)
    jobs = []

    # Map dataframe rows to job dicts. FIX: the original iterated
    # range(num_jobs - 1) and read submit_time from row jidx+1, which silently
    # dropped the last job and paired each job with its successor's submit
    # time; both are corrected here.
    for jidx in tqdm(range(num_jobs), desc="Processing Kestrel Jobs"):
        job_id = jobs_df.loc[jidx, 'job_id']
        account = jobs_df.loc[jidx, 'account']

        # When extracting a single job profile, skip all other jobs.
        if jid != '*':
            if int(jid) == int(job_id):
                print(f'Extracting {job_id} profile')
            else:
                continue

        nodes_required = jobs_df.loc[jidx, 'nodes_required']
        name = str(uuid.uuid4())[:6]

        cpu_power = jobs_df.loc[jidx, 'power_per_node']
        if validate:
            # Validation replays the recorded power trace directly.
            cpu_trace = cpu_power
        else:
            # Convert recorded node power into utilization relative to the
            # node's idle/max power envelope.
            # NOTE(review): pd.isna(cpu_power) assumes cpu_power is a scalar
            # or an array exposing .tolist() — confirm against the parquet
            # schema.
            cpu_power_array = [600] if (pd.isna(cpu_power) or cpu_power == 0) else cpu_power.tolist()
            cpu_min_power = nodes_required * config['POWER_CPU_IDLE'] * config['CPUS_PER_NODE']
            cpu_max_power = nodes_required * config['POWER_CPU_MAX'] * config['CPUS_PER_NODE']
            cpu_util = power_to_utilization(cpu_power_array, cpu_min_power, cpu_max_power)
            cpu_trace = cpu_util * config['CPUS_PER_NODE']
        gpu_trace = 0

        # Priority sorting doesn't seem to be implemented at the moment
        priority = 0

        wall_time = jobs_df.loc[jidx, 'wall_time']
        end_state = jobs_df.loc[jidx, 'job_state']
        time_submit = jobs_df.loc[jidx, 'submit_time']
        diff = time_submit - time_zero

        if jid == '*':
            time_offset = max(diff.total_seconds(), 0)
        else:
            # When extracting out a single job, run one iteration past the end of the job
            time_offset = config['UI_UPDATE_FREQ']

        if fastforward:
            time_offset -= fastforward

        # NOTE(review): in the original, the reschedule and replay branches
        # were byte-identical, so the telemetry-derived offset above is always
        # overwritten with a synthetic arrival time. The collapse below keeps
        # that behavior; confirm whether prescribed replay (reschedule=False)
        # was meant to keep the telemetry offset instead.
        scheduled_nodes = None
        time_offset = next_arrival(1 / config['JOB_ARRIVAL_TIME'])

        trace_quanta = config['TRACE_QUANTA']
        if cpu_trace.size > 0 and time_offset >= 0:
            job_info = job_dict(nodes_required=nodes_required,
                                name=name,
                                account=account,
                                cpu_trace=cpu_trace,
                                gpu_trace=gpu_trace,
                                ntx_trace=[],
                                nrx_trace=[],
                                end_state=end_state,
                                scheduled_nodes=scheduled_nodes,
                                id=job_id,
                                priority=priority,
                                submit_time=time_offset,
                                time_limit=wall_time,
                                trace_quanta=trace_quanta)
            jobs.append(Job(job_info))

    return jobs, telemetry_start, telemetry_end


def node_index_to_name(index: int, config: dict):
    """ Converts an index value back to an name string based on system configuration. """
    return f"node{index:04d}"


def cdu_index_to_name(index: int, config: dict):
    """ Converts a CDU index back to its name string. """
    return f"cdu{index:02d}"


def cdu_pos(index: int, config: dict) -> tuple[int, int]:
    """ Return (row, col) tuple for a cdu index """
    return (0, index)  # TODO
raps/engine.py +19 −9 Original line number Diff line number Diff line Loading @@ -42,6 +42,7 @@ from raps.weather import Weather from raps.sim_config import SimConfig from raps.system_config import SystemConfig from bisect import bisect_right @dataclasses.dataclass class TickData: Loading Loading @@ -413,7 +414,6 @@ class Engine: self.power_manager.set_idle(job.scheduled_nodes) job.current_state = JobState.COMPLETED job.end_time = self.current_timestep self.running.remove(job) self.jobs_completed += 1 job_stats = job.statistics() Loading @@ -424,7 +424,7 @@ class Engine: self.resource_manager.free_nodes_from_job(job) killed_jobs = [job for job in self.running if job.start_time + job.time_limit <= self.current_timestep] job.end_time is not None and job.start_time + job.time_limit <= self.current_timestep] for job in killed_jobs: self.power_manager.set_idle(job.scheduled_nodes) Loading Loading @@ -489,7 +489,8 @@ class Engine: actively_considered_jobs: List, all_jobs: List, replay: bool, autoshutdown: bool): autoshutdown: bool, cursor: int): # 1 update running time of all running jobs # 2 update the current_timestep of the engine (this serves as reference for most computations) # 3 Check if simulation should shutdown Loading @@ -504,7 +505,7 @@ class Engine: len(self.queue) == 0 and \ len(self.running) == 0 and \ not replay and \ len(all_jobs) == 0 and \ len(all_jobs) == cursor and \ len(actively_considered_jobs) == 0: if self.debug: print(f"Simulaiton completed early: {self.config['system_name']} - " Loading Loading @@ -555,10 +556,10 @@ class Engine: job.running_time = self.current_timestep - job.start_time if job.current_state != JobState.RUNNING: raise ValueError(f"Job is in running list, but state is not RUNNING: job.state == {job.currentstate}") raise ValueError(f"Job {job.id} is in running list, but state is not RUNNING: job.state == {job.current_state}") else: # if job.state == JobState.RUNNING: # Error checks if job.running_time > job.time_limit: if 
job.running_time > job.time_limit and job.end_time is not None: raise Exception(f"Job exceded time limit! " f"{job.running_time} > {job.time_limit}" f"\n{job}" Loading Loading @@ -754,6 +755,9 @@ class Engine: # Process jobs in batches for better performance of timestep loop all_jobs = jobs.copy() submit_times = [j.submit_time for j in all_jobs] cursor = 0 jobs = [] # Batch Jobs into 6h windows based on submit_time or twice the time_delta if larger batch_window = max(60 * 60 * 6, 2 * time_delta) # at least 6h Loading @@ -772,8 +776,13 @@ class Engine: if (self.current_timestep % batch_window == 0) or (self.current_timestep == timestep_start): # Add jobs that are within the batching window and remove them from all jobs jobs += [job for job in all_jobs if job.submit_time <= self.current_timestep + batch_window] all_jobs[:] = [job for job in all_jobs if job.submit_time > self.current_timestep + batch_window] # jobs += [job for job in all_jobs if job.submit_time <= self.current_timestep + batch_window] # all_jobs[:] = [job for job in all_jobs if job.submit_time > self.current_timestep + batch_window] cutoff = self.current_timestep + batch_window r = bisect_right(submit_times, cutoff, lo=cursor) if r > cursor: jobs.extend(all_jobs[cursor:r]) cursor = r # 1. Prepare Timestep: completed_jobs, killed_jobs, newly_downed_nodes, need_reschedule = \ Loading Loading @@ -812,7 +821,8 @@ class Engine: simulation_done = self.complete_timestep(actively_considered_jobs=jobs, all_jobs=all_jobs, replay=replay, autoshutdown=autoshutdown) autoshutdown=autoshutdown, cursor=cursor) if simulation_done: break yield tick_data Loading
raps/resmgr/default.py +2 −1 Original line number Diff line number Diff line Loading @@ -65,7 +65,8 @@ class ExclusiveNodeResourceManager: if n not in self.available_nodes: self.available_nodes.append(n) else: raise KeyError(f"node was free but already in available nodes: {n.id}") raise KeyError((f"Atempting to free node {n} after completion of job {job.id}. " + "Node is already free (in available nodes)!")) self.available_nodes = sorted(self.available_nodes) def update_system_utilization(self, current_time, running_jobs): Loading
import pandas as pd  # NOTE(review): appears unused in this module
import sys
import os  # NOTE(review): appears unused in this module
import zmq

from ..policy import PolicyType, BackfillType
from raps.telemetry import Telemetry
from ..job import JobState
from raps.sim_config import args
from raps.system_config import get_system_config

# Run with this command:
# python main.py --system kestrel -f ../data/fastsim_jobs_output.parquet --scheduler fastsim --policy priority --start 2024-09-01T00:00 --end 2024-09-15T00:00


class Scheduler():
    """
    FastSim-backed scheduler (strict lockstep via ZeroMQ).

    Protocol (server side is FastSim --serve):
      - INIT          -> { init_time }
      - GET { t }     -> { t, running_ids }   (server acks t after reply)
      - END (on shutdown) -> { ok: true }

    Semantics at engine second t:
      - R_t := authoritative running IDs from FastSim for t
      - started  = R_t - prev_R -> stamp start_time=t (once), assign nodes once, mark RUNNING
      - finished = prev_R - R_t -> stamp end_time=t (engine will finalize next
        tick in prepare_timestep)

    The running list for this tick is the union of R_t and the jobs that just
    finished (so those finishing at t remain visible for one more scheduler
    call; the engine completes them on the next second).
    """

    def __init__(self, config, resource_manager, **kwargs):
        # Scheduling policy/backfill selection, mirroring the other RAPS
        # schedulers' constructor signature.
        self.config = config
        self.policy = PolicyType(kwargs.get('policy'))
        self.bfpolicy = BackfillType(kwargs.get('backfill'))
        self.debug = bool(kwargs.get('debug', False))

        # ---- ZeroMQ client ----
        # REQ socket: strict send/recv lockstep with the FastSim server.
        # LINGER=0 so shutdown never blocks on unsent messages.
        self.endpoint = kwargs.get('plugin_endpoint', 'ipc:///tmp/fastsim.sock')
        self._ctx = zmq.Context.instance()
        self._sock = self._ctx.socket(zmq.REQ)
        self._sock.setsockopt(zmq.LINGER, 0)
        self._sock.connect(self.endpoint)

        # INIT handshake: fetch FastSim's init_time (ISO string).
        self.init_time_iso = self._rpc('INIT').get('init_time')

        self.resource_manager = resource_manager

        # Job metadata: id -> Job
        self.jobids_to_jobs = {}
        self.allocated_jobs = set()    # job_ids we have assigned nodes for
        self.prev_running_ids = set()  # R_{t-1}

        # Build the Job objects from RAPS Telemetry (needed so ExaDigiT subsystems have objects)
        # NOTE(review): `config` here rebinds the parameter with the legacy
        # system config before passing it to Telemetry — confirm intentional.
        args_dict = vars(args)
        config = get_system_config(args.system).get_legacy()
        args_dict['config'] = config
        td = Telemetry(**args_dict)
        print("...Now loading jobs to FastSim scheduler.")
        jobs, _, _ = td.load_data(args.replay)
        for job in jobs:
            self.jobids_to_jobs[job.id] = job

        if self.debug:
            print(f"[RAPS-FastSim] Connected to {self.endpoint}; init_time={self.init_time_iso}", file=sys.stderr)

    def _rpc(self, op: str, **payload) -> dict:
        """Send a JSON request and return the JSON reply (dict)."""
        try:
            msg = {'op': op}
            msg.update(payload)
            self._sock.send_json(msg)
            rep = self._sock.recv_json()
        except Exception as e:
            # Wrap transport failures so callers see a single error type.
            raise RuntimeError(f"[RAPS-FastSim] RPC {op} failed: {e}") from e
        if isinstance(rep, dict) and 'error' in rep:
            raise RuntimeError(f"[RAPS-FastSim] RPC {op} error: {rep['error']}")
        return rep

    def _fastsim_running_ids(self, t: int):
        """Blocking call: get authoritative running job IDs for second t."""
        rep = self._rpc('GET', t=int(t))
        rids = rep.get('running_ids', [])
        return set(rids)

    def schedule(self, queue=None, running=None, current_time=None, accounts=None, sorted=False):
        """
        Called by Engine when RAPS detects an event.

        Mutates `running` in place to mirror FastSim's authoritative running
        set for second `current_time`.
        NOTE(review): the `sorted` parameter shadows the builtin and is unused
        here; kept for signature compatibility with other schedulers.
        """
        running = running if running is not None else []
        t = int(current_time)

        # Get authoritative running set for second t (blocks until available)
        R_t = self._fastsim_running_ids(t)

        # Diff vs previous second
        started_ids = R_t - self.prev_running_ids
        finished_ids = self.prev_running_ids - R_t  # these end at t; engine finalizes next tick

        # Handle starts: stamp start_time, assign nodes, mark RUNNING
        for jid in started_ids:
            job = self.jobids_to_jobs.get(jid)
            if job is None:
                if self.debug:
                    print(f"[RAPS-FastSim][WARN] Unknown job id from FastSim: {jid}", file=sys.stderr)
                continue

            # Assign nodes exactly once
            if jid not in self.allocated_jobs:
                self.resource_manager.assign_nodes_to_job(job, t, self.policy)
                self.allocated_jobs.add(jid)

            # FastSim is authoritative
            job.start_time = t
            # IMPORTANT: prevent premature completion by RM’s default behavior
            job.end_time = None  # Prevents RAPS from removing job
            job.state = JobState.RUNNING

        # Handle finishes: stamp end_time=t (engine.prepare_timestep next tick completes)
        running.clear()
        for jid in finished_ids:
            job = self.jobids_to_jobs.get(jid)
            if job is not None:
                # overwrite any prior value; FastSim is the source of truth
                # job.end_time = t
                if job.start_time is not None:
                    observed = t - job.start_time
                    if (job.time_limit is None) or (job.time_limit < observed):
                        # This is necessary since RAPS is handling finishing jobs, but schedule is not always
                        # called at every tick, even though the job may have finished in FastSim during that tick.
                        # TODO: Deal with this, because it messes up the end time of some jobs.
                        # print(f"Extending {job.id} runtime {job.time_limit} to match observed {observed} at finish.")
                        job.time_limit = observed
                # print((f"Job {job.id} is finished, start time: {job.start_time}, wall time: {job.time_limit},"
                #        f"end time: {job.end_time}, at time {t}. With nodes {job.scheduled_nodes}."))
                job.end_time = t
                # NOTE(review): this unconditionally recomputes time_limit as
                # t - start_time (same value as `observed` above, making that
                # assignment redundant) and raises TypeError if start_time is
                # None — confirm start_time is always stamped before a finish.
                job.time_limit = t - job.start_time
                running.append(job)

        # Running list reflects exactly FastSim’s R_t
        for jid in R_t:
            job = self.jobids_to_jobs.get(jid)
            if job is not None:
                # defensively ensure state isn’t stuck at COMPLETED
                if job.state != JobState.RUNNING:
                    job.state = JobState.RUNNING
                running.append(job)

        # Update prev
        self.prev_running_ids = R_t

    def end_sim(self):
        """Tell the FastSim server to shut down; failures are best-effort."""
        # Ask server to stop
        try:
            self._rpc('END')
        except Exception:
            pass