From 00382592057371ddddc57d247024006cd126195e Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Thu, 30 Jan 2025 16:49:07 -0500 Subject: [PATCH 1/8] Add Frontier aging policy --- main.py | 9 +++++---- raps/policy.py | 9 +++++++++ 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/main.py b/main.py index 6e2ed10..d893fe5 100644 --- a/main.py +++ b/main.py @@ -82,8 +82,8 @@ if args.replay: extracted_date = "Date not found" DIR_NAME = create_casename() - # Read either npz file or telemetry parquet files - if args.replay[0].endswith(".npz"): + # Read telemetry data + if args.replay[0].endswith(".npz"): # read .npz file print(f"Loading {args.replay[0]}...") jobs = td.load_snapshot(args.replay[0]) @@ -98,9 +98,10 @@ if args.replay: job['requested_nodes'] = None job['submit_time'] = next_arrival(1 / config['JOB_ARRIVAL_TIME']) - else: + else: # custom data loader print(*args.replay) jobs = td.load_data(args.replay) + for job in jobs: job['priority'] = sc.policy.aging_boost(job['nodes_required']) td.save_snapshot(jobs, filename=DIR_NAME) # Set number of timesteps based on the last job running which we assume @@ -113,7 +114,7 @@ if args.replay: print(f'Simulating {len(jobs)} jobs for {timesteps} seconds') time.sleep(1) -else: +else: # synthetic jobs wl = Workload(config) jobs = getattr(wl, args.workload)(num_jobs=args.numjobs) diff --git a/raps/policy.py b/raps/policy.py index 1da5b5b..13c6f92 100644 --- a/raps/policy.py +++ b/raps/policy.py @@ -23,6 +23,15 @@ class Policy: else: raise ValueError(f"Unknown policy type: {self.policy_type}") + def aging_boost(self, nnodes): + """Frontier aging policy""" + if nnodes > 5645: + return 8 + elif nnodes > 1882: + return 4 + else: + return 0 + def find_backfill_job(self, queue, num_free_nodes, current_time): """ This implementation is based on pseudocode from Leonenkov and Zhumatiy. "Introducing new backfill-based scheduler for slurm resource manager." 
-- GitLab From 836ed68e3822383e97dec4a931e991909f6b6271 Mon Sep 17 00:00:00 2001 From: Matthias Maiterth Date: Thu, 30 Jan 2025 23:20:19 -0500 Subject: [PATCH 2/8] Separated Accounts from Jobs and the Scheduler --- main.py | 10 ++++++++-- raps/job.py | 37 +++++++++++++++++++++++++++++++------ raps/scheduler.py | 36 +++++++++++++++++++++--------------- raps/telemetry.py | 8 ++++---- raps/ui.py | 15 ++++++++------- raps/workload.py | 22 +++++++++++++--------- 6 files changed, 85 insertions(+), 43 deletions(-) diff --git a/main.py b/main.py index d893fe5..958599d 100644 --- a/main.py +++ b/main.py @@ -98,7 +98,7 @@ if args.replay: job['requested_nodes'] = None job['submit_time'] = next_arrival(1 / config['JOB_ARRIVAL_TIME']) - else: # custom data loader + else: # custom data loader print(*args.replay) jobs = td.load_data(args.replay) for job in jobs: job['priority'] = sc.policy.aging_boost(job['nodes_required']) @@ -208,7 +208,7 @@ if args.output: if args.uncertainties: # Parquet cannot handle annotated ufloat format AFAIK - print('Data dump not implemented using uncertainties!') + print('Data dump not implemented using uncertainties!') else: if cooling_model: df = pd.DataFrame(cooling_model.fmu_history) @@ -228,3 +228,9 @@ if args.output: json.dump(output_stats, f, indent=4) except: write_dict_to_file(output_stats, OPATH / 'stats.out') + + try: + with open(OPATH / 'account-stats.txt') as f: + json.dump(sc.accounts, f, indent=4) + except: + write_dict_to_file(vars(sc.accounts), OPATH / 'account-stats.out') diff --git a/raps/job.py b/raps/job.py index 84b7f52..75280e8 100644 --- a/raps/job.py +++ b/raps/job.py @@ -1,11 +1,12 @@ from enum import Enum -def job_dict(nodes_required, name, cpu_trace, gpu_trace, ntx_trace, nrx_trace, \ +def job_dict(nodes_required, name, account, cpu_trace, gpu_trace, ntx_trace, nrx_trace, \ wall_time, end_state, scheduled_nodes, time_offset, job_id, priority=0, partition=0): """ Return job info dictionary """ return { 'nodes_required': 
nodes_required, 'name': name, + 'account': account, 'cpu_trace': cpu_trace, 'gpu_trace': gpu_trace, 'ntx_trace': ntx_trace, @@ -34,27 +35,28 @@ class Job: """Represents a job to be scheduled and executed in the distributed computing system. Each job consists of various attributes such as the number of nodes required for execution, - CPU and GPU utilization, wall time, and other relevant parameters (see utils.job_dict). - The job can transition through different states during its lifecycle, including PENDING, + CPU and GPU utilization, wall time, and other relevant parameters (see utils.job_dict). + The job can transition through different states during its lifecycle, including PENDING, RUNNING, COMPLETED, CANCELLED, FAILED, or TIMEOUT. """ _id_counter = 0 def __init__(self, job_dict, current_time, state=JobState.PENDING): for key, value in job_dict.items(): setattr(self, key, value) - if not self.id: self.id = Job._get_next_id() + if not self.id: self.id = Job._get_next_id() # initializations self.start_time = None self.end_time = None self.running_time = 0 self.power = 0 self.scheduled_nodes = [] - self.power_history = [] + self.power_history = [] self._state = state def __repr__(self): """Return a string representation of the job.""" - return (f"Job(id={self.id}, name={self.name}, nodes_required={self.nodes_required}, " + return (f"Job(id={self.id}, name={self.name}, account={self.account}, " + f"nodes_required={self.nodes_required}, " f"cpu_trace={self.cpu_trace}, gpu_trace={self.gpu_trace}, wall_time={self.wall_time}, " f"end_state={self.end_state}, requested_nodes={self.requested_nodes}, " f"submit_time={self.submit_time}, start_time={self.start_time}, " @@ -90,3 +92,26 @@ class Job: """ cls._id_counter += 1 return cls._id_counter + + def statistics(self): + """ Derive job statistics from the Job Class and return + """ + return JobStatistics(self) + + + +class JobStatistics: + """ + Reduced class for handling statistics after the job has finished. 
+ """ + + def __init__(self,job): + self.id = job.id + self.name = job.name + self.account = job.account + self.num_nodes = len(job.scheduled_nodes) + self.run_time = job.running_time + self.avg_node_power = sum(job.power_history) / len(job.power_history) / self.num_nodes + self.max_node_power = max(job.power_history) / self.num_nodes + self.energy = self.run_time * self.avg_node_power * self.num_nodes + diff --git a/raps/scheduler.py b/raps/scheduler.py index e6acf2d..6428f7c 100644 --- a/raps/scheduler.py +++ b/raps/scheduler.py @@ -46,9 +46,10 @@ from scipy.stats import weibull_min import pandas as pd from .job import Job, JobState +from .account import Accounts from .network import network_utilization from .policy import Policy, PolicyType -from .utils import summarize_ranges, expand_ranges +from .utils import summarize_ranges, expand_ranges, write_dict_to_file @dataclasses.dataclass @@ -88,6 +89,7 @@ class Scheduler: self.num_active_nodes = self.config['TOTAL_NODES'] - self.num_free_nodes - len(self.config['DOWN_NODES']) self.running = [] self.queue = [] + self.accounts = Accounts() self.jobs_completed = 0 self.current_time = 0 self.cooling_model = cooling_model @@ -172,12 +174,11 @@ class Scheduler: self.queue.append(job) # Note, this should be fixed. It shouldn't go to the end of the queue. 
break - def tick(self): """Simulate a timestep.""" completed_jobs = [job for job in self.running if job.end_time is not None and job.end_time <= self.current_time] - + completed_job_stats = [] # Simulate node failure newly_downed_nodes = self.node_failure(self.config['MTBF']) @@ -258,11 +259,16 @@ class Scheduler: if self.debug: print(f"Released {scheduled_nodes}") self.jobs_completed += 1 - + job_stats = job.statistics() + if self.debug: + print(job_stats) + completed_job_stats.append(job_stats) + self.accounts.update_account_statistics(job_stats) if self.output: + # output power trace with open(self.opath / f'job-power-{job.id}.txt', 'w') as file: print(*job.power_history, sep=', ', file=file) - + write_dict_to_file(vars(job_stats),self.opath / f'job-stats-{job.account}.json') # Ask scheduler to schedule any jobs waiting in queue self.schedule([]) @@ -289,7 +295,7 @@ class Scheduler: output_df = self.power_manager.get_power_df(rack_power, rack_loss) pflops = self.flops_manager.get_system_performance() / 1E15 gflop_per_watt = pflops * 1E6 / (total_power_kw * 1000) - else: + else: pflops, gflop_per_watt = None, None if self.current_time % self.config['POWER_UPDATE_FREQ'] == 0: @@ -297,14 +303,14 @@ class Scheduler: # Power for NUM_CDUS (25 for Frontier) cdu_power = rack_power.T[-1] * 1000 runtime_values = self.cooling_model.generate_runtime_values(cdu_power, self) - + # FMU inputs are N powers and the wetbulb temp fmu_inputs = self.cooling_model.generate_fmu_inputs(runtime_values, uncertainties=self.power_manager.uncertainties) cooling_inputs, cooling_outputs = ( self.cooling_model.step(self.current_time, fmu_inputs, self.config['POWER_UPDATE_FREQ']) ) - + # Get a dataframe of the power data power_df = self.power_manager.get_power_df(rack_power, rack_loss) else: @@ -349,9 +355,9 @@ class Scheduler: """ Generator that yields after each simulation tick """ last_submit_time = 0 self.timesteps = timesteps - if self.debug: + if self.debug: limits = 
self.get_gauge_limits() - + for timestep in range(timesteps): # Print the current timestep for this partition if timestep % self.config['UI_UPDATE_FREQ'] == 0: @@ -406,9 +412,9 @@ class Scheduler: 'jobs still running': [job.id for job in self.running], 'jobs still in queue': [job.id for job in self.queue], 'average power': f'{average_power_mw:.2f} MW', - 'min loss': f'{min_loss_mw:.2f} MW ({min_loss_pct*100:.2f}%)', + 'min loss': f'{min_loss_mw:.2f} MW ({min_loss_pct*100:.2f}%)', 'average loss': f'{average_loss_mw:.2f} MW ({loss_fraction*100:.2f}%)', - 'max loss': f'{max_loss_mw:.2f} MW ({max_loss_pct*100:.2f}%)', + 'max loss': f'{max_loss_mw:.2f} MW ({max_loss_pct*100:.2f}%)', 'system power efficiency': f'{efficiency*100:.2f}', 'total energy consumed': f'{total_energy_consumed:.2f} MW-hr', 'carbon emissions': f'{emissions:.2f} metric tons CO2', @@ -424,12 +430,12 @@ class Scheduler: # Create a NumPy array of node indices, excluding down nodes down_nodes = expand_ranges(self.down_nodes) - all_nodes = np.setdiff1d(np.arange(self.config['TOTAL_NODES']), + all_nodes = np.setdiff1d(np.arange(self.config['TOTAL_NODES']), np.array(down_nodes, dtype=int)) # Sample the Weibull distribution for all nodes at once - random_values = weibull_min.rvs(shape_parameter, - scale=scale_parameter, + random_values = weibull_min.rvs(shape_parameter, + scale=scale_parameter, size=all_nodes.size) # Identify nodes that have failed diff --git a/raps/telemetry.py b/raps/telemetry.py index aa2f61e..9c006a2 100644 --- a/raps/telemetry.py +++ b/raps/telemetry.py @@ -12,10 +12,10 @@ import argparse if __name__ == "__main__": parser = argparse.ArgumentParser(description='Telemetry data validator') parser.add_argument('--jid', type=str, default='*', help='Replay job id') - parser.add_argument('-f', '--replay', nargs='+', type=str, + parser.add_argument('-f', '--replay', nargs='+', type=str, help='Either: path/to/joblive path/to/jobprofile' + \ ' -or- filename.npz (overrides --workload option)') - 
parser.add_argument('-p', '--plot', action='store_true', help='Output plots') + parser.add_argument('-p', '--plot', action='store_true', help='Output plots') parser.add_argument('--system', type=str, default='frontier', help='System config to use') parser.add_argument('--reschedule', action='store_true', help='Reschedule the telemetry workload') parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output') @@ -76,7 +76,7 @@ class Telemetry: """ Convert cdu index into a name""" return self.dataloader.cdu_index_to_name(index, config = self.config) - + def cdu_pos(self, index: int) -> tuple[int, int]: """ Return (row, col) tuple for a cdu index """ return self.dataloader.cdu_pos(index, config = self.config) @@ -88,7 +88,7 @@ if __name__ == "__main__": config = ConfigManager(system_name=args.system).get_config() args_dict['config'] = config td = Telemetry(**args_dict) - + if args.replay[0].endswith(".npz"): print(f"Loading {args.replay[0]}...") diff --git a/raps/ui.py b/raps/ui.py index 05aec10..dd7725f 100644 --- a/raps/ui.py +++ b/raps/ui.py @@ -86,7 +86,7 @@ class LayoutManager: Flag indicating whether to display node information (default is False). 
""" # Define columns with header styles - columns = ["JOBID", "WALL TIME", "NAME", "ST", "NODES", "NODE SEGMENTS"] + columns = ["JOBID", "WALL TIME", "NAME", "ACCOUNT", "ST", "NODES", "NODE SEGMENTS"] if show_nodes: columns.append("NODELIST") columns.append("TIME") @@ -111,6 +111,7 @@ class LayoutManager: str(job.id).zfill(5), convert_seconds(job.wall_time), str(job.name), + str(job.account), job.state.value, str(job.nodes_required), nodes_display, @@ -198,18 +199,18 @@ class LayoutManager: # Initialize data dictionary with keys from FMU_COLUMN_MAPPING fmu_cols = self.config['FMU_COLUMN_MAPPING'] data = {key: [] for key in fmu_cols.keys()} - + # Loop over each compute block in the datacenter_outputs dictionary for i in range(1, self.config['NUM_CDUS'] + 1): compute_block_key = f"simulator[1].datacenter[1].computeBlock[{i}].cdu[1].summary." - + # Append data to the corresponding lists dynamically using FMU_COLUMN_MAPPING keys for key in fmu_cols.keys(): data[key].append(cooling_outputs.get(compute_block_key + key)) - + # Convert to DataFrame df = pd.DataFrame(data) - + return df @@ -228,7 +229,7 @@ class LayoutManager: #power_columns = POWER_DF_HEADER[0:RACKS_PER_CDU + 2] + [POWER_DF_HEADER[-1]] # "CDU", "Rack 1", "Rack 2", "Rack 3", "Sum", "Loss" power_columns = self.power_df_header[0:self.racks_per_cdu + 2] + [self.power_df_header[-1]] # "CDU", "Rack 1", "Rack 2", "Rack 3", "Sum", "Loss" fmu_cols = self.config['FMU_COLUMN_MAPPING'] - + # Updated cooling keys to include temperature instead of pressure cooling_keys = ["T_prim_s_C", "T_prim_r_C", "T_sec_s_C", "T_sec_r_C"] @@ -335,7 +336,7 @@ class LayoutManager: for i, value in enumerate(row[display_columns]) ] table.add_row(*row_values) - + total_power_mw, total_loss_mw, percent_loss_str, total_power_kw, total_loss_kw = self.calculate_totals(power_df) # Convert to string with MW units diff --git a/raps/workload.py b/raps/workload.py index 6d22e5e..f6781f1 100644 --- a/raps/workload.py +++ b/raps/workload.py @@ -38,6 
+38,9 @@ JOB_NAMES = ["LAMMPS", "GROMACS", "VASP", "Quantum ESPRESSO", "NAMD",\ "TensorFlow", "PyTorch", "BLAST", "Spark", "GAMESS",\ "ORCA", "Simulink", "MOOSE", "ELK"] +ACCT_NAMES = ["ACT01", "ACT02", "ACT03", "ACT04", "ACT05", "ACT06", "ACT07",\ + "ACT08", "ACT09", "ACT10", "ACT11", "ACT12", "ACT13", "ACT14"] + MAX_PRIORITY = 500000 from .utils import truncated_normalvariate, determine_state, next_arrival @@ -66,6 +69,7 @@ class Workload: nodes_required = random.randint(1, config['MAX_NODES_PER_JOB']) name = random.choice(JOB_NAMES) + account = random.choice(ACCT_NAMES) cpu_util = random.random() * config['CPUS_PER_NODE'] gpu_util = random.random() * config['GPUS_PER_NODE'] mu = (config['MAX_WALL_TIME'] + config['MIN_WALL_TIME']) / 2 @@ -79,7 +83,7 @@ class Workload: # Jobs arrive according to Poisson process time_to_next_job = next_arrival(1 / config['JOB_ARRIVAL_TIME']) - jobs.append(job_dict(nodes_required, name, cpu_trace, gpu_trace, net_tx, net_rx, \ + jobs.append(job_dict(nodes_required, name, account, cpu_trace, gpu_trace, net_tx, net_rx, \ wall_time, end_state, None, time_to_next_job, None, priority, partition)) return jobs @@ -177,8 +181,8 @@ class Workload: cpu_util, gpu_util = 1, 4 cpu_trace, gpu_trace = self.compute_traces(cpu_util, gpu_util, 10800, config['TRACE_QUANTA']) job_info = job_dict( - config['AVAILABLE_NODES'], - f"Max Test {partition}", + config['AVAILABLE_NODES'], + f"Max Test {partition}", cpu_trace, gpu_trace, net_tx, net_rx, len(gpu_trace) * config['TRACE_QUANTA'], 'COMPLETED', None, 100, None, 0, partition ) @@ -188,8 +192,8 @@ class Workload: cpu_util, gpu_util = 0, 4 cpu_trace, gpu_trace = self.compute_traces(cpu_util, gpu_util, 3600, config['TRACE_QUANTA']) job_info = job_dict( - config['AVAILABLE_NODES'], - f"OpenMxP {partition}", + config['AVAILABLE_NODES'], + f"OpenMxP {partition}", cpu_trace, gpu_trace, net_tx, net_rx, len(gpu_trace) * config['TRACE_QUANTA'], 'COMPLETED', None, 300, None, 0, partition ) @@ -199,8 +203,8 @@ 
class Workload: cpu_util, gpu_util = 0.33, 0.79 * 4 # based on 24-01-18 run cpu_trace, gpu_trace = self.compute_traces(cpu_util, gpu_util, 3600, config['TRACE_QUANTA']) job_info = job_dict( - config['AVAILABLE_NODES'], - f"HPL {partition}", + config['AVAILABLE_NODES'], + f"HPL {partition}", cpu_trace, gpu_trace, net_tx, net_rx, len(gpu_trace) * config['TRACE_QUANTA'], 'COMPLETED', None, 200, None, 0, partition ) @@ -210,8 +214,8 @@ class Workload: cpu_util, gpu_util = 0, 0 cpu_trace, gpu_trace = self.compute_traces(cpu_util, gpu_util, 3600, config['TRACE_QUANTA']) job_info = job_dict( - config['AVAILABLE_NODES'], - f"Idle Test {partition}", + config['AVAILABLE_NODES'], + f"Idle Test {partition}", cpu_trace, gpu_trace, net_tx, net_rx, len(gpu_trace) * config['TRACE_QUANTA'], 'COMPLETED', None, 0, None, 0, partition ) -- GitLab From 9f4ec7122c68e7c1ffa75e7a933d726358ca3b45 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Fri, 31 Jan 2025 16:11:23 -0500 Subject: [PATCH 3/8] Add initial aging_boost when using --reschedule with Frontier telemetry (and cleanup/document main.py a bit) --- main.py | 14 ++++++++------ raps/dataloaders/frontier.py | 16 +++++++++++++++- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/main.py b/main.py index 958599d..c09ecfb 100644 --- a/main.py +++ b/main.py @@ -61,15 +61,17 @@ args_dict['config'] = config flops_manager = FLOPSManager(**args_dict) sc = Scheduler( - power_manager = power_manager, flops_manager = flops_manager, - cooling_model = cooling_model, + power_manager=power_manager, + flops_manager=flops_manager, + cooling_model=cooling_model, **args_dict, ) -layout_manager = LayoutManager(args.layout, scheduler = sc, debug = args.debug, **config) +layout_manager = LayoutManager(args.layout, scheduler=sc, debug=args.debug, **config) if args.replay: - if args.fastforward: args.fastforward = convert_to_seconds(args.fastforward) + if args.fastforward: + args.fastforward = convert_to_seconds(args.fastforward) td = 
Telemetry(**args_dict) @@ -82,8 +84,8 @@ if args.replay: extracted_date = "Date not found" DIR_NAME = create_casename() - # Read telemetry data - if args.replay[0].endswith(".npz"): # read .npz file + # Read telemetry data (either npz file or via custom data loader) + if args.replay[0].endswith(".npz"): # replay .npz file print(f"Loading {args.replay[0]}...") jobs = td.load_snapshot(args.replay[0]) diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py index a83d163..bb04711 100644 --- a/raps/dataloaders/frontier.py +++ b/raps/dataloaders/frontier.py @@ -17,6 +17,18 @@ from ..job import job_dict from ..utils import power_to_utilization, next_arrival, encrypt +def aging_boost(nnodes): + """Frontier aging policy as per documentation here: + https://docs.olcf.ornl.gov/systems/frontier_user_guide.html#job-priority-by-node-count + """ + if nnodes >= 5645: + return 8*24*3600 # seconds + elif nnodes >= 1882: + return 4*24*3600 + else: + return 0 + + def load_data(files, **kwargs): """ Reads job and job profile data from parquet files and parses them. 
@@ -133,15 +145,17 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar if reschedule: # Let the scheduler reschedule the jobs scheduled_nodes = None time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME']) + priority = aging_boost(nodes_required) else: # Prescribed replay scheduled_nodes = [] + priority = 0 # not used for replay for xname in xnames: indices = xname_to_index(xname, config) scheduled_nodes.append(indices) if gpu_trace.size > 0 and (jid == job_id or jid == '*') and time_offset > 0: job_info = job_dict(nodes_required, name, cpu_trace, gpu_trace, [], [], wall_time, - end_state, scheduled_nodes, time_offset, job_id) + end_state, scheduled_nodes, time_offset, job_id, priority) jobs.append(job_info) return jobs -- GitLab From dfe8b08d64413c3478b976b573854b3087debb58 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Thu, 30 Jan 2025 16:49:07 -0500 Subject: [PATCH 4/8] Add Frontier aging policy --- main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/main.py b/main.py index c09ecfb..7a08181 100644 --- a/main.py +++ b/main.py @@ -84,8 +84,8 @@ if args.replay: extracted_date = "Date not found" DIR_NAME = create_casename() - # Read telemetry data (either npz file or via custom data loader) - if args.replay[0].endswith(".npz"): # replay .npz file + # Read telemetry data + if args.replay[0].endswith(".npz"): # read .npz file print(f"Loading {args.replay[0]}...") jobs = td.load_snapshot(args.replay[0]) -- GitLab From 2d499a8bbbbdd238f77839ae83ac100234c8f845 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Fri, 31 Jan 2025 17:32:08 -0500 Subject: [PATCH 5/8] Dump output of scheduler history at end of simulation for validation --- main.py | 6 ++++++ raps/scheduler.py | 5 +++++ 2 files changed, 11 insertions(+) diff --git a/main.py b/main.py index c190ab1..be2ca88 100644 --- a/main.py +++ b/main.py @@ -155,6 +155,12 @@ try: except: print(output_stats) +# Schedule history +pd.set_option('display.max_columns', None) 
+pd.set_option('display.max_rows', None) +schedule_history = pd.DataFrame(sc.get_history()) +print(schedule_history) + if args.plot: if 'power' in args.plot: pl = Plotter('Time (s)', 'Power (kW)', 'Power History', \ diff --git a/raps/scheduler.py b/raps/scheduler.py index e6acf2d..b357c08 100644 --- a/raps/scheduler.py +++ b/raps/scheduler.py @@ -99,6 +99,7 @@ class Scheduler: self.replay = kwargs.get('replay') self.policy = Policy(strategy=kwargs.get('schedule')) self.sys_util_history = [] + self.history = [] def add_job(self, job): @@ -150,6 +151,7 @@ class Scheduler: # Schedule job self.assign_nodes_to_job(job) + self.history.append(dict(id=job.id, time=self.current_time, nodes=job.nodes_required, wall_time=job.wall_time)) if self.debug: scheduled_nodes = summarize_ranges(job.scheduled_nodes) @@ -373,6 +375,9 @@ class Scheduler: if self.debug and timestep % self.config['UI_UPDATE_FREQ'] == 0: print(".", end="", flush=True) + def get_history(self): + return self.history + def get_stats(self): """ Return output statistics """ sum_values = lambda values : sum(x[1] for x in values) -- GitLab From a95e6850228276edd975cfe67b80a6f8389b924a Mon Sep 17 00:00:00 2001 From: Matthias Maiterth Date: Mon, 3 Feb 2025 11:40:44 -0500 Subject: [PATCH 6/8] update to write out account stats into json --- args.py | 2 + main.py | 10 ++-- raps/account.py | 116 ++++++++++++++++++++++++++++++++++++++++++++++ raps/scheduler.py | 2 + raps/utils.py | 22 ++++++--- 5 files changed, 142 insertions(+), 10 deletions(-) create mode 100644 raps/account.py diff --git a/args.py b/args.py index a209120..2623eb6 100644 --- a/args.py +++ b/args.py @@ -34,6 +34,8 @@ parser.add_argument('-w', '--workload', type=str, choices=choices, default=choic choices = ['layout1', 'layout2'] parser.add_argument('-x', '--partitions', nargs='+', default=None, help='List of machine configurations to use, e.g., -x setonix-cpu setonix-gpu') parser.add_argument('--layout', type=str, choices=choices, default=choices[0], 
help='Layout of UI') +parser.add_argument('--accounts-json', type=str, help='Json of account stats generated in previous run. see raps/accounts.py') + args = parser.parse_args() args_dict = vars(args) print(args_dict) diff --git a/main.py b/main.py index a29035d..e6d3e16 100644 --- a/main.py +++ b/main.py @@ -30,6 +30,7 @@ from raps.telemetry import Telemetry from raps.workload import Workload from raps.weather import Weather from raps.utils import create_casename, convert_to_seconds, write_dict_to_file, next_arrival +from raps.utils import toJSON config = ConfigManager(system_name=args.system).get_config() @@ -238,7 +239,8 @@ if args.output: write_dict_to_file(output_stats, OPATH / 'stats.out') try: - with open(OPATH / 'account-stats.txt') as f: - json.dump(sc.accounts, f, indent=4) - except: - write_dict_to_file(vars(sc.accounts), OPATH / 'account-stats.out') + with open(OPATH / 'accounts.json', 'w') as f: + json_string = json.dumps(sc.accounts.to_dict()) + f.write(json_string) + except TypeError: + raise TypeError(f"{sc.accounts} could not be parsed by json.dump") diff --git a/raps/account.py b/raps/account.py new file mode 100644 index 0000000..0ade4bf --- /dev/null +++ b/raps/account.py @@ -0,0 +1,116 @@ +""" +Module to capture Account classes Classes: +- Account: representation of an Account +- Accounts: collection of all accounts +""" +import json +from .job import JobStatistics + + +class Account: + """Represents an account of a user. + + Each users holds attributes for accounting and statistics, which is used + for summaries + Each job consists of various attributes such as the number of nodes required for execution, + CPU and GPU utilization, wall time, and other relevant parameters (see utils.job_dict). + The job can transition through different states during its lifecycle, including PENDING, + RUNNING, COMPLETED, CANCELLED, FAILED, or TIMEOUT. 
+ """ + + def __init__(self, id, name, priority): + self.id = id + self.name = name + self.priority = priority + self.total_jobs = 0 + self.time_allocated = 0 + self.energy_allocated = 0 + self.avg_power = 0 + self.fugaku_points = 0 + + def update_statistics(self, jobstats, average_user): + self.total_jobs += 1 + self.time_allocated += jobstats.run_time + self.energy_allocated += jobstats.energy + self.avg_power = self.energy_allocated / self.time_allocated + if average_user.avg_power == 0: # If this is the first job use own power + average_user.avg_power = self.avg_power + if average_user.avg_power != 0: # If no energy was computed no points can be computed. + self.fugaku_points = (average_user.energy_allocated - self.energy_allocated) / average_user.avg_power + + def __repr__(self): + return (f"Account(id={self.id}, name={self.name}), " + f"priority: {self.priority}, " + f"total_jobs: {self.total_jobs}, " + f"time_allocated: {self.time_allocated}, " + f"energy_allocated: {self.energy_allocated}, " + f"avg_power: {self.avg_power}, " + f"fugaku_points: {self.fugaku_points}, " + ) + + def to_dict(self): + return { + "id": self.id, + "name": self.name, + "priority": self.priority, + "total_jobs":self.total_jobs, + "time_allocated":self.time_allocated, + "energy_allocated":self.energy_allocated, + "avg_power":self.avg_power, + "fugaku_points":self.fugaku_points + } + + +class Accounts: + + def update_average_user(self): + total_accounts = len(self.account_dict) + self.average_user.total_jobs = self.all_users.total_jobs / total_accounts + self.average_user.time_allocated = self.all_users.time_allocated / total_accounts + self.average_user.energy_allocated = self.all_users.energy_allocated / total_accounts + self.average_user.avg_power = self.all_users.avg_power / total_accounts + self.fugaku_points = self.all_users.fugaku_points / total_accounts # this should be 0 + + def __init__(self): + self._account_id = 0 + self.account_dict = dict() + self.all_users = 
Account(-2,"All_Users",0) + self.average_user = Account(-1,"Avg_User",0) + + def initialize_accounts_from_json(self,filename): + try: + with open(filename, 'r', encoding='utf-8') as file: + json_object = json.load(file) + if '_account_id' in json_object: + self._account_id = json_object['_account_id'] + if 'account_dict' in json_object: + self.account_dict = json_object['account_dict'] + if 'all_users' in json_object: + self.all_users = json_object['all_users'] + if 'average_user' in json_object: + self.average_user = json_object['average_user'] + except ValueError: + raise ValueError(f"{file} could not be read using json.load()") + + def update_account_statistics(self,jobstats): + #update specific account associated with job + if isinstance(jobstats, JobStatistics): + if jobstats.account not in self.account_dict: + self.account_dict[jobstats.account] = Account(self._account_id,jobstats.account,0) + account = self.account_dict[jobstats.account] + account.update_statistics(jobstats,self.average_user) + self.account_dict[jobstats.account] = account + #update the average_user account and the summary account + self.all_users.update_statistics(jobstats,self.average_user) + self.update_average_user() + + def to_dict(self): + acct_dict = {} + for account_name,account in self.account_dict.items(): + acct_dict[account_name] = account.to_dict() + ret_dict = {} + ret_dict['_account_id'] = self._account_id + ret_dict['account_dict'] = acct_dict + ret_dict['all_users'] = self.all_users.to_dict() + ret_dict['average_user'] = self.average_user.to_dict() + return ret_dict diff --git a/raps/scheduler.py b/raps/scheduler.py index 751baea..6103b2a 100644 --- a/raps/scheduler.py +++ b/raps/scheduler.py @@ -90,6 +90,8 @@ class Scheduler: self.running = [] self.queue = [] self.accounts = Accounts() + if 'accounts_json' in kwargs and kwargs['accounts_json']: + self.accounts.initialize_accounts_from_json(kwargs.get('accounts_json')) self.jobs_completed = 0 self.current_time = 0 
self.cooling_model = cooling_model diff --git a/raps/utils.py b/raps/utils.py index e0d79e9..6c9d4d7 100644 --- a/raps/utils.py +++ b/raps/utils.py @@ -15,6 +15,7 @@ import pandas as pd import random import sys import uuid +import json def convert_seconds(seconds): @@ -243,7 +244,7 @@ def create_binary_array_numpy(max_time, trace_quanta, util): def extract_data_csv(fileName, skiprows, header): """ Read passed csv file path - @ In, filename, dataframe, facility telemetry data + @ In, filename, dataframe, facility telemetry data @ In, skiprows, int, number of rows to be skipped @ In, header, list, header of output dataframe @ Out, df, dataframe, read file returned as a dataframe @@ -254,7 +255,7 @@ def extract_data_csv(fileName, skiprows, header): return df def resampledf(df, time_resampled): - """ Match key and return idx + """ Match key and return idx @ In, None @ Out, CDU_names, list, list of CDU names """ @@ -301,7 +302,7 @@ def create_casename(prefix=''): def next_arrival(lambda_rate): if not hasattr(next_arrival, 'next_time'): # Initialize the first time it's called - next_arrival.next_time = 0 + next_arrival.next_time = 0 else: next_arrival.next_time += \ -math.log(1.0 - random.random()) / lambda_rate @@ -316,15 +317,15 @@ def convert_to_seconds(time_str): 'm': 60, # 1 minute = 60 seconds 's': 1 # 1 second = 1 second } - + # Check if the input string ends with a unit or is purely numeric if time_str[-1].isdigit(): return int(time_str) # Directly return the number if it's purely numeric - + # Extract the numeric part and the time unit num = int(time_str[:-1]) unit = time_str[-1] - + # Convert to seconds using the conversion factors if unit in time_factors: return num * time_factors[unit] @@ -350,3 +351,12 @@ def write_dict_to_file(dictionary, file_path): file.write("}\n") else: file.write(f"{key}: {value}\n") + + +def toJSON(obj): + """Function to dump a json string from object""" + return json.dumps( + obj, + default=lambda o:o.__dict__, + sort_keys=True, + 
indent=4) -- GitLab From 9395b8d3580b6f30ac9e1abe658cb63295f0421e Mon Sep 17 00:00:00 2001 From: Matthias Maiterth Date: Mon, 3 Feb 2025 13:25:57 -0500 Subject: [PATCH 7/8] Added method to initialize account from a json dict. --- raps/account.py | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/raps/account.py b/raps/account.py index 0ade4bf..8c9dbf9 100644 --- a/raps/account.py +++ b/raps/account.py @@ -60,6 +60,19 @@ class Account: "fugaku_points":self.fugaku_points } + @classmethod + def init_from_dict(acct, account_dict): # id ,name, priority, total_jobs, time_allocated, energy_allocated, avg_power, fugaku_points): + acct = Account(account_dict["id"],account_dict["name"],account_dict["priority"]) + acct.id = account_dict["id"] + acct.name = account_dict["name"] + acct.priority = account_dict["priority"] + acct.total_jobs = account_dict["total_jobs"] + acct.time_allocated = account_dict["time_allocated"] + acct.energy_allocated = account_dict["energy_allocated"] + acct.avg_power = account_dict["avg_power"] + acct.fugaku_points = account_dict["fugaku_points"] + return acct + class Accounts: @@ -70,6 +83,7 @@ class Accounts: self.average_user.energy_allocated = self.all_users.energy_allocated / total_accounts self.average_user.avg_power = self.all_users.avg_power / total_accounts self.fugaku_points = self.all_users.fugaku_points / total_accounts # this should be 0 + return self def __init__(self): self._account_id = 0 @@ -84,11 +98,16 @@ if '_account_id' in json_object: self._account_id = json_object['_account_id'] if 'account_dict' in json_object: - self.account_dict = json_object['account_dict'] + json_dict = json_object['account_dict'] + self.account_dict = {} + for account_name,account_dict in json_dict.items(): + self.account_dict[account_name] = Account.init_from_dict(account_dict) + #self.account_dict = json_object['account_dict'] + if 'all_users' in json_object: - self.all_users = 
json_object['all_users'] + self.all_users = Account.init_from_dict(json_object['all_users']) if 'average_user' in json_object: - self.average_user = json_object['average_user'] + self.average_user = Account.init_from_dict(json_object['average_user']) except ValueError: raise ValueError(f"{file} could not be read using json.load()") @@ -97,6 +116,7 @@ class Accounts: if isinstance(jobstats, JobStatistics): if jobstats.account not in self.account_dict: self.account_dict[jobstats.account] = Account(self._account_id,jobstats.account,0) + self._account_id += 1 account = self.account_dict[jobstats.account] account.update_statistics(jobstats,self.average_user) self.account_dict[jobstats.account] = account -- GitLab From 4259c4a0e3bfa1a8a0a36d9a1ad9b82f3d9af5ce Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Mon, 3 Feb 2025 14:44:44 -0500 Subject: [PATCH 8/8] Fix a few bugs related to the addition of account management --- main.py | 1 - raps/account.py | 5 ++++- raps/dataloaders/adastraMI250.py | 3 ++- raps/dataloaders/frontier.py | 4 +++- raps/dataloaders/fugaku.py | 2 ++ raps/dataloaders/lassen.py | 1 + raps/dataloaders/marconi100.py | 3 ++- raps/job.py | 12 ++++++++---- raps/policy.py | 9 --------- raps/workload.py | 11 +++++++---- 10 files changed, 29 insertions(+), 22 deletions(-) diff --git a/main.py b/main.py index e6d3e16..7338964 100644 --- a/main.py +++ b/main.py @@ -104,7 +104,6 @@ if args.replay: else: # custom data loader print(*args.replay) jobs = td.load_data(args.replay) - for job in jobs: job['priority'] = sc.policy.aging_boost(job['nodes_required']) td.save_snapshot(jobs, filename=DIR_NAME) # Set number of timesteps based on the last job running which we assume diff --git a/raps/account.py b/raps/account.py index 8c9dbf9..033bcb9 100644 --- a/raps/account.py +++ b/raps/account.py @@ -32,7 +32,10 @@ class Account: self.total_jobs += 1 self.time_allocated += jobstats.run_time self.energy_allocated += jobstats.energy - self.avg_power = self.energy_allocated 
/ self.time_allocated + if self.time_allocated == 0: + self.avg_power = 0 + else: + self.avg_power = self.energy_allocated / self.time_allocated if average_user.avg_power == 0: # If this is the first job use own power average_user.avg_power = self.avg_power if average_user.avg_power != 0: # If no energy was computed no points can be computed. diff --git a/raps/dataloaders/adastraMI250.py b/raps/dataloaders/adastraMI250.py index b71772b..1f20baf 100644 --- a/raps/dataloaders/adastraMI250.py +++ b/raps/dataloaders/adastraMI250.py @@ -82,6 +82,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): # Map dataframe to job state. Add results to jobs list for jidx in tqdm(range(num_jobs - 1), total=num_jobs, desc="Processing Jobs"): + account = jobs_df.loc[jidx, 'user_id'] # or 'group_id' job_id = jobs_df.loc[jidx, 'job_id'] if not jid == '*': @@ -163,7 +164,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): if time_offset >= 0 and wall_time > 0: #print("start_time",time_start,"\tend_time",time_end,"\twall_time",wall_time,"\tquanta wall time",gpu_trace.size * TRACE_QUANTA ) - job_info = job_dict(nodes_required, name, cpu_trace, gpu_trace, [],[],wall_time, + job_info = job_dict(nodes_required, name, account, cpu_trace, gpu_trace, [],[],wall_time, end_state, scheduled_nodes, time_offset, job_id, priority) jobs.append(job_info) else: diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py index bb04711..2f4af17 100644 --- a/raps/dataloaders/frontier.py +++ b/raps/dataloaders/frontier.py @@ -95,6 +95,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar # Map dataframe to job state. 
Add results to jobs list for jidx in tqdm(range(num_jobs - 1), total=num_jobs, desc="Processing Jobs"): + user = jobs_df.loc[jidx, 'user'] + account = jobs_df.loc[jidx, 'account'] job_id = jobs_df.loc[jidx, 'job_id'] allocation_id = jobs_df.loc[jidx, 'allocation_id'] nodes_required = jobs_df.loc[jidx, 'node_count'] @@ -154,7 +156,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar scheduled_nodes.append(indices) if gpu_trace.size > 0 and (jid == job_id or jid == '*') and time_offset > 0: - job_info = job_dict(nodes_required, name, cpu_trace, gpu_trace, [], [], wall_time, + job_info = job_dict(nodes_required, name, account, cpu_trace, gpu_trace, [], [], wall_time, end_state, scheduled_nodes, time_offset, job_id, priority) jobs.append(job_info) diff --git a/raps/dataloaders/fugaku.py b/raps/dataloaders/fugaku.py index de1610f..fcd4dbc 100644 --- a/raps/dataloaders/fugaku.py +++ b/raps/dataloaders/fugaku.py @@ -68,6 +68,7 @@ def load_data_from_df(df, **kwargs): # Loop through the DataFrame rows to extract job information for _, row in tqdm(df.iterrows(), total=len(df), desc="Processing Jobs"): nodes_required = row['nnumr'] if 'nnumr' in df.columns else 0 + account = row['usr'] name = row['jnam'] if 'jnam' in df.columns else 'unknown' if validate: @@ -95,6 +96,7 @@ def load_data_from_df(df, **kwargs): job_info = job_dict( nodes_required=nodes_required, name=name, + account=account, cpu_trace=cpu_trace, gpu_trace=gpu_trace, ntx_trace=[], diff --git a/raps/dataloaders/lassen.py b/raps/dataloaders/lassen.py index d307bd1..ebdf348 100644 --- a/raps/dataloaders/lassen.py +++ b/raps/dataloaders/lassen.py @@ -140,6 +140,7 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): job_info = job_dict(nodes_required, row['hashed_user_id'], + row['hashed_user_group_id'], cpu_trace, gpu_trace, net_tx, net_rx, wall_time, row['exit_status'], scheduled_nodes, diff --git a/raps/dataloaders/marconi100.py b/raps/dataloaders/marconi100.py 
index aa06c35..1348f23 100644 --- a/raps/dataloaders/marconi100.py +++ b/raps/dataloaders/marconi100.py @@ -85,6 +85,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): # Map dataframe to job state. Add results to jobs list for jidx in tqdm(range(num_jobs - 1), total=num_jobs, desc="Processing Jobs"): + account = jobs_df.loc[jidx, 'user_id'] # or 'group_id' job_id = jobs_df.loc[jidx, 'job_id'] if not jid == '*': @@ -150,7 +151,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): scheduled_nodes = (jobs_df.loc[jidx, 'nodes']).tolist() if gpu_trace.size > 0 and time_offset >= 0: - job_info = job_dict(nodes_required, name, cpu_trace, gpu_trace, [], [], wall_time, + job_info = job_dict(nodes_required, name, account, cpu_trace, gpu_trace, [], [], wall_time, end_state, scheduled_nodes, time_offset, job_id, priority) jobs.append(job_info) diff --git a/raps/job.py b/raps/job.py index 75280e8..6ce9f8d 100644 --- a/raps/job.py +++ b/raps/job.py @@ -41,7 +41,7 @@ class Job: """ _id_counter = 0 - def __init__(self, job_dict, current_time, state=JobState.PENDING): + def __init__(self, job_dict, current_time, state=JobState.PENDING, account=None): for key, value in job_dict.items(): setattr(self, key, value) if not self.id: self.id = Job._get_next_id() # initializations @@ -52,6 +52,7 @@ class Job: self.scheduled_nodes = [] self.power_history = [] self._state = state + self.account = account def __repr__(self): """Return a string representation of the job.""" @@ -111,7 +112,10 @@ class JobStatistics: self.account = job.account self.num_nodes = len(job.scheduled_nodes) self.run_time = job.running_time - self.avg_node_power = sum(job.power_history) / len(job.power_history) / self.num_nodes - self.max_node_power = max(job.power_history) / self.num_nodes + if len(job.power_history) == 0: + self.avg_node_power = 0 + self.max_node_power = 0 + else: + self.avg_node_power = sum(job.power_history) / len(job.power_history) / self.num_nodes + self.max_node_power = 
max(job.power_history) / self.num_nodes self.energy = self.run_time * self.avg_node_power * self.num_nodes - diff --git a/raps/policy.py b/raps/policy.py index 13c6f92..1da5b5b 100644 --- a/raps/policy.py +++ b/raps/policy.py @@ -23,15 +23,6 @@ class Policy: else: raise ValueError(f"Unknown policy type: {self.policy_type}") - def aging_boost(self, nnodes): - """Frontier aging policy""" - if nnodes > 5645: - return 8 - elif nnodes > 1882: - return 4 - else: - return 0 - def find_backfill_job(self, queue, num_free_nodes, current_time): """ This implementation is based on pseudocode from Leonenkov and Zhumatiy. "Introducing new backfill-based scheduler for slurm resource manager." diff --git a/raps/workload.py b/raps/workload.py index f6781f1..14d91cc 100644 --- a/raps/workload.py +++ b/raps/workload.py @@ -112,6 +112,7 @@ class Workload: job_info = job_dict( config['AVAILABLE_NODES'], # Nodes required f"Max Test {partition}", # Name with partition label + ACCT_NAMES[0], # User account cpu_trace, # CPU trace gpu_trace, # GPU trace net_tx, # Network transmit trace @@ -149,6 +150,7 @@ class Workload: job_info = job_dict( config['AVAILABLE_NODES'], # Nodes required f"Idle Test {partition}", # Name with partition label + ACCT_NAMES[0], # User account cpu_trace, # CPU trace gpu_trace, # GPU trace net_tx, # Network transmit trace @@ -170,6 +172,7 @@ class Workload: # List to hold jobs for all partitions jobs = [] + account = ACCT_NAMES[0] # Iterate through each partition and its config for partition in self.partitions: @@ -182,7 +185,7 @@ class Workload: cpu_trace, gpu_trace = self.compute_traces(cpu_util, gpu_util, 10800, config['TRACE_QUANTA']) job_info = job_dict( config['AVAILABLE_NODES'], - f"Max Test {partition}", + f"Max Test {partition}", account, cpu_trace, gpu_trace, net_tx, net_rx, len(gpu_trace) * config['TRACE_QUANTA'], 'COMPLETED', None, 100, None, 0, partition ) @@ -193,7 +196,7 @@ class Workload: cpu_trace, gpu_trace = self.compute_traces(cpu_util, gpu_util, 
3600, config['TRACE_QUANTA']) job_info = job_dict( config['AVAILABLE_NODES'], - f"OpenMxP {partition}", + f"OpenMxP {partition}", account, cpu_trace, gpu_trace, net_tx, net_rx, len(gpu_trace) * config['TRACE_QUANTA'], 'COMPLETED', None, 300, None, 0, partition ) @@ -204,7 +207,7 @@ class Workload: cpu_trace, gpu_trace = self.compute_traces(cpu_util, gpu_util, 3600, config['TRACE_QUANTA']) job_info = job_dict( config['AVAILABLE_NODES'], - f"HPL {partition}", + f"HPL {partition}", account, cpu_trace, gpu_trace, net_tx, net_rx, len(gpu_trace) * config['TRACE_QUANTA'], 'COMPLETED', None, 200, None, 0, partition ) @@ -215,7 +218,7 @@ class Workload: cpu_trace, gpu_trace = self.compute_traces(cpu_util, gpu_util, 3600, config['TRACE_QUANTA']) job_info = job_dict( config['AVAILABLE_NODES'], - f"Idle Test {partition}", + f"Idle Test {partition}", account, cpu_trace, gpu_trace, net_tx, net_rx, len(gpu_trace) * config['TRACE_QUANTA'], 'COMPLETED', None, 0, None, 0, partition ) -- GitLab