From eba533ba61e896a7e18bd4944ceece941edac331 Mon Sep 17 00:00:00 2001 From: kevinmenear Date: Fri, 22 Aug 2025 10:43:24 -0400 Subject: [PATCH 1/3] =?UTF-8?q?Add=20ZeroMQ-based=20FastSim=E2=80=93RAPS?= =?UTF-8?q?=20integration=20with=20strict=20lockstep?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- raps/dataloaders/kestrel.py | 162 +++++++++++++++++++++++++++++++++++ raps/engine.py | 24 ++++-- raps/resmgr/default.py | 3 +- raps/schedulers/fastsim.py | 163 ++++++++++++++++++++++++++++++++++++ 4 files changed, 344 insertions(+), 8 deletions(-) create mode 100644 raps/dataloaders/kestrel.py create mode 100644 raps/schedulers/fastsim.py diff --git a/raps/dataloaders/kestrel.py b/raps/dataloaders/kestrel.py new file mode 100644 index 0000000..cb40d55 --- /dev/null +++ b/raps/dataloaders/kestrel.py @@ -0,0 +1,162 @@ +""" + Load data for NREL's Kestrel cluster. +""" +import uuid +import pandas as pd +from tqdm import tqdm + +from ..job import job_dict, Job +from ..utils import power_to_utilization, next_arrival + + +def load_data(jobs_path, **kwargs): + """ + Reads job and job profile data from parquet files and parses them. + + Parameters + ---------- + jobs_path : str + The path to the jobs parquet file. + + Returns + ------- + list + The list of parsed jobs. + """ + jobs_df = pd.read_parquet(jobs_path, engine='pyarrow') + return load_data_from_df(jobs_df, **kwargs) + + +def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): + """ + Reads job and job profile data from parquet files and parses them. + + Requires the following fields in the DataFrame: + - start_time (timestamp): Time execution begins (actual or expected) + - job_id (int): Job ID + - node_power_consumption (List[int]): Power consumption of the job, recorded at Node level + - nodes_required (int): Number of nodes allocated to the job + - cpu_power_consumption (List[int]): Power consumption of the job, recorded at CPU level (don't have this) + - mem_power_consumption (List[int]): Power consumption of the job, recorded at Memory level (don't have this) + - priority (int): Relative priority of the job, 0=held, 1=required nodes DOWN/DRAINED + - job_state (string): State of the job, see enum job_states for possible values + - wall_time (int): Actual runtime of job, in seconds + - nodes (string): List of nodes allocated to job + + Returns + ------- + list + The list of parsed jobs. + """ + config = kwargs.get('config') + min_time = kwargs.get('min_time', None) + reschedule = kwargs.get('reschedule') + fastforward = kwargs.get('fastforward') + validate = kwargs.get('validate') + jid = kwargs.get('jid', '*') + + if fastforward: print(f"fast-forwarding {fastforward} seconds") + + # Sort jobs dataframe based on values in time_start column, adjust indices after sorting + jobs_df = jobs_df.sort_values(by='submit_time') + jobs_df = jobs_df.reset_index(drop=True) + + telemetry_start_timestamp = jobs_df['submit_time'].min() + telemetry_end_timestamp = jobs_df['submit_time'].max() + telemetry_start = 0 + telemetry_end = int((telemetry_end_timestamp - telemetry_start_timestamp).total_seconds()) + + # Take earliest time as baseline reference + # We can use the start time of the first job. + if min_time: + time_zero = min_time + else: + time_zero = jobs_df['submit_time'].min() + + num_jobs = len(jobs_df) + print("time_zero:", time_zero, "num_jobs", num_jobs) + + jobs = [] + + # Map dataframe to job state. Add results to jobs list + for jidx in tqdm(range(num_jobs - 1), total=num_jobs, desc="Processing Kestrel Jobs"): + + job_id = jobs_df.loc[jidx, 'job_id'] + account = jobs_df.loc[jidx, 'account'] + + if not jid == '*': + if int(jid) == int(job_id): + print(f'Extracting {job_id} profile') + else: + continue + nodes_required = jobs_df.loc[jidx, 'nodes_required'] + + name = str(uuid.uuid4())[:6] + + if validate: + cpu_power = jobs_df.loc[jidx, 'power_per_node'] + cpu_trace = cpu_power + + else: + cpu_power = jobs_df.loc[jidx, 'power_per_node'] + cpu_power_array = cpu_power.tolist() + cpu_min_power = nodes_required * config['POWER_CPU_IDLE'] * config['CPUS_PER_NODE'] + cpu_max_power = nodes_required * config['POWER_CPU_MAX'] * config['CPUS_PER_NODE'] + cpu_util = power_to_utilization(cpu_power_array, cpu_min_power, cpu_max_power) + cpu_trace = cpu_util * config['CPUS_PER_NODE'] + gpu_trace = 0 + + # Priority sorting doesn't seem to be implemented at the moment + priority = 0 + + wall_time = jobs_df.loc[jidx, 'wall_time'] + end_state = jobs_df.loc[jidx, 'job_state'] + time_submit = jobs_df.loc[jidx+1, 'submit_time'] + diff = time_submit - time_zero + + if jid == '*': + time_offset = max(diff.total_seconds(), 0) + else: + # When extracting out a single job, run one iteration past the end of the job + time_offset = config['UI_UPDATE_FREQ'] + + if fastforward: time_offset -= fastforward + + if reschedule: # Let the scheduler reschedule the jobs + scheduled_nodes = None + time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME']) + else: # Prescribed replay + scheduled_nodes = None + time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME']) + + if cpu_trace.size > 0 and time_offset >= 0: + job_info = job_dict(nodes_required = nodes_required, + name = name, + account = account, + cpu_trace = cpu_trace, + gpu_trace = gpu_trace, + ntx_trace = [], + nrx_trace = [], + end_state = end_state, + scheduled_nodes = scheduled_nodes, + id = job_id, + priority = priority, + submit_time = time_offset, + wall_time = wall_time) + jobs.append(Job(job_info)) + + return jobs, telemetry_start, telemetry_end + + +def node_index_to_name(index: int, config: dict): + """ Converts an index value back to an name string based on system configuration. """ + return f"node{index:04d}" + + +def cdu_index_to_name(index: int, config: dict): + return f"cdu{index:02d}" + + +def cdu_pos(index: int, config: dict) -> tuple[int, int]: + """ Return (row, col) tuple for a cdu index """ + return (0, index) # TODO \ No newline at end of file diff --git a/raps/engine.py b/raps/engine.py index f79b140..d247cdc 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -26,6 +26,7 @@ from raps.network import ( from raps.workload import continuous_job_generation from raps.downtime import Downtime +from bisect import bisect_right @dataclasses.dataclass class TickData: @@ -258,7 +259,6 @@ class Engine: self.power_manager.set_idle(job.scheduled_nodes) job.current_state = JobState.COMPLETED job.end_time = self.current_timestep - self.running.remove(job) self.jobs_completed += 1 job_stats = job.statistics() @@ -334,7 +334,8 @@ class Engine: actively_considered_jobs: List, all_jobs: List, replay: bool, - autoshutdown: bool): + autoshutdown: bool, + cursor: int): # 1 update running time of all running jobs # 2 update the current_timestep of the engine (this serves as reference for most computations) # 3 Check if simulation should shutdown @@ -349,7 +350,7 @@ class Engine: len(self.queue) == 0 and \ len(self.running) == 0 and \ not replay and \ - len(all_jobs) == 0 and \ + len(all_jobs) == cursor and \ len(actively_considered_jobs) == 0: if self.debug: print(f"Simulaiton completed early: {self.config['system_name']} - " @@ -403,7 +404,7 @@ class Engine: raise ValueError(f"Job is in running list, but state is not RUNNING: job.state == {job.currentstate}") else: # if job.state == JobState.RUNNING: # Error checks - if job.running_time > job.time_limit: + if job.running_time > job.time_limit and job.end_time is not None: raise Exception(f"Job exceded time limit! " f"{job.running_time} > {job.time_limit}" f"\n{job}" @@ -599,6 +600,9 @@ class Engine: # Process jobs in batches for better performance of timestep loop all_jobs = jobs.copy() + submit_times = [j.submit_time for j in all_jobs] + cursor = 0 + jobs = [] # Batch Jobs into 6h windows based on submit_time or twice the time_delta if larger batch_window = max(60 * 60 * 6, 2 * time_delta) # at least 6h @@ -617,8 +621,13 @@ class Engine: if (self.current_timestep % batch_window == 0) or (self.current_timestep == timestep_start): # Add jobs that are within the batching window and remove them from all jobs - jobs += [job for job in all_jobs if job.submit_time <= self.current_timestep + batch_window] - all_jobs[:] = [job for job in all_jobs if job.submit_time > self.current_timestep + batch_window] + # jobs += [job for job in all_jobs if job.submit_time <= self.current_timestep + batch_window] + # all_jobs[:] = [job for job in all_jobs if job.submit_time > self.current_timestep + batch_window] + cutoff = self.current_timestep + batch_window + r = bisect_right(submit_times, cutoff, lo=cursor) + if r > cursor: + jobs.extend(all_jobs[cursor:r]) + cursor = r # 1. Prepare Timestep: completed_jobs, killed_jobs, newly_downed_nodes, need_reschedule = \ @@ -657,7 +666,8 @@ class Engine: simulation_done = self.complete_timestep(actively_considered_jobs=jobs, all_jobs=all_jobs, replay=replay, - autoshutdown=autoshutdown) + autoshutdown=autoshutdown, + cursor=cursor) if simulation_done: break yield tick_data diff --git a/raps/resmgr/default.py b/raps/resmgr/default.py index c7791f5..ab1e774 100644 --- a/raps/resmgr/default.py +++ b/raps/resmgr/default.py @@ -65,7 +65,8 @@ class ExclusiveNodeResourceManager: if n not in self.available_nodes: self.available_nodes.append(n) else: - raise KeyError(f"node was free but already in available nodes: {n.id}") + raise KeyError((f"Atempting to free node {n} after completion of job {job.id}. " + + "Node is already free (in available nodes)!")) self.available_nodes = sorted(self.available_nodes) def update_system_utilization(self, current_time, running_jobs): diff --git a/raps/schedulers/fastsim.py b/raps/schedulers/fastsim.py new file mode 100644 index 0000000..eb5b263 --- /dev/null +++ b/raps/schedulers/fastsim.py @@ -0,0 +1,163 @@ +import pandas as pd +import sys +import os +import zmq + +from ..policy import PolicyType, BackfillType +from raps.telemetry import Telemetry +from ..job import Job, JobState +from ..args import args +from raps.config import ConfigManager + +# Run with this command: +# python main.py --system kestrel -f ../../data/fastsim_jobs_output.parquet --scheduler fastsim --policy priority --start 2024-09-01T00:00 --end 2024-09-15T00:00 + +class Scheduler(): + """ + FastSim-backed scheduler (strict lockstep via ZeroMQ). + + Protocol (server side is FastSim --serve): + - INIT -> { init_time } + - GET { t } -> { t, running_ids } (server acks t after reply) + - END (on shutdown) -> { ok: true } + + Semantics at engine second t: + - R_t := authoritative running IDs from FastSim for t + - started = R_t - prev_R + -> stamp start_time=t (once), assign nodes once, mark RUNNING + - finished = prev_R - R_t + -> stamp end_time=t (engine will finalize next tick in prepare_timestep) + + running list for this tick = R_t & finished (so those finishing at t remain + visible for one more scheduler call; engine completes them on next second). + """ + + def __init__(self, config, resource_manager, **kwargs): + self.config = config + self.policy = PolicyType(kwargs.get('policy')) + self.bfpolicy = BackfillType(kwargs.get('backfill')) + self.debug = bool(kwargs.get('debug', False)) + + # ---- ZeroMQ client ---- + self.endpoint = kwargs.get('plugin_endpoint', 'ipc:///tmp/fastsim.sock') + self._ctx = zmq.Context.instance() + self._sock = self._ctx.socket(zmq.REQ) + self._sock.setsockopt(zmq.LINGER, 0) + self._sock.connect(self.endpoint) + + # INIT handshake: fetch FastSim's init_time (ISO string). + self.init_time_iso = self._rpc('INIT').get('init_time') + + self.resource_manager = resource_manager + + # Job metadata: id -> Job + self.jobids_to_jobs = {} + self.allocated_jobs = set() # job_ids we have assigned nodes for + self.prev_running_ids = set() # R_{t-1} + + # Build the Job objects from RAPS Telemetry (needed so ExaDigiT subsystems have objects) + args_dict = vars(args) + cfg = ConfigManager(system_name=args.system).get_config() + args_dict['config'] = cfg + td = Telemetry(**args_dict) + + print("...Now loading jobs to FastSim scheduler.") + jobs, _, _ = td.load_data(args.replay) + for job in jobs: + self.jobids_to_jobs[job.id] = job + + if self.debug: + print(f"[RAPS-FastSim] Connected to {self.endpoint}; init_time={self.init_time_iso}", file=sys.stderr) + + def _rpc(self, op, **payload): + """Send a JSON request and return the JSON reply (dict).""" + try: + msg = {'op': op} + msg.update(payload) + self._sock.send_json(msg) + rep = self._sock.recv_json() + except Exception as e: + raise RuntimeError(f"[RAPS-FastSim] RPC {op} failed: {e}") from e + if isinstance(rep, dict) and 'error' in rep: + raise RuntimeError(f"[RAPS-FastSim] RPC {op} error: {rep['error']}") + return rep + + def _fastsim_running_ids(self, t: int): + """Blocking call: get authoritative running job IDs for second t.""" + rep = self._rpc('GET', t=int(t)) + rids = rep.get('running_ids', []) + return set(rids) + + def schedule(self, queue=None, running=None, current_time=None, accounts=None, sorted=False): + """ + Called by Engine when RAPS detects an event. + """ + running = running if running is not None else [] + + t = int(current_time) + + # Get authoritative running set for second t (blocks until available) + R_t = self._fastsim_running_ids(t) + + # Diff vs previous second + started_ids = R_t - self.prev_running_ids + finished_ids = self.prev_running_ids - R_t # these end at t; engine finalizes next tick + + # Handle starts: stamp start_time, assign nodes, mark RUNNING + for jid in started_ids: + job = self.jobids_to_jobs.get(jid) + if job is None: + if self.debug: + print(f"[RAPS-FastSim][WARN] Unknown job id from FastSim: {jid}", file=sys.stderr) + continue + + # Assign nodes exactly once + if jid not in self.allocated_jobs: + self.resource_manager.assign_nodes_to_job(job, t, self.policy) + self.allocated_jobs.add(jid) + + # FastSim is authoritative + job.start_time = t + # IMPORTANT: prevent premature completion by RM’s default behavior + job.end_time = None # Prevents RAPS from removing job + job.state = JobState.RUNNING + + # Handle finishes: stamp end_time=t (engine.prepare_timestep next tick completes) + running.clear() + for jid in finished_ids: + job = self.jobids_to_jobs.get(jid) + if job is not None: + # overwrite any prior value; FastSim is the source of truth + # job.end_time = t + if job.start_time is not None: + observed = t - job.start_time + if (job.wall_time is None) or (job.wall_time < observed): + # This is necessary since RAPS is handling finishing jobs, but schedule is not always + # called at every tick, even though the job may have finished in FastSim during that tick. + # TODO: Deal with this, because it messes up the end time of some jobs. + # print(f"Extending {job.id} runtime {job.wall_time} to match observed {observed} at finish.") + job.wall_time = observed + # print((f"Job {job.id} is finished, start time: {job.start_time}, wall time: {job.wall_time}," + # f"end time: {job.end_time}, at time {t}. With nodes {job.scheduled_nodes}.")) + job.end_time = t + job.wall_time = t - job.start_time + running.append(job) + + # Running list reflects exactly FastSim’s R_t + for jid in R_t: + job = self.jobids_to_jobs.get(jid) + if job is not None: + # defensively ensure state isn’t stuck at COMPLETED + if job.state != JobState.RUNNING: + job.state = JobState.RUNNING + running.append(job) + + # Update prev + self.prev_running_ids = R_t + + def end_sim(self): + # Ask server to stop + try: + self._rpc('END') + except Exception: + pass \ No newline at end of file -- GitLab From 1c0c7e2335b649e1704a053fbeffa24729169c24 Mon Sep 17 00:00:00 2001 From: kevinmenear Date: Thu, 28 Aug 2025 14:02:17 -0400 Subject: [PATCH 2/3] Change max queue jobs displayed in UI --- raps/dataloaders/kestrel.py | 16 ++++++++++++---- raps/schedulers/fastsim.py | 2 +- raps/ui.py | 3 ++- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/raps/dataloaders/kestrel.py b/raps/dataloaders/kestrel.py index cb40d55..8d6ef82 100644 --- a/raps/dataloaders/kestrel.py +++ b/raps/dataloaders/kestrel.py @@ -59,10 +59,15 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): # Sort jobs dataframe based on values in time_start column, adjust indices after sorting jobs_df = jobs_df.sort_values(by='submit_time') + jobs_df = jobs_df[(jobs_df.start_time.between(pd.to_datetime('2024-09-01T00:00:00'), + pd.to_datetime('2024-09-16T00:00:00'), inclusive='right') | + jobs_df.time_end.between(pd.to_datetime('2024-09-01T00:00:00'), + pd.to_datetime('2024-09-16T00:00:00'), inclusive='right') + )].copy() jobs_df = jobs_df.reset_index(drop=True) - telemetry_start_timestamp = jobs_df['submit_time'].min() - telemetry_end_timestamp = jobs_df['submit_time'].max() + telemetry_start_timestamp = jobs_df['start_time'].min() + telemetry_end_timestamp = jobs_df['end_time'].max() telemetry_start = 0 telemetry_end = int((telemetry_end_timestamp - telemetry_start_timestamp).total_seconds()) @@ -99,7 +104,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): else: cpu_power = jobs_df.loc[jidx, 'power_per_node'] - cpu_power_array = cpu_power.tolist() + cpu_power_array = [600] if (pd.isna(cpu_power) or cpu_power == 0) else cpu_power.tolist() cpu_min_power = nodes_required * config['POWER_CPU_IDLE'] * config['CPUS_PER_NODE'] cpu_max_power = nodes_required * config['POWER_CPU_MAX'] * config['CPUS_PER_NODE'] cpu_util = power_to_utilization(cpu_power_array, cpu_min_power, cpu_max_power) @@ -128,6 +133,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): else: # Prescribed replay scheduled_nodes = None time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME']) + + trace_quanta = config['TRACE_QUANTA'] if cpu_trace.size > 0 and time_offset >= 0: job_info = job_dict(nodes_required = nodes_required, @@ -142,7 +149,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): id = job_id, priority = priority, submit_time = time_offset, - wall_time = wall_time) + wall_time = wall_time, + trace_quanta=trace_quanta) jobs.append(Job(job_info)) return jobs, telemetry_start, telemetry_end diff --git a/raps/schedulers/fastsim.py b/raps/schedulers/fastsim.py index eb5b263..14d04bf 100644 --- a/raps/schedulers/fastsim.py +++ b/raps/schedulers/fastsim.py @@ -10,7 +10,7 @@ from ..args import args from raps.config import ConfigManager # Run with this command: -# python main.py --system kestrel -f ../../data/fastsim_jobs_output.parquet --scheduler fastsim --policy priority --start 2024-09-01T00:00 --end 2024-09-15T00:00 +# python main.py --system kestrel -f ../data/fastsim_jobs_output.parquet --scheduler fastsim --policy priority --start 2024-09-01T00:00 --end 2024-09-15T00:00 class Scheduler(): """ diff --git a/raps/ui.py b/raps/ui.py index d9c3bbe..aaa7435 100644 --- a/raps/ui.py +++ b/raps/ui.py @@ -24,6 +24,7 @@ from raps.utils import summarize_ranges, convert_seconds_to_hhmmss, convert_seco from raps.constants import ELLIPSES from raps.engine import TickData, Engine +MAX_ROWS = 20 class LayoutManager: def __init__(self, layout_type, engine: Engine, total_timesteps=0, debug=None, args_dict=None, **config): @@ -153,7 +154,7 @@ class LayoutManager: table.add_column(col, justify="center") # Add data rows - for job in jobs: + for job in jobs[:MAX_ROWS]: # Number of requested nodes as a string # n_nodes = str(job.nodes_required) # Unused -- GitLab From 3cea79eb09d2f1005d8ace76d82b5b51d2a7c371 Mon Sep 17 00:00:00 2001 From: kevinmenear Date: Thu, 28 Aug 2025 18:14:27 -0400 Subject: [PATCH 3/3] Update FastSim integration for new config setup --- config/kestrel.yaml | 53 +++++++++++++++++++++++++++++++++++++ raps/dataloaders/kestrel.py | 4 +-- raps/engine.py | 4 +-- raps/schedulers/fastsim.py | 20 +++++++------- raps/sim_config.py | 2 +- raps/ui.py | 4 +-- 6 files changed, 70 insertions(+), 17 deletions(-) create mode 100644 config/kestrel.yaml diff --git a/config/kestrel.yaml b/config/kestrel.yaml new file mode 100644 index 0000000..15e3ece --- /dev/null +++ b/config/kestrel.yaml @@ -0,0 +1,53 @@ +system: + num_cdus: 6 + racks_per_cdu: 6 + nodes_per_rack: 80 + rectifiers_per_rack: 6 + chassis_per_rack: 1 + nodes_per_blade: 1 + switches_per_chassis: 5 + nics_per_node: 2 + rectifiers_per_chassis: 5 + nodes_per_rectifier: 4 + missing_racks: [] + down_nodes: [] + cpus_per_node: 1 + gpus_per_node: 4 + cpu_peak_flops: 396800000000.0 + gpu_peak_flops: 7800000000000.0 + cpu_fp_ratio: 0.69 + gpu_fp_ratio: 0.69 + +power: + power_gpu_idle: 75 + power_gpu_max: 300 + power_cpu_idle: 100 + power_cpu_max: 800 + power_mem: 74.26 + power_nic: 21 + power_nvme: 45 + power_switch: 250 + power_cdu: 0 + power_update_freq: 20 + rectifier_peak_threshold: 13670 + sivoc_loss_constant: 0 + sivoc_efficiency: 1 + rectifier_loss_constant: 0 + rectifier_efficiency: 1 + power_cost: 0.094 + +scheduler: + seed: 42 + job_arrival_time: 20 + mtbf: 11 + trace_quanta: 20 + min_wall_time: 3600 + max_wall_time: 43200 + ui_update_freq: 3600 + max_nodes_per_job: 3000 + job_end_probs: + COMPLETED: 0.63 + FAILED: 0.13 + CANCELLED: 0.12 + TIMEOUT: 0.11 + NODE_FAIL: 0.01 \ No newline at end of file diff --git a/raps/dataloaders/kestrel.py b/raps/dataloaders/kestrel.py index 8d6ef82..8b8470a 100644 --- a/raps/dataloaders/kestrel.py +++ b/raps/dataloaders/kestrel.py @@ -61,7 +61,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): jobs_df = jobs_df.sort_values(by='submit_time') jobs_df = jobs_df[(jobs_df.start_time.between(pd.to_datetime('2024-09-01T00:00:00'), pd.to_datetime('2024-09-16T00:00:00'), inclusive='right') | - jobs_df.time_end.between(pd.to_datetime('2024-09-01T00:00:00'), + jobs_df.end_time.between(pd.to_datetime('2024-09-01T00:00:00'), pd.to_datetime('2024-09-16T00:00:00'), inclusive='right') )].copy() jobs_df = jobs_df.reset_index(drop=True) @@ -149,7 +149,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): id = job_id, priority = priority, submit_time = time_offset, - wall_time = wall_time, + time_limit = wall_time, trace_quanta=trace_quanta) jobs.append(Job(job_info)) diff --git a/raps/engine.py b/raps/engine.py index d247cdc..896bd6c 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -269,7 +269,7 @@ class Engine: self.resource_manager.free_nodes_from_job(job) killed_jobs = [job for job in self.running if - job.start_time + job.time_limit <= self.current_timestep] + job.end_time is not None and job.start_time + job.time_limit <= self.current_timestep] for job in killed_jobs: self.power_manager.set_idle(job.scheduled_nodes) @@ -401,7 +401,7 @@ class Engine: job.running_time = self.current_timestep - job.start_time if job.current_state != JobState.RUNNING: - raise ValueError(f"Job is in running list, but state is not RUNNING: job.state == {job.currentstate}") + raise ValueError(f"Job {job.id} is in running list, but state is not RUNNING: job.state == {job.current_state}") else: # if job.state == JobState.RUNNING: # Error checks if job.running_time > job.time_limit and job.end_time is not None: diff --git a/raps/schedulers/fastsim.py b/raps/schedulers/fastsim.py index 14d04bf..855dcbd 100644 --- a/raps/schedulers/fastsim.py +++ b/raps/schedulers/fastsim.py @@ -5,9 +5,9 @@ import zmq from ..policy import PolicyType, BackfillType from raps.telemetry import Telemetry -from ..job import Job, JobState -from ..args import args -from raps.config import ConfigManager +from ..job import JobState +from raps.sim_config import args +from raps.system_config import get_system_config # Run with this command: # python main.py --system kestrel -f ../data/fastsim_jobs_output.parquet --scheduler fastsim --policy priority --start 2024-09-01T00:00 --end 2024-09-15T00:00 @@ -57,8 +57,8 @@ class Scheduler(): # Build the Job objects from RAPS Telemetry (needed so ExaDigiT subsystems have objects) args_dict = vars(args) - cfg = ConfigManager(system_name=args.system).get_config() - args_dict['config'] = cfg + config = get_system_config(args.system).get_legacy() + args_dict['config'] = config td = Telemetry(**args_dict) print("...Now loading jobs to FastSim scheduler.") @@ -131,16 +131,16 @@ class Scheduler(): # job.end_time = t if job.start_time is not None: observed = t - job.start_time - if (job.wall_time is None) or (job.wall_time < observed): + if (job.time_limit is None) or (job.time_limit < observed): # This is necessary since RAPS is handling finishing jobs, but schedule is not always # called at every tick, even though the job may have finished in FastSim during that tick. # TODO: Deal with this, because it messes up the end time of some jobs. - # print(f"Extending {job.id} runtime {job.wall_time} to match observed {observed} at finish.") - job.wall_time = observed - # print((f"Job {job.id} is finished, start time: {job.start_time}, wall time: {job.wall_time}," + # print(f"Extending {job.id} runtime {job.time_limit} to match observed {observed} at finish.") + job.time_limit = observed + # print((f"Job {job.id} is finished, start time: {job.start_time}, wall time: {job.time_limit}," # f"end time: {job.end_time}, at time {t}. With nodes {job.scheduled_nodes}.")) job.end_time = t - job.wall_time = t - job.start_time + job.time_limit = t - job.start_time running.append(job) # Running list reflects exactly FastSim’s R_t diff --git a/raps/sim_config.py b/raps/sim_config.py index 127cec3..c7753cb 100644 --- a/raps/sim_config.py +++ b/raps/sim_config.py @@ -164,7 +164,7 @@ class SimConfig(BaseModel): # Synthetic workloads scheduler: Literal[ - "default", "scheduleflow", "nrel", "anl", "flux", "experimental", "multitenant", + "default", "scheduleflow", "fastsim", "anl", "flux", "experimental", "multitenant", ] = "default" """ Scheduler name """ policy: PolicyType | None = None diff --git a/raps/ui.py b/raps/ui.py index aaa7435..efde3cd 100644 --- a/raps/ui.py +++ b/raps/ui.py @@ -24,7 +24,7 @@ from raps.utils import summarize_ranges, convert_seconds_to_hhmmss, convert_seco from raps.constants import ELLIPSES from raps.engine import TickData, Engine -MAX_ROWS = 20 +MAX_ROWS = 30 class LayoutManager: def __init__(self, layout_type, engine: Engine, total_timesteps=0, debug=None, args_dict=None, **config): @@ -272,7 +272,7 @@ class LayoutManager: else: # For the curious: If the simulation time in seconds is large than # unix timestamp for Jan 2000 this is a unix timestamp, - time_str = f"{datetime.fromtimestamp(time_in_s).strftime("%Y-%m-%d %H:%M")}" + time_str = f"{datetime.fromtimestamp(time_in_s).strftime('%Y-%m-%d %H:%M')}" if timestep_start != 0: # append time simulated time_str += f"\nSim: {convert_seconds_to_hhmm(time_in_s - timestep_start)}" -- GitLab