diff --git a/main.py b/main.py
index 58e714703e07eb677c09c6d658bcee66562066ce..7e8919d3fa498fd2e6e17777e6743b08bccb2a9b 100644
--- a/main.py
+++ b/main.py
@@ -29,9 +29,9 @@
 from raps.engine import Engine
 from raps.job import Job
 from raps.telemetry import Telemetry
 from raps.workload import Workload
+from raps.account import Accounts
 from raps.weather import Weather
 from raps.utils import create_casename, convert_to_seconds, write_dict_to_file, next_arrival
-from raps.utils import toJSON
 
 config = ConfigManager(system_name=args.system).get_config()
 
@@ -45,7 +45,7 @@
 if args.cooling:
     args.layout = "layout2"
 
     if args_dict['start']:
-        cooling_model.weather = Weather(args_dict['start'], config = config)
+        cooling_model.weather = Weather(args_dict['start'], config=config)
 else:
     cooling_model = None
 
@@ -87,14 +87,14 @@
     DIR_NAME = create_casename()
 
     # Read telemetry data (either npz file or via custom data loader)
-    if args.replay[0].endswith(".npz"): # replay .npz file
+    if args.replay[0].endswith(".npz"):  # Replay .npz file
         print(f"Loading {args.replay[0]}...")
-        jobs = td.load_snapshot(args.replay[0])
+        jobs, accounts = td.load_snapshot(args.replay[0])
 
         if args.scale:
             for job in tqdm(jobs, desc=f"Scaling jobs to {args.scale} nodes"):
                 job['nodes_required'] = random.randint(1, args.scale)
-                job['requested_nodes'] = None # Setting to None triggers scheduler to assign nodes
+                job['requested_nodes'] = None  # Setting to None triggers scheduler to assign nodes
 
         if args.reschedule == 'poisson':
             print("available nodes:", config['AVAILABLE_NODES'])
@@ -104,11 +104,13 @@
         elif args.reschedule == 'submit-time':
             raise NotImplementedError
-
     else: # custom data loader
         print(*args.replay)
         jobs = td.load_data(args.replay)
-        td.save_snapshot(jobs, filename=DIR_NAME)
+        accounts = Accounts(jobs)
+        sc.accounts = accounts
+        accounts_dict = accounts.to_dict()
+        td.save_snapshot(jobs, accounts, filename=DIR_NAME)
 
     # Set number of timesteps based on the last job running which we assume
     # is the maximum value of submit_time + wall_time of all the jobs
@@ -120,9 +122,15 @@
     print(f'Simulating {len(jobs)} jobs for {timesteps} seconds')
     time.sleep(1)
 
-else: # synthetic jobs
+else:  # Synthetic jobs
     wl = Workload(config)
     jobs = getattr(wl, args.workload)(num_jobs=args.numjobs)
+    job_accounts = Accounts(jobs)
+    if args.accounts_json:
+        loaded_accounts = Accounts.from_json_filename(args.accounts_json)
+        accounts = Accounts.merge(loaded_accounts, job_accounts)
+    else:
+        accounts = job_accounts
 
     if args.verbose:
         for job_vector in jobs:
@@ -133,13 +141,14 @@
     if args.time:
         timesteps = convert_to_seconds(args.time)
     else:
-        timesteps = 88200 # 24 hours
+        timesteps = 88200  # 24.5 hours
 
 DIR_NAME = create_casename()
 OPATH = OUTPUT_PATH / DIR_NAME
 print("Output directory is: ", OPATH)
 sc.opath = OPATH
+sc.accounts = accounts
 
 if args.plot or args.output:
     try:
@@ -231,8 +240,8 @@
         df.to_parquet(OPATH / 'util.parquet', engine='pyarrow')
 
-    # Schedule history
-    schedule_history = pd.DataFrame(sc.get_history())
-    schedule_history.to_csv(OPATH / "schedule_history.csv", index=False)
+    # Job history
+    job_history = pd.DataFrame(sc.get_job_history_dict())
+    job_history.to_csv(OPATH / "job_history.csv", index=False)
 
     try:
         with open(OPATH / 'stats.out', 'w') as f:
@@ -246,3 +255,4 @@
             f.write(json_string)
     except TypeError:
         raise TypeError(f"{sc.accounts} could not be parsed by json.dump")
+    print("Output directory is: ", OPATH)  # If output is enabled, the user wants this information as the last output
diff --git a/raps/account.py b/raps/account.py
index 033bcb9f57b77f5a6e37411bd7ca77a59c20584c..5ff80b992cfd71e96428a9baaf0833b4e314e31e 100644
--- a/raps/account.py
+++ b/raps/account.py
@@ -18,18 +18,33 @@
     RUNNING, COMPLETED, CANCELLED, FAILED, or TIMEOUT.
     """
 
-    def __init__(self, id, name, priority):
-        self.id = id
+    def __init__(self, name,
+                 priority=0,
+                 jobs_enqueued=0,
+                 jobs_completed=0,
+                 time_allocated=0,
+                 energy_allocated=0,
+                 avg_power=0,
+                 fugaku_points=0
+                 ):
         self.name = name
         self.priority = priority
-        self.total_jobs = 0
-        self.time_allocated = 0
-        self.energy_allocated = 0
-        self.avg_power = 0
-        self.fugaku_points = 0
+        self.jobs_enqueued = jobs_enqueued
+        self.jobs_completed = jobs_completed
+        self.time_allocated = time_allocated
+        self.energy_allocated = energy_allocated
+        self.avg_power = avg_power
+        if self.avg_power == 0 and self.time_allocated != 0:
+            self.avg_power = self.energy_allocated / self.time_allocated
+        self.fugaku_points = fugaku_points
+
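+    # Fugaku points (semantics sketch, added for clarity): points measure how
+    # far an account's energy use sits below the average user's, in units of
+    # average power. E.g. with average_energy=100 and average_power=2, an
+    # account with energy_allocated=60 earns (100 - 60) / 2 = 20 points; heavy
+    # consumers go negative and are deprioritized by the fugaku_pts policy.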
+    def update_fugaku_points(self, average_energy, average_power):
+        if average_power == 0:
+            raise ValueError("average_power is zero; cannot compute fugaku_points")
+        self.fugaku_points = (average_energy - self.energy_allocated) / average_power
 
     def update_statistics(self, jobstats, average_user):
-        self.total_jobs += 1
+        self.jobs_completed += 1
         self.time_allocated += jobstats.run_time
         self.energy_allocated += jobstats.energy
         if self.time_allocated == 0:
@@ -39,12 +54,13 @@
         if average_user.avg_power == 0: # If this is the first job use own power
             average_user.avg_power = self.avg_power
         if average_user.avg_power != 0: # If no energy was computed no points can be computed.
-            self.fugaku_points = (average_user.energy_allocated - self.energy_allocated) / average_user.avg_power
+            self.update_fugaku_points(average_user.energy_allocated, average_user.avg_power)
 
     def __repr__(self):
-        return (f"Account(id={self.id}, name={self.name}), "
+        return (f"Account(name={self.name}, "
                 f"priority: {self.priority}, "
-                f"total_jobs: {self.total_jobs}, "
+                f"jobs_enqueued: {self.jobs_enqueued}, "
+                f"jobs_completed: {self.jobs_completed}, "
                 f"time_allocated: {self.time_allocated}, "
                 f"energy_allocated: {self.energy_allocated}, "
                 f"avg_power: {self.avg_power}, "
@@ -53,77 +69,155 @@
 
     def to_dict(self):
         return {
-            "id": self.id,
             "name": self.name,
             "priority": self.priority,
-            "total_jobs":self.total_jobs,
-            "time_allocated":self.time_allocated,
-            "energy_allocated":self.energy_allocated,
-            "avg_power":self.avg_power,
-            "fugaku_points":self.fugaku_points
+            "jobs_enqueued": self.jobs_enqueued,
+            "jobs_completed": self.jobs_completed,
+            "time_allocated": self.time_allocated,
+            "energy_allocated": self.energy_allocated,
+            "avg_power": self.avg_power,
+            "fugaku_points": self.fugaku_points
         }
 
     @classmethod
-    def init_from_dict(acct, account_dict): # id ,name, priority, total_jobs, time_allocated, energy_allocated, avg_power, fugaku_points):
-        acct = Account(account_dict["id"],account_dict["name"],account_dict["priority"])
-        acct.id = account_dict["id"]
-        acct.name = account_dict["name"]
-        acct.priority = account_dict["priority"]
-        acct.total_jobs = account_dict["total_jobs"]
+    def from_dict(cls, account_dict):
+        acct = cls(account_dict["name"], priority=account_dict["priority"])
+        acct.jobs_enqueued = account_dict["jobs_enqueued"]
+        acct.jobs_completed = account_dict["jobs_completed"]
         acct.time_allocated = account_dict["time_allocated"]
         acct.energy_allocated = account_dict["energy_allocated"]
         acct.avg_power = account_dict["avg_power"]
         acct.fugaku_points = account_dict["fugaku_points"]
         return acct
+
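+    # Illustrative sketch (hypothetical values): merging two snapshots of the
+    # same account,
+    #   a = Account("alice", jobs_completed=2, time_allocated=10, energy_allocated=50)
+    #   b = Account("alice", jobs_completed=1, time_allocated=5, energy_allocated=25)
+    # Account.merge(a, b) yields jobs_completed=3, time_allocated=15,
+    # energy_allocated=75, avg_power=75/15=5; fugaku_points is invalidated.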
+    @classmethod
+    def merge(cls, account1: 'Account', account2: 'Account') -> 'Account':
+        """
+        Merge two accounts with the same name into a new Account.
+
+        Priorities are only set if one is zero or both are equal.
+        """
+        if account1.name != account2.name:
+            raise ValueError(f"{account1.name} != {account2.name}. Input arguments mismatch.")
+
+        merged_account = cls(account1.name)
+
+        if account1.priority == account2.priority:
+            merged_account.priority = account1.priority
+        elif account1.priority == 0:
+            merged_account.priority = account2.priority
+        elif account2.priority == 0:
+            merged_account.priority = account1.priority
+        else:
+            raise ValueError("Priority cannot be derived!")
+
+        merged_account.jobs_enqueued = account1.jobs_enqueued + account2.jobs_enqueued
+        merged_account.jobs_completed = account1.jobs_completed + account2.jobs_completed
+        merged_account.time_allocated = account1.time_allocated + account2.time_allocated
+        merged_account.energy_allocated = account1.energy_allocated + account2.energy_allocated
+        if merged_account.time_allocated != 0:
+            merged_account.avg_power = merged_account.energy_allocated / merged_account.time_allocated
+        else:
+            merged_account.avg_power = 0
+        merged_account.fugaku_points = None  # Needs to be invalidated, as averages are not known!
+
+        return merged_account
+
 
 class Accounts:
 
     def update_average_user(self):
         total_accounts = len(self.account_dict)
-        self.average_user.total_jobs = self.all_users.total_jobs / total_accounts
+        if total_accounts == 0:
+            return self
+        self.average_user.jobs_enqueued = self.all_users.jobs_enqueued / total_accounts
+        self.average_user.jobs_completed = self.all_users.jobs_completed / total_accounts
         self.average_user.time_allocated = self.all_users.time_allocated / total_accounts
         self.average_user.energy_allocated = self.all_users.energy_allocated / total_accounts
         self.average_user.avg_power = self.all_users.avg_power / total_accounts
-        self.fugaku_points = self.all_users.fugaku_points / total_accounts # this should be 0
+        if self.average_user.jobs_completed != 0.0:
+            self.average_user.update_fugaku_points(self.average_user.energy_allocated, self.average_user.avg_power)
         return self
 
-    def __init__(self):
-        self._account_id = 0
+    def __init__(self, jobs=None):
         self.account_dict = dict()
-        self.all_users = Account(-2,"All_Users",0)
-        self.average_user = Account(-1,"Avg_User",0)
+        self.all_users = Account("All_Users")
+        self.average_user = Account("Avg_User")
+        if jobs:
+            if not isinstance(jobs, list):
+                raise TypeError(f"jobs must be a list, got {type(jobs)}")
+            for job_dict in jobs:
+                if not isinstance(job_dict, dict):
+                    raise TypeError(f"each job must be a dict, got {type(job_dict)}")
+                if job_dict["account"] not in self.account_dict:
+                    self.account_dict[job_dict["account"]] = Account(job_dict["account"])
+                self.account_dict[job_dict["account"]].jobs_enqueued += 1
+                self.all_users.jobs_enqueued += 1
+            self.update_average_user()
+
+    def update_all_users_by_account(self, account: Account):
+        self.all_users.jobs_enqueued += account.jobs_enqueued
+        self.all_users.jobs_completed += account.jobs_completed
+        self.all_users.time_allocated += account.time_allocated
+        self.all_users.energy_allocated += account.energy_allocated
+        if self.all_users.time_allocated != 0:
+            self.all_users.avg_power = self.all_users.energy_allocated / self.all_users.time_allocated
+        # update_average_user() must run before fugaku points can be computed,
+        # so it is always called here.
+        self.update_average_user()
+        self.all_users.update_fugaku_points(self.average_user.energy_allocated, self.average_user.avg_power)
 
-    def initialize_accounts_from_json(self,filename):
+    def add_account(self, account: Account):
+        self.account_dict[account.name] = account
+        self.update_all_users_by_account(account)
+        # update_average_user() is already called inside update_all_users_by_account()
+
+    @classmethod
+    def from_dict(cls, dictionary):
+        accounts = cls()
+
+        if 'account_dict' not in dictionary:
+            raise KeyError("'account_dict' not in dictionary. Failed to restore.")
+        dicts_from_dictionary = dictionary['account_dict']
+        if not isinstance(dicts_from_dictionary, dict):
+            raise TypeError("'account_dict' is not a dictionary. Failed to restore.")
+        accounts.account_dict = {}
+        for account_name, account_dict in dicts_from_dictionary.items():
+            accounts.account_dict[account_name] = Account.from_dict(account_dict)
+
+        if 'all_users' not in dictionary:
+            raise KeyError("'all_users' not in dictionary. Failed to restore.")
+        accounts.all_users = Account.from_dict(dictionary['all_users'])
+
+        if 'average_user' not in dictionary:
+            raise KeyError("'average_user' not in dictionary. Failed to restore.")
+        accounts.average_user = Account.from_dict(dictionary['average_user'])
+        return accounts
+
+    @classmethod
+    def from_json_filename(cls, filename):
         try:
             with open(filename, 'r', encoding='utf-8') as file:
                 json_object = json.load(file)
-            if '_account_id' in json_object:
-                self._account_id = json_object['_account_id']
-            if 'account_dict' in json_object:
-                json_dict = json_object['account_dict']
-                self.account_dict = {}
-                for account_name,account_dict in json_dict.items():
-                    self.account_dict[account_name] = Account.init_from_dict(account_dict)
-                #self.account_dict = json_object['account_dict']
-
-            if 'all_users' in json_object:
-                self.all_users = Account.init_from_dict(json_object['all_users'])
-            if 'average_user' in json_object:
-                self.average_user = Account.init_from_dict(json_object['average_user'])
+            return cls.from_dict(json_object)
         except ValueError:
-            raise ValueError(f"{file} could not be read using json.load()")
+            raise ValueError(f"{filename} could not be read using json.load()")
 
-    def update_account_statistics(self,jobstats):
-        #update specific account associated with job
+    def update_account_statistics(self, jobstats):
+        # Update the specific account associated with the job
        if isinstance(jobstats, JobStatistics):
             if jobstats.account not in self.account_dict:
-                self.account_dict[jobstats.account] = Account(self._account_id,jobstats.account,0)
-                self._account_id += 1
+                raise ValueError(f"Account {jobstats.account} not registered in Accounts object {self}")
             account = self.account_dict[jobstats.account]
-            account.update_statistics(jobstats,self.average_user)
+            account.update_statistics(jobstats, self.average_user)
             self.account_dict[jobstats.account] = account
-            #update the average_user account and the summary account
+            # Update the summary account (all_users) and the average_user account
             self.all_users.update_statistics(jobstats,self.average_user)
             self.update_average_user()
 
@@ -132,8 +226,42 @@
         for account_name,account in self.account_dict.items():
             acct_dict[account_name] = account.to_dict()
         ret_dict = {}
-        ret_dict['_account_id'] = self._account_id
         ret_dict['account_dict'] = acct_dict
         ret_dict['all_users'] = self.all_users.to_dict()
         ret_dict['average_user'] = self.average_user.to_dict()
         return ret_dict
+
+    @classmethod
+    def merge(cls, accounts1: 'Accounts', accounts2: 'Accounts') -> 'Accounts':
+        """
+        Merge two Accounts collections into a new Accounts instance.
+        """
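+        # Intended use (as wired up in main.py): accounts restored from an
+        # accounts JSON file are merged with accounts derived from freshly
+        # generated synthetic jobs.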
+        merged_accounts = Accounts()
+        # Start from a copy of accounts1 so entries only present there are kept.
+        merged_accounts.account_dict = dict(accounts1.account_dict)
+
+        for ac2_k, ac2_v in accounts2.account_dict.items():
+            if ac2_k in accounts1.account_dict:
+                merged_accounts.account_dict[ac2_k] = Account.merge(accounts1.account_dict[ac2_k], accounts2.account_dict[ac2_k])
+            else:
+                merged_accounts.account_dict[ac2_k] = ac2_v
+
+        # Update all_users -> then update average_user -> then fugaku points (order is important!)
+        merged_accounts.all_users = Account.merge(accounts1.all_users, accounts2.all_users)
+        merged_accounts.update_average_user()
+        # The update to average_user is needed before fugaku points can be calculated.
+        if merged_accounts.all_users.jobs_completed != 0:
+            merged_accounts.all_users.update_fugaku_points(merged_accounts.average_user.energy_allocated, merged_accounts.average_user.avg_power)
+
+        for account in merged_accounts.account_dict.values():
+            if account.jobs_completed != 0:
+                account.update_fugaku_points(merged_accounts.average_user.energy_allocated, merged_accounts.average_user.avg_power)
+
+        return merged_accounts
diff --git a/raps/engine.py b/raps/engine.py
index ccb27fab23ee136121db16a6c170ed3be05dd087..0886b3863bd3157bbe001ceeeb1dcfb76b7bb662 100644
--- a/raps/engine.py
+++ b/raps/engine.py
@@ -3,7 +3,6 @@
 import dataclasses
 import pandas as pd
 
 from .job import Job, JobState
-from .account import Accounts
 from .network import network_utilization
 from .utils import summarize_ranges, expand_ranges, get_utilization
 from .resmgr import ResourceManager
@@ -30,20 +29,20 @@ class TickData:
 
 
 class Engine:
     """Job scheduling simulation engine."""
+
     def __init__(self, *, power_manager, flops_manager, cooling_model=None, config, **kwargs):
         self.config = config
         self.down_nodes = summarize_ranges(self.config['DOWN_NODES'])
         self.resource_manager = ResourceManager(
             total_nodes=self.config['TOTAL_NODES'],
             down_nodes=self.config['DOWN_NODES']
-            )
+        )
         # Initialize running and queue, etc.
         self.running = []
         self.queue = []
-        self.accounts = Accounts()
-        if 'accounts_json' in kwargs and kwargs['accounts_json']:
-            self.accounts.initialize_accounts_from_json(kwargs.get('accounts_json'))
+        self.accounts = None
+        self.job_history_dict = []
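+        # NOTE: accounts is no longer built here from an accounts_json kwarg;
+        # the caller (main.py) now constructs Accounts and assigns
+        # engine.accounts directly.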
         self.jobs_completed = 0
         self.current_time = 0
         self.cooling_model = cooling_model
@@ -54,7 +53,7 @@
         self.output = kwargs.get('output')
         self.replay = kwargs.get('replay')
         self.sys_util_history = []
-        
+
         # Get scheduler type from command-line args or default
         scheduler_type = kwargs.get('scheduler', 'default')
         self.scheduler = load_scheduler(scheduler_type)(
@@ -64,16 +63,18 @@
         )
         print(f"Using scheduler: {scheduler_type}")
 
-
-    def add_job(self, job):
-        self.queue.append(job)
-        self.queue = self.scheduler.sort_jobs(self.queue)
-
+    def eligible_jobs(self, jobs_to_submit):
+        eligible_jobs_list = []
+        while jobs_to_submit and jobs_to_submit[0]['submit_time'] <= self.current_time:
+            job_info = jobs_to_submit.pop(0)
+            job = Job(job_info, self.current_time)
+            eligible_jobs_list.append(job)
+        return eligible_jobs_list
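+    # Example for eligible_jobs() (sketch): at current_time=10, a submit-time-
+    # sorted list [{'submit_time': 5, ...}, {'submit_time': 12, ...}] yields one
+    # Job for the first entry; the second stays behind. Note that
+    # jobs_to_submit is consumed (mutated) in place.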
 
     def tick(self):
         """Simulate a timestep."""
         completed_jobs = [job for job in self.running if job.end_time is not None and job.end_time <= self.current_time]
-        
+
         # Simulate node failure
         newly_downed_nodes = self.resource_manager.node_failure(self.config['MTBF'])
         for node in newly_downed_nodes:
@@ -112,7 +113,7 @@
             scheduled_nodes.append(job.scheduled_nodes)
             cpu_utils.append(cpu_util)
             gpu_utils.append(gpu_util)
-        
+
         if len(scheduled_nodes) > 0:
             self.flops_manager.update_flop_state(scheduled_nodes, cpu_utils, gpu_utils)
             jobs_power = self.power_manager.update_power_state(scheduled_nodes, cpu_utils, gpu_utils, net_utils)
@@ -130,11 +131,12 @@
             self.jobs_completed += 1
             job_stats = job.statistics()
             self.accounts.update_account_statistics(job_stats)
+            self.job_history_dict.append(job_stats.__dict__)
             # Free the nodes via the resource manager.
             self.resource_manager.free_nodes_from_job(job)
 
         # Ask scheduler to schedule any jobs waiting in queue
-        self.scheduler.schedule(self.queue, self.running, self.current_time)
+        self.scheduler.schedule(self.queue, self.running, self.current_time, self.accounts)
 
         # Update the power array UI component
         rack_power, rect_losses = self.power_manager.compute_rack_power()
@@ -169,7 +171,7 @@
             # FMU inputs are N powers and the wetbulb temp
             fmu_inputs = self.cooling_model.generate_fmu_inputs(runtime_values,
-                                                               uncertainties=self.power_manager.uncertainties)
+                                                                uncertainties=self.power_manager.uncertainties)
             cooling_inputs, cooling_outputs = (
                 self.cooling_model.step(self.current_time, fmu_inputs, self.config['POWER_UPDATE_FREQ'])
             )
@@ -199,7 +201,6 @@
         self.current_time += 1
         return tick_data
 
-
     def run_simulation(self, jobs, timesteps, autoshutdown=False):
         """Generator that yields after each simulation tick."""
         self.timesteps = timesteps
@@ -208,14 +209,13 @@
         jobs_to_submit = sorted(jobs, key=lambda j: j['submit_time'])
 
         for timestep in range(timesteps):
-            # Submit jobs whose submit_time is <= current_time
-            while jobs_to_submit and jobs_to_submit[0]['submit_time'] <= self.current_time:
-                job_info = jobs_to_submit.pop(0)
-                job = Job(job_info, self.current_time)
-                self.add_job(job)
+            # Identify eligible jobs and add them to the queue.
+            self.queue += self.eligible_jobs(jobs_to_submit)
+            # Sort the queue according to the policy.
+            self.queue = self.scheduler.sort_jobs(self.queue, self.accounts)
 
             # Schedule jobs that are now in the queue.
-            self.scheduler.schedule(self.queue, self.running, self.current_time)
+            self.scheduler.schedule(self.queue, self.running, self.current_time, presorted=True)
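+            # Design note: the queue was already sorted once this tick (above),
+            # so presorted=True lets schedule() skip a redundant re-sort.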
""" - def __init__(self, config, policy, resource_manager=None): self.config = config @@ -23,8 +24,7 @@ class Scheduler: self.resource_manager = resource_manager self.debug = False - - def sort_jobs(self, queue): + def sort_jobs(self, queue, accounts=None): """Sort jobs based on the selected scheduling policy.""" if self.policy == PolicyType.FCFS or self.policy == PolicyType.BACKFILL: return sorted(queue, key=lambda job: job.submit_time) @@ -32,13 +32,15 @@ class Scheduler: return sorted(queue, key=lambda job: job.wall_time) elif self.policy == PolicyType.PRIORITY: return sorted(queue, key=lambda job: job.priority, reverse=True) + elif self.policy == PolicyType.FUGAKU_PTS: + return self.sort_fugaku_redeeming(queue, accounts) else: raise ValueError(f"Unknown policy type: {self.policy}") - - def schedule(self, queue, running, current_time, debug=False): + def schedule(self, queue, running, current_time, accounts=None, sorted=False, debug=False): # Sort the queue in place. - queue[:] = self.sort_jobs(queue) + if not sorted: + queue[:] = self.sort_jobs(queue, accounts) # Iterate over a copy of the queue since we might remove items for job in queue[:]: @@ -63,9 +65,9 @@ class Scheduler: else: if self.policy == PolicyType.BACKFILL: # Try to find a backfill candidate from the entire queue. - backfill_job = self.find_backfill_job(queue, len(available_nodes), current_time) + backfill_job = self.find_backfill_job(queue, len(self.resource_manager.available_nodes), current_time) if backfill_job: - self.assign_nodes_to_job(backfill_job, available_nodes, current_time) + self.assign_nodes_to_job(backfill_job, self.resource_manager.available_nodes, current_time) running.append(backfill_job) queue.remove(backfill_job) if debug: @@ -75,8 +77,8 @@ class Scheduler: def find_backfill_job(self, queue, num_free_nodes, current_time): """Finds a backfill job based on available nodes and estimated completion times. - - Based on pseudocode from Leonenkov and Zhumatiy, 'Introducing new backfill-based + + Based on pseudocode from Leonenkov and Zhumatiy, 'Introducing new backfill-based scheduler for slurm resource manager.' Procedia computer science 66 (2015): 661-669. 
""" @@ -112,3 +114,35 @@ class Scheduler: return job return None + + def sort_fugaku_redeeming(self, queue, accounts=None): + if queue == []: + return queue + # Priority queues not yet implemented: + # Strategy: Sort by Fugaku Points Representing the Priority Queue + # Everything with negative Fugaku Points get sorted according to normal priority + priority_triple_list = [] + for job in queue: + fugaku_priority = accounts.account_dict[job.account].fugaku_points + # Create a tuple of the job and the priority + priority = job.priority + priority_triple_list.append((fugaku_priority,priority,job)) + # Sort everythin according to fugaku_points + priority_triple_list = sorted(priority_triple_list, key=lambda x:x[0], reverse=True) + # Find the first element with negative fugaku_points + for cutoff, triple in enumerate(priority_triple_list): + fugaku_priority, _, _ = triple + if fugaku_priority < 0: + break + first_part = priority_triple_list[:cutoff] + # Sort everything afterwards according to job priority + second_part = sorted(priority_triple_list[cutoff:], key=lambda x:x[1], reverse=True) + queue_a = [] + queue_b = [] + if first_part != []: + _, _, queue_a = zip(*first_part) + queue_a = list(queue_a) + if second_part != []: + _, _, queue_b = zip(*second_part) + queue_b = list(queue_b) + return queue_a + queue_b diff --git a/raps/telemetry.py b/raps/telemetry.py index 833ca39305cb4d2255ac3b28ab52aaa1a6379351..2616bc78d7198bb1b961012f64317c821e9b77a5 100644 --- a/raps/telemetry.py +++ b/raps/telemetry.py @@ -27,6 +27,7 @@ from tqdm import tqdm from .config import ConfigManager from .job import Job +from .account import Accounts from .plotting import plot_submit_times, plot_nodes_histogram from .utils import next_arrival @@ -39,45 +40,38 @@ class Telemetry: self.system = kwargs.get('system') self.config = kwargs.get('config') try: - self.dataloader = importlib.import_module(f".dataloaders.{self.system}", package = __package__) + self.dataloader = importlib.import_module(f".dataloaders.{self.system}", package=__package__) except: print("WARNING: Failed to load dataloader") - - def save_snapshot(self, jobs: list, filename: str): + def save_snapshot(self, jobs: list, accounts: dict, filename: str): """Saves a snapshot of the jobs to a compressed file. 
""" - np.savez_compressed(filename, jobs=jobs) - + np.savez_compressed(filename, jobs=jobs, accounts=accounts) - def load_snapshot(self, snapshot: str) -> list: + def load_snapshot(self, snapshot: str) -> (list, dict): """Reads a snapshot from a compressed file and returns the jobs.""" - jobs = np.load(snapshot, allow_pickle=True, mmap_mode='r') - return jobs['jobs'].tolist() - + jobs, accounts_dict = np.load(snapshot, allow_pickle=True, mmap_mode='r') + return jobs['jobs'].tolist(), Accounts.initialize_accounts_from_dict(accounts_dict) def load_data(self, files): """Load telemetry data using custom data loaders.""" return self.dataloader.load_data(files, **self.kwargs) - def load_data_from_df(self, *args, **kwargs): """Load telemetry data using custom data loaders.""" return self.dataloader.load_data_from_df(*args, **kwargs) - def node_index_to_name(self, index: int): """ Convert node index into a name""" - return self.dataloader.node_index_to_name(index, config = self.config) - + return self.dataloader.node_index_to_name(index, config=self.config) def cdu_index_to_name(self, index: int): """ Convert cdu index into a name""" - return self.dataloader.cdu_index_to_name(index, config = self.config) - + return self.dataloader.cdu_index_to_name(index, config=self.config) def cdu_pos(self, index: int) -> tuple[int, int]: """ Return (row, col) tuple for a cdu index """ - return self.dataloader.cdu_pos(index, config = self.config) + return self.dataloader.cdu_pos(index, config=self.config) if __name__ == "__main__": @@ -87,7 +81,6 @@ if __name__ == "__main__": args_dict['config'] = config td = Telemetry(**args_dict) - if args.replay[0].endswith(".npz"): print(f"Loading {args.replay[0]}...") jobs = td.load_snapshot(args.replay[0]) @@ -114,7 +107,8 @@ if __name__ == "__main__": dt = job.submit_time - last dt_list.append(dt) last = job.submit_time - if args.verbose: print(job) + if args.verbose: + print(job) print(f'Simulation will run for {timesteps} seconds') print(f'Average job arrival time is: {np.mean(dt_list):.2f}s') diff --git a/raps/ui.py b/raps/ui.py index 8bee172df2b524a9be38212eac72034b5e44047c..d71caf5bae7308a2d978aaa5745b01b84b093cb1 100644 --- a/raps/ui.py +++ b/raps/ui.py @@ -66,7 +66,7 @@ class LayoutManager: formatted_row = [func(cell) for func, cell in zip(format_funcs, row)] table.add_row(*formatted_row) - def calculate_totals(self, df): # 'Sum' and 'Loss' columns + def calculate_totals(self, df): # 'Sum' and 'Loss' columns total_power_kw = df[self.power_column].sum() + (self.config['NUM_CDUS'] * self.config['POWER_CDU'] / 1000.0) total_power_mw = total_power_kw / 1000.0 total_loss_kw = df[self.loss_column].sum() @@ -379,7 +379,7 @@ class LayoutManager: if self.engine.cooling_model: self.update_powertemp_array( data.power_df, data.fmu_outputs, data.p_flops, data.g_flops_w, data.system_util, - uncertainties = uncertainties, + uncertainties=uncertainties, ) self.update_pressflow_array(data.fmu_outputs) @@ -390,7 +390,7 @@ class LayoutManager: ) self.update_power_array( data.power_df, data.p_flops, data.g_flops_w, - data.system_util, uncertainties = uncertainties, + data.system_util, uncertainties=uncertainties, ) def render(self): diff --git a/raps/workload.py b/raps/workload.py index b368246a0c24c503e2a7ac9868a7b80b3dad4090..c5dc89845dc3248119ce808bec1401f232cf0242 100644 --- a/raps/workload.py +++ b/raps/workload.py @@ -111,7 +111,7 @@ class Workload: job_info = job_dict( config['AVAILABLE_NODES'], # Nodes required f"Max Test {partition}", # Name with partition label - 
-                ACCT_NAMES[0], # User account
+                ACCT_NAMES[0],  # User account
                 cpu_trace,  # CPU trace
                 gpu_trace,  # GPU trace
                 net_tx,  # Network transmit trace