diff --git a/args.py b/args.py index a209120400dd6ce6a899b9ccb22042637e092411..2623eb62cdffd255f05e958975f887d5b1ca6929 100644 --- a/args.py +++ b/args.py @@ -34,6 +34,8 @@ parser.add_argument('-w', '--workload', type=str, choices=choices, default=choic choices = ['layout1', 'layout2'] parser.add_argument('-x', '--partitions', nargs='+', default=None, help='List of machine configurations to use, e.g., -x setonix-cpu setonix-gpu') parser.add_argument('--layout', type=str, choices=choices, default=choices[0], help='Layout of UI') +parser.add_argument('--accounts-json', type=str, help='Json of account stats generated in previous run. see raps/accounts.py') + args = parser.parse_args() args_dict = vars(args) print(args_dict) diff --git a/main.py b/main.py index c190ab1834d5a6ceca6e3f1dd232616127f32621..7338964dbdd7d261053fe7b6700382e1a009f6db 100644 --- a/main.py +++ b/main.py @@ -30,6 +30,7 @@ from raps.telemetry import Telemetry from raps.workload import Workload from raps.weather import Weather from raps.utils import create_casename, convert_to_seconds, write_dict_to_file, next_arrival +from raps.utils import toJSON config = ConfigManager(system_name=args.system).get_config() @@ -61,7 +62,7 @@ args_dict['config'] = config flops_manager = FLOPSManager(**args_dict) sc = Scheduler( - power_manager=power_manager, + power_manager=power_manager, flops_manager=flops_manager, cooling_model=cooling_model, **args_dict, @@ -70,7 +71,7 @@ layout_manager = LayoutManager(args.layout, scheduler=sc, debug=args.debug, **co if args.replay: - if args.fastforward: + if args.fastforward: args.fastforward = convert_to_seconds(args.fastforward) td = Telemetry(**args_dict) @@ -155,6 +156,12 @@ try: except: print(output_stats) +# Schedule history +pd.set_option('display.max_columns', None) +pd.set_option('display.max_rows', None) +schedule_history = pd.DataFrame(sc.get_history()) +print(schedule_history) + if args.plot: if 'power' in args.plot: pl = Plotter('Time (s)', 'Power (kW)', 'Power History', \ @@ -209,7 +216,7 @@ if args.output: if args.uncertainties: # Parquet cannot handle annotated ufloat format AFAIK - print('Data dump not implemented using uncertainties!') + print('Data dump not implemented using uncertainties!') else: if cooling_model: df = pd.DataFrame(cooling_model.fmu_history) @@ -229,3 +236,10 @@ if args.output: json.dump(output_stats, f, indent=4) except: write_dict_to_file(output_stats, OPATH / 'stats.out') + + try: + with open(OPATH / 'accounts.json', 'w') as f: + json_string = json.dumps(sc.accounts.to_dict()) + f.write(json_string) + except TypeError: + raise TypeError(f"{sc.accounts} could not be parsed by json.dump") diff --git a/raps/account.py b/raps/account.py new file mode 100644 index 0000000000000000000000000000000000000000..033bcb9f57b77f5a6e37411bd7ca77a59c20584c --- /dev/null +++ b/raps/account.py @@ -0,0 +1,139 @@ +""" +Module to capture Account classes Classes: +- Account: representation of an Account +- Accounts: collection of all accounts +""" +import json +from .job import JobStatistics + + +class Account: + """Represents an account of a user. + + Each users holds attributes for accounting and statistics, which is used + for summaries + Each job consists of various attributes such as the number of nodes required for execution, + CPU and GPU utilization, wall time, and other relevant parameters (see utils.job_dict). + The job can transition through different states during its lifecycle, including PENDING, + RUNNING, COMPLETED, CANCELLED, FAILED, or TIMEOUT. + """ + + def __init__(self, id, name, priority): + self.id = id + self.name = name + self.priority = priority + self.total_jobs = 0 + self.time_allocated = 0 + self.energy_allocated = 0 + self.avg_power = 0 + self.fugaku_points = 0 + + def update_statistics(self, jobstats, average_user): + self.total_jobs += 1 + self.time_allocated += jobstats.run_time + self.energy_allocated += jobstats.energy + if self.time_allocated == 0: + self.avg_power = 0 + else: + self.avg_power = self.energy_allocated / self.time_allocated + if average_user.avg_power == 0: # If this is the first job use own power + average_user.avg_power = self.avg_power + if average_user.avg_power != 0: # If no energy was computed no points can be computed. + self.fugaku_points = (average_user.energy_allocated - self.energy_allocated) / average_user.avg_power + + def __repr__(self): + return (f"Account(id={self.id}, name={self.name}), " + f"priority: {self.priority}, " + f"total_jobs: {self.total_jobs}, " + f"time_allocated: {self.time_allocated}, " + f"energy_allocated: {self.energy_allocated}, " + f"avg_power: {self.avg_power}, " + f"fugaku_points: {self.fugaku_points}, " + ) + + def to_dict(self): + return { + "id": self.id, + "name": self.name, + "priority": self.priority, + "total_jobs":self.total_jobs, + "time_allocated":self.time_allocated, + "energy_allocated":self.energy_allocated, + "avg_power":self.avg_power, + "fugaku_points":self.fugaku_points + } + + @classmethod + def init_from_dict(acct, account_dict): # id ,name, priority, total_jobs, time_allocated, energy_allocated, avg_power, fugaku_points): + acct = Account(account_dict["id"],account_dict["name"],account_dict["priority"]) + acct.id = account_dict["id"] + acct.name = account_dict["name"] + acct.priority = account_dict["priority"] + acct.total_jobs = account_dict["total_jobs"] + acct.time_allocated = account_dict["time_allocated"] + acct.energy_allocated = account_dict["energy_allocated"] + acct.avg_power = account_dict["avg_power"] + acct.fugaku_points = account_dict["fugaku_points"] + return acct + + +class Accounts: + + def update_average_user(self): + total_accounts = len(self.account_dict) + self.average_user.total_jobs = self.all_users.total_jobs / total_accounts + self.average_user.time_allocated = self.all_users.time_allocated / total_accounts + self.average_user.energy_allocated = self.all_users.energy_allocated / total_accounts + self.average_user.avg_power = self.all_users.avg_power / total_accounts + self.fugaku_points = self.all_users.fugaku_points / total_accounts # this should be 0 + return self + + def __init__(self): + self._account_id = 0 + self.account_dict = dict() + self.all_users = Account(-2,"All_Users",0) + self.average_user = Account(-1,"Avg_User",0) + + def initialize_accounts_from_json(self,filename): + try: + with open(filename, 'r', encoding='utf-8') as file: + json_object = json.load(file) + if '_account_id' in json_object: + self._account_id = json_object['_account_id'] + if 'account_dict' in json_object: + json_dict = json_object['account_dict'] + self.account_dict = {} + for account_name,account_dict in json_dict.items(): + self.account_dict[account_name] = Account.init_from_dict(account_dict) + #self.account_dict = json_object['account_dict'] + + if 'all_users' in json_object: + self.all_users = Account.init_from_dict(json_object['all_users']) + if 'average_user' in json_object: + self.average_user = Account.init_from_dict(json_object['average_user']) + except ValueError: + raise ValueError(f"{file} could not be read using json.load()") + + def update_account_statistics(self,jobstats): + #update specific account associated with job + if isinstance(jobstats, JobStatistics): + if jobstats.account not in self.account_dict: + self.account_dict[jobstats.account] = Account(self._account_id,jobstats.account,0) + self._account_id += 1 + account = self.account_dict[jobstats.account] + account.update_statistics(jobstats,self.average_user) + self.account_dict[jobstats.account] = account + #update the average_user account and the summary account + self.all_users.update_statistics(jobstats,self.average_user) + self.update_average_user() + + def to_dict(self): + acct_dict = {} + for account_name,account in self.account_dict.items(): + acct_dict[account_name] = account.to_dict() + ret_dict = {} + ret_dict['_account_id'] = self._account_id + ret_dict['account_dict'] = acct_dict + ret_dict['all_users'] = self.all_users.to_dict() + ret_dict['average_user'] = self.average_user.to_dict() + return ret_dict diff --git a/raps/dataloaders/adastraMI250.py b/raps/dataloaders/adastraMI250.py index b71772b3c6627b8d540810bef6700b034bc77279..1f20bafddb3d7e3b316061aad90ae053fc87d415 100644 --- a/raps/dataloaders/adastraMI250.py +++ b/raps/dataloaders/adastraMI250.py @@ -82,6 +82,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): # Map dataframe to job state. Add results to jobs list for jidx in tqdm(range(num_jobs - 1), total=num_jobs, desc="Processing Jobs"): + account = jobs_df.loc[jidx, 'user_id'] # or 'group_id' job_id = jobs_df.loc[jidx, 'job_id'] if not jid == '*': @@ -163,7 +164,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): if time_offset >= 0 and wall_time > 0: #print("start_time",time_start,"\tend_time",time_end,"\twall_time",wall_time,"\tquanta wall time",gpu_trace.size * TRACE_QUANTA ) - job_info = job_dict(nodes_required, name, cpu_trace, gpu_trace, [],[],wall_time, + job_info = job_dict(nodes_required, name, account, cpu_trace, gpu_trace, [],[],wall_time, end_state, scheduled_nodes, time_offset, job_id, priority) jobs.append(job_info) else: diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py index bb04711781aca723096f714c9b5d9e7d66a34d1b..2f4af1775966f6491b7b9049b891bb77e860b175 100644 --- a/raps/dataloaders/frontier.py +++ b/raps/dataloaders/frontier.py @@ -95,6 +95,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar # Map dataframe to job state. Add results to jobs list for jidx in tqdm(range(num_jobs - 1), total=num_jobs, desc="Processing Jobs"): + user = jobs_df.loc[jidx, 'user'] + account = jobs_df.loc[jidx, 'account'] job_id = jobs_df.loc[jidx, 'job_id'] allocation_id = jobs_df.loc[jidx, 'allocation_id'] nodes_required = jobs_df.loc[jidx, 'node_count'] @@ -154,7 +156,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar scheduled_nodes.append(indices) if gpu_trace.size > 0 and (jid == job_id or jid == '*') and time_offset > 0: - job_info = job_dict(nodes_required, name, cpu_trace, gpu_trace, [], [], wall_time, + job_info = job_dict(nodes_required, name, account, cpu_trace, gpu_trace, [], [], wall_time, end_state, scheduled_nodes, time_offset, job_id, priority) jobs.append(job_info) diff --git a/raps/dataloaders/fugaku.py b/raps/dataloaders/fugaku.py index de1610fe3a54fcd868932003bbdb9fd2614d75f0..fcd4dbc799dbb3071e1d44e57d24c69a29c48c97 100644 --- a/raps/dataloaders/fugaku.py +++ b/raps/dataloaders/fugaku.py @@ -68,6 +68,7 @@ def load_data_from_df(df, **kwargs): # Loop through the DataFrame rows to extract job information for _, row in tqdm(df.iterrows(), total=len(df), desc="Processing Jobs"): nodes_required = row['nnumr'] if 'nnumr' in df.columns else 0 + account = row['usr'] name = row['jnam'] if 'jnam' in df.columns else 'unknown' if validate: @@ -95,6 +96,7 @@ def load_data_from_df(df, **kwargs): job_info = job_dict( nodes_required=nodes_required, name=name, + account=account, cpu_trace=cpu_trace, gpu_trace=gpu_trace, ntx_trace=[], diff --git a/raps/dataloaders/lassen.py b/raps/dataloaders/lassen.py index d307bd1cd952e1b04be190c5ea279a4cf0a271aa..ebdf348277d6d759f2c1a4aaf7824494564f6826 100644 --- a/raps/dataloaders/lassen.py +++ b/raps/dataloaders/lassen.py @@ -140,6 +140,7 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): job_info = job_dict(nodes_required, row['hashed_user_id'], + row['hashed_user_group_id'], cpu_trace, gpu_trace, net_tx, net_rx, wall_time, row['exit_status'], scheduled_nodes, diff --git a/raps/dataloaders/marconi100.py b/raps/dataloaders/marconi100.py index aa06c35c947e31efa8e5a012f94eee2ee6fc5b0f..1348f23e9dc94e03e58c69195ef84c40de2f60e1 100644 --- a/raps/dataloaders/marconi100.py +++ b/raps/dataloaders/marconi100.py @@ -85,6 +85,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): # Map dataframe to job state. Add results to jobs list for jidx in tqdm(range(num_jobs - 1), total=num_jobs, desc="Processing Jobs"): + account = jobs_df.loc[jidx, 'user_id'] # or 'group_id' job_id = jobs_df.loc[jidx, 'job_id'] if not jid == '*': @@ -150,7 +151,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): scheduled_nodes = (jobs_df.loc[jidx, 'nodes']).tolist() if gpu_trace.size > 0 and time_offset >= 0: - job_info = job_dict(nodes_required, name, cpu_trace, gpu_trace, [], [], wall_time, + job_info = job_dict(nodes_required, name, account, cpu_trace, gpu_trace, [], [], wall_time, end_state, scheduled_nodes, time_offset, job_id, priority) jobs.append(job_info) diff --git a/raps/job.py b/raps/job.py index 84b7f524047c8eef1ac0c335ed385a1598de8d45..6ce9f8d4749305ab01b41eb0673084f9aebcbe6e 100644 --- a/raps/job.py +++ b/raps/job.py @@ -1,11 +1,12 @@ from enum import Enum -def job_dict(nodes_required, name, cpu_trace, gpu_trace, ntx_trace, nrx_trace, \ +def job_dict(nodes_required, name, account, cpu_trace, gpu_trace, ntx_trace, nrx_trace, \ wall_time, end_state, scheduled_nodes, time_offset, job_id, priority=0, partition=0): """ Return job info dictionary """ return { 'nodes_required': nodes_required, 'name': name, + 'account': account, 'cpu_trace': cpu_trace, 'gpu_trace': gpu_trace, 'ntx_trace': ntx_trace, @@ -34,27 +35,29 @@ class Job: """Represents a job to be scheduled and executed in the distributed computing system. Each job consists of various attributes such as the number of nodes required for execution, - CPU and GPU utilization, wall time, and other relevant parameters (see utils.job_dict). - The job can transition through different states during its lifecycle, including PENDING, + CPU and GPU utilization, wall time, and other relevant parameters (see utils.job_dict). + The job can transition through different states during its lifecycle, including PENDING, RUNNING, COMPLETED, CANCELLED, FAILED, or TIMEOUT. """ _id_counter = 0 - def __init__(self, job_dict, current_time, state=JobState.PENDING): + def __init__(self, job_dict, current_time, state=JobState.PENDING, account=None): for key, value in job_dict.items(): setattr(self, key, value) - if not self.id: self.id = Job._get_next_id() + if not self.id: self.id = Job._get_next_id() # initializations self.start_time = None self.end_time = None self.running_time = 0 self.power = 0 self.scheduled_nodes = [] - self.power_history = [] + self.power_history = [] self._state = state + self.account = account def __repr__(self): """Return a string representation of the job.""" - return (f"Job(id={self.id}, name={self.name}, nodes_required={self.nodes_required}, " + return (f"Job(id={self.id}, name={self.name}, account={self.account}, " + f"nodes_required={self.nodes_required}, " f"cpu_trace={self.cpu_trace}, gpu_trace={self.gpu_trace}, wall_time={self.wall_time}, " f"end_state={self.end_state}, requested_nodes={self.requested_nodes}, " f"submit_time={self.submit_time}, start_time={self.start_time}, " @@ -90,3 +93,29 @@ class Job: """ cls._id_counter += 1 return cls._id_counter + + def statistics(self): + """ Derive job statistics from the Job Class and return + """ + return JobStatistics(self) + + + +class JobStatistics: + """ + Reduced class for handling statistics after the job has finished. + """ + + def __init__(self,job): + self.id = job.id + self.name = job.name + self.account = job.account + self.num_nodes = len(job.scheduled_nodes) + self.run_time = job.running_time + if len(job.power_history) == 0: + self.avg_node_power = 0 + self.max_node_power = 0 + else: + self.avg_node_power = sum(job.power_history) / len(job.power_history) / self.num_nodes + self.max_node_power = max(job.power_history) / self.num_nodes + self.energy = self.run_time * self.avg_node_power * self.num_nodes diff --git a/raps/scheduler.py b/raps/scheduler.py index e6acf2d73af0623b52f0334958ee856d7515e862..6103b2a1a14ad5cd86e30f56f2dca577b57ca300 100644 --- a/raps/scheduler.py +++ b/raps/scheduler.py @@ -46,9 +46,10 @@ from scipy.stats import weibull_min import pandas as pd from .job import Job, JobState +from .account import Accounts from .network import network_utilization from .policy import Policy, PolicyType -from .utils import summarize_ranges, expand_ranges +from .utils import summarize_ranges, expand_ranges, write_dict_to_file @dataclasses.dataclass @@ -88,6 +89,9 @@ class Scheduler: self.num_active_nodes = self.config['TOTAL_NODES'] - self.num_free_nodes - len(self.config['DOWN_NODES']) self.running = [] self.queue = [] + self.accounts = Accounts() + if 'accounts_json' in kwargs and kwargs['accounts_json']: + self.accounts.initialize_accounts_from_json(kwargs.get('accounts_json')) self.jobs_completed = 0 self.current_time = 0 self.cooling_model = cooling_model @@ -99,6 +103,7 @@ class Scheduler: self.replay = kwargs.get('replay') self.policy = Policy(strategy=kwargs.get('schedule')) self.sys_util_history = [] + self.history = [] def add_job(self, job): @@ -150,6 +155,7 @@ class Scheduler: # Schedule job self.assign_nodes_to_job(job) + self.history.append(dict(id=job.id, time=self.current_time, nodes=job.nodes_required, wall_time=job.wall_time)) if self.debug: scheduled_nodes = summarize_ranges(job.scheduled_nodes) @@ -172,12 +178,11 @@ class Scheduler: self.queue.append(job) # Note, this should be fixed. It shouldn't go to the end of the queue. break - def tick(self): """Simulate a timestep.""" completed_jobs = [job for job in self.running if job.end_time is not None and job.end_time <= self.current_time] - + completed_job_stats = [] # Simulate node failure newly_downed_nodes = self.node_failure(self.config['MTBF']) @@ -258,11 +263,16 @@ class Scheduler: if self.debug: print(f"Released {scheduled_nodes}") self.jobs_completed += 1 - + job_stats = job.statistics() + if self.debug: + print(job_stats) + completed_job_stats.append(job_stats) + self.accounts.update_account_statistics(job_stats) if self.output: + # output power trace with open(self.opath / f'job-power-{job.id}.txt', 'w') as file: print(*job.power_history, sep=', ', file=file) - + write_dict_to_file(vars(job_stats),self.opath / f'job-stats-{job.account}.json') # Ask scheduler to schedule any jobs waiting in queue self.schedule([]) @@ -289,7 +299,7 @@ class Scheduler: output_df = self.power_manager.get_power_df(rack_power, rack_loss) pflops = self.flops_manager.get_system_performance() / 1E15 gflop_per_watt = pflops * 1E6 / (total_power_kw * 1000) - else: + else: pflops, gflop_per_watt = None, None if self.current_time % self.config['POWER_UPDATE_FREQ'] == 0: @@ -297,14 +307,14 @@ class Scheduler: # Power for NUM_CDUS (25 for Frontier) cdu_power = rack_power.T[-1] * 1000 runtime_values = self.cooling_model.generate_runtime_values(cdu_power, self) - + # FMU inputs are N powers and the wetbulb temp fmu_inputs = self.cooling_model.generate_fmu_inputs(runtime_values, uncertainties=self.power_manager.uncertainties) cooling_inputs, cooling_outputs = ( self.cooling_model.step(self.current_time, fmu_inputs, self.config['POWER_UPDATE_FREQ']) ) - + # Get a dataframe of the power data power_df = self.power_manager.get_power_df(rack_power, rack_loss) else: @@ -349,9 +359,9 @@ class Scheduler: """ Generator that yields after each simulation tick """ last_submit_time = 0 self.timesteps = timesteps - if self.debug: + if self.debug: limits = self.get_gauge_limits() - + for timestep in range(timesteps): # Print the current timestep for this partition if timestep % self.config['UI_UPDATE_FREQ'] == 0: @@ -373,6 +383,9 @@ class Scheduler: if self.debug and timestep % self.config['UI_UPDATE_FREQ'] == 0: print(".", end="", flush=True) + def get_history(self): + return self.history + def get_stats(self): """ Return output statistics """ sum_values = lambda values : sum(x[1] for x in values) @@ -406,9 +419,9 @@ class Scheduler: 'jobs still running': [job.id for job in self.running], 'jobs still in queue': [job.id for job in self.queue], 'average power': f'{average_power_mw:.2f} MW', - 'min loss': f'{min_loss_mw:.2f} MW ({min_loss_pct*100:.2f}%)', + 'min loss': f'{min_loss_mw:.2f} MW ({min_loss_pct*100:.2f}%)', 'average loss': f'{average_loss_mw:.2f} MW ({loss_fraction*100:.2f}%)', - 'max loss': f'{max_loss_mw:.2f} MW ({max_loss_pct*100:.2f}%)', + 'max loss': f'{max_loss_mw:.2f} MW ({max_loss_pct*100:.2f}%)', 'system power efficiency': f'{efficiency*100:.2f}', 'total energy consumed': f'{total_energy_consumed:.2f} MW-hr', 'carbon emissions': f'{emissions:.2f} metric tons CO2', @@ -424,12 +437,12 @@ class Scheduler: # Create a NumPy array of node indices, excluding down nodes down_nodes = expand_ranges(self.down_nodes) - all_nodes = np.setdiff1d(np.arange(self.config['TOTAL_NODES']), + all_nodes = np.setdiff1d(np.arange(self.config['TOTAL_NODES']), np.array(down_nodes, dtype=int)) # Sample the Weibull distribution for all nodes at once - random_values = weibull_min.rvs(shape_parameter, - scale=scale_parameter, + random_values = weibull_min.rvs(shape_parameter, + scale=scale_parameter, size=all_nodes.size) # Identify nodes that have failed diff --git a/raps/telemetry.py b/raps/telemetry.py index aa2f61e0cf84dcbe6639462843972edd28eef538..9c006a2f1b10e0889ba0a708da5cdc13531684fc 100644 --- a/raps/telemetry.py +++ b/raps/telemetry.py @@ -12,10 +12,10 @@ import argparse if __name__ == "__main__": parser = argparse.ArgumentParser(description='Telemetry data validator') parser.add_argument('--jid', type=str, default='*', help='Replay job id') - parser.add_argument('-f', '--replay', nargs='+', type=str, + parser.add_argument('-f', '--replay', nargs='+', type=str, help='Either: path/to/joblive path/to/jobprofile' + \ ' -or- filename.npz (overrides --workload option)') - parser.add_argument('-p', '--plot', action='store_true', help='Output plots') + parser.add_argument('-p', '--plot', action='store_true', help='Output plots') parser.add_argument('--system', type=str, default='frontier', help='System config to use') parser.add_argument('--reschedule', action='store_true', help='Reschedule the telemetry workload') parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output') @@ -76,7 +76,7 @@ class Telemetry: """ Convert cdu index into a name""" return self.dataloader.cdu_index_to_name(index, config = self.config) - + def cdu_pos(self, index: int) -> tuple[int, int]: """ Return (row, col) tuple for a cdu index """ return self.dataloader.cdu_pos(index, config = self.config) @@ -88,7 +88,7 @@ if __name__ == "__main__": config = ConfigManager(system_name=args.system).get_config() args_dict['config'] = config td = Telemetry(**args_dict) - + if args.replay[0].endswith(".npz"): print(f"Loading {args.replay[0]}...") diff --git a/raps/ui.py b/raps/ui.py index 05aec10079e844f06005f7cd7f581e7a959a3b4f..dd7725f510938cbce76402bf68adf3aef09e81af 100644 --- a/raps/ui.py +++ b/raps/ui.py @@ -86,7 +86,7 @@ class LayoutManager: Flag indicating whether to display node information (default is False). """ # Define columns with header styles - columns = ["JOBID", "WALL TIME", "NAME", "ST", "NODES", "NODE SEGMENTS"] + columns = ["JOBID", "WALL TIME", "NAME", "ACCOUNT", "ST", "NODES", "NODE SEGMENTS"] if show_nodes: columns.append("NODELIST") columns.append("TIME") @@ -111,6 +111,7 @@ class LayoutManager: str(job.id).zfill(5), convert_seconds(job.wall_time), str(job.name), + str(job.account), job.state.value, str(job.nodes_required), nodes_display, @@ -198,18 +199,18 @@ class LayoutManager: # Initialize data dictionary with keys from FMU_COLUMN_MAPPING fmu_cols = self.config['FMU_COLUMN_MAPPING'] data = {key: [] for key in fmu_cols.keys()} - + # Loop over each compute block in the datacenter_outputs dictionary for i in range(1, self.config['NUM_CDUS'] + 1): compute_block_key = f"simulator[1].datacenter[1].computeBlock[{i}].cdu[1].summary." - + # Append data to the corresponding lists dynamically using FMU_COLUMN_MAPPING keys for key in fmu_cols.keys(): data[key].append(cooling_outputs.get(compute_block_key + key)) - + # Convert to DataFrame df = pd.DataFrame(data) - + return df @@ -228,7 +229,7 @@ class LayoutManager: #power_columns = POWER_DF_HEADER[0:RACKS_PER_CDU + 2] + [POWER_DF_HEADER[-1]] # "CDU", "Rack 1", "Rack 2", "Rack 3", "Sum", "Loss" power_columns = self.power_df_header[0:self.racks_per_cdu + 2] + [self.power_df_header[-1]] # "CDU", "Rack 1", "Rack 2", "Rack 3", "Sum", "Loss" fmu_cols = self.config['FMU_COLUMN_MAPPING'] - + # Updated cooling keys to include temperature instead of pressure cooling_keys = ["T_prim_s_C", "T_prim_r_C", "T_sec_s_C", "T_sec_r_C"] @@ -335,7 +336,7 @@ class LayoutManager: for i, value in enumerate(row[display_columns]) ] table.add_row(*row_values) - + total_power_mw, total_loss_mw, percent_loss_str, total_power_kw, total_loss_kw = self.calculate_totals(power_df) # Convert to string with MW units diff --git a/raps/utils.py b/raps/utils.py index e0d79e9649b94ad0e05efda165ec03ee297f4a9c..6c9d4d763435383e9f9b6d81e1d817494b447354 100644 --- a/raps/utils.py +++ b/raps/utils.py @@ -15,6 +15,7 @@ import pandas as pd import random import sys import uuid +import json def convert_seconds(seconds): @@ -243,7 +244,7 @@ def create_binary_array_numpy(max_time, trace_quanta, util): def extract_data_csv(fileName, skiprows, header): """ Read passed csv file path - @ In, filename, dataframe, facility telemetry data + @ In, filename, dataframe, facility telemetry data @ In, skiprows, int, number of rows to be skipped @ In, header, list, header of output dataframe @ Out, df, dataframe, read file returned as a dataframe @@ -254,7 +255,7 @@ def extract_data_csv(fileName, skiprows, header): return df def resampledf(df, time_resampled): - """ Match key and return idx + """ Match key and return idx @ In, None @ Out, CDU_names, list, list of CDU names """ @@ -301,7 +302,7 @@ def create_casename(prefix=''): def next_arrival(lambda_rate): if not hasattr(next_arrival, 'next_time'): # Initialize the first time it's called - next_arrival.next_time = 0 + next_arrival.next_time = 0 else: next_arrival.next_time += \ -math.log(1.0 - random.random()) / lambda_rate @@ -316,15 +317,15 @@ def convert_to_seconds(time_str): 'm': 60, # 1 minute = 60 seconds 's': 1 # 1 second = 1 second } - + # Check if the input string ends with a unit or is purely numeric if time_str[-1].isdigit(): return int(time_str) # Directly return the number if it's purely numeric - + # Extract the numeric part and the time unit num = int(time_str[:-1]) unit = time_str[-1] - + # Convert to seconds using the conversion factors if unit in time_factors: return num * time_factors[unit] @@ -350,3 +351,12 @@ def write_dict_to_file(dictionary, file_path): file.write("}\n") else: file.write(f"{key}: {value}\n") + + +def toJSON(obj): + """Function to dump a json string from object""" + return json.dumps( + obj, + default=lambda o:o.__dict__, + sort_keys=True, + indent=4) diff --git a/raps/workload.py b/raps/workload.py index 6d22e5ed312b94e618536a99030f784eb2c338dc..14d91cc40388e12dfe3ddd1e9ad1adeecb8fe904 100644 --- a/raps/workload.py +++ b/raps/workload.py @@ -38,6 +38,9 @@ JOB_NAMES = ["LAMMPS", "GROMACS", "VASP", "Quantum ESPRESSO", "NAMD",\ "TensorFlow", "PyTorch", "BLAST", "Spark", "GAMESS",\ "ORCA", "Simulink", "MOOSE", "ELK"] +ACCT_NAMES = ["ACT01", "ACT02", "ACT03", "ACT04", "ACT05", "ACT06", "ACT07",\ + "ACT08", "ACT09", "ACT10", "ACT11", "ACT12", "ACT13", "ACT14"] + MAX_PRIORITY = 500000 from .utils import truncated_normalvariate, determine_state, next_arrival @@ -66,6 +69,7 @@ class Workload: nodes_required = random.randint(1, config['MAX_NODES_PER_JOB']) name = random.choice(JOB_NAMES) + account = random.choice(ACCT_NAMES) cpu_util = random.random() * config['CPUS_PER_NODE'] gpu_util = random.random() * config['GPUS_PER_NODE'] mu = (config['MAX_WALL_TIME'] + config['MIN_WALL_TIME']) / 2 @@ -79,7 +83,7 @@ class Workload: # Jobs arrive according to Poisson process time_to_next_job = next_arrival(1 / config['JOB_ARRIVAL_TIME']) - jobs.append(job_dict(nodes_required, name, cpu_trace, gpu_trace, net_tx, net_rx, \ + jobs.append(job_dict(nodes_required, name, account, cpu_trace, gpu_trace, net_tx, net_rx, \ wall_time, end_state, None, time_to_next_job, None, priority, partition)) return jobs @@ -108,6 +112,7 @@ class Workload: job_info = job_dict( config['AVAILABLE_NODES'], # Nodes required f"Max Test {partition}", # Name with partition label + ACCT_NAMES[0], # User account cpu_trace, # CPU trace gpu_trace, # GPU trace net_tx, # Network transmit trace @@ -145,6 +150,7 @@ class Workload: job_info = job_dict( config['AVAILABLE_NODES'], # Nodes required f"Idle Test {partition}", # Name with partition label + ACCT_NAMES[0], # User account cpu_trace, # CPU trace gpu_trace, # GPU trace net_tx, # Network transmit trace @@ -166,6 +172,7 @@ class Workload: # List to hold jobs for all partitions jobs = [] + account = ACCT_NAMES[0] # Iterate through each partition and its config for partition in self.partitions: @@ -177,8 +184,8 @@ class Workload: cpu_util, gpu_util = 1, 4 cpu_trace, gpu_trace = self.compute_traces(cpu_util, gpu_util, 10800, config['TRACE_QUANTA']) job_info = job_dict( - config['AVAILABLE_NODES'], - f"Max Test {partition}", + config['AVAILABLE_NODES'], + f"Max Test {partition}", account, cpu_trace, gpu_trace, net_tx, net_rx, len(gpu_trace) * config['TRACE_QUANTA'], 'COMPLETED', None, 100, None, 0, partition ) @@ -188,8 +195,8 @@ class Workload: cpu_util, gpu_util = 0, 4 cpu_trace, gpu_trace = self.compute_traces(cpu_util, gpu_util, 3600, config['TRACE_QUANTA']) job_info = job_dict( - config['AVAILABLE_NODES'], - f"OpenMxP {partition}", + config['AVAILABLE_NODES'], + f"OpenMxP {partition}", account, cpu_trace, gpu_trace, net_tx, net_rx, len(gpu_trace) * config['TRACE_QUANTA'], 'COMPLETED', None, 300, None, 0, partition ) @@ -199,8 +206,8 @@ class Workload: cpu_util, gpu_util = 0.33, 0.79 * 4 # based on 24-01-18 run cpu_trace, gpu_trace = self.compute_traces(cpu_util, gpu_util, 3600, config['TRACE_QUANTA']) job_info = job_dict( - config['AVAILABLE_NODES'], - f"HPL {partition}", + config['AVAILABLE_NODES'], + f"HPL {partition}", account, cpu_trace, gpu_trace, net_tx, net_rx, len(gpu_trace) * config['TRACE_QUANTA'], 'COMPLETED', None, 200, None, 0, partition ) @@ -210,8 +217,8 @@ class Workload: cpu_util, gpu_util = 0, 0 cpu_trace, gpu_trace = self.compute_traces(cpu_util, gpu_util, 3600, config['TRACE_QUANTA']) job_info = job_dict( - config['AVAILABLE_NODES'], - f"Idle Test {partition}", + config['AVAILABLE_NODES'], + f"Idle Test {partition}", account, cpu_trace, gpu_trace, net_tx, net_rx, len(gpu_trace) * config['TRACE_QUANTA'], 'COMPLETED', None, 0, None, 0, partition )