Loading raps/account.py +3 −3 Original line number Diff line number Diff line Loading @@ -80,7 +80,7 @@ class Account: } @classmethod def from_dict(acct, account_dict): # name, priority, jobs_enqueue, jobs_completed, time_allocated, energy_allocated, avg_power, fugaku_points): def from_dict(acct, account_dict): acct = Account(account_dict["name"], priority=account_dict["priority"]) acct.name = account_dict["name"] acct.priority = account_dict["priority"] Loading Loading @@ -251,10 +251,10 @@ class Accounts: # Already added above pass # update all uers -> then update average user -> then fugagku points for all users (order is important!) # Update all users -> then update average user -> then fugagku points for all users (order is important!) merged_accounts.all_users = Account.merge(accounts1.all_users,accounts2.all_users) merged_accounts.update_average_user() # update to average user is needed before fugaku points can be caluculated. # Update to average user is needed before fugaku points can be caluculated. if merged_accounts.all_users.jobs_completed != 0: merged_accounts.all_users.update_fugaku_points(merged_accounts.average_user.energy_allocated, merged_accounts.average_user.avg_power) Loading raps/job.py +3 −3 Original line number Diff line number Diff line Loading @@ -42,7 +42,7 @@ class Job: _id_counter = 0 def __init__(self, job_dict, current_time, state=JobState.PENDING, account=None): # initializations # Initializations: self.start_time = None self.end_time = None self.running_time = 0 Loading @@ -51,10 +51,10 @@ class Job: self.power_history = [] self._state = state self.account = account # if a job dict was given, override the values from the job_dict: # If a job dict was given, override the values from the job_dict: for key, value in job_dict.items(): setattr(self, key, value) # in any case: provide a job_id! # In any case: provide a job_id! if not self.id: self.id = Job._get_next_id() Loading raps/schedulers/default.py +2 −2 Original line number Diff line number Diff line from enum import Enum #from ..job import Job, JobState # Unused from ..utils import summarize_ranges from ..workload import MAX_PRIORITY class PolicyType(Enum): """Supported scheduling policies.""" FCFS = 'fcfs' Loading Loading @@ -124,7 +124,7 @@ class Scheduler: priority_triple_list = [] for job in queue: fugaku_priority = accounts.account_dict[job.account].fugaku_points # create a tuple of the job and the priority # Create a tuple of the job and the priority priority = job.priority priority_triple_list.append((fugaku_priority,priority,job)) # Sort everythin according to fugaku_points Loading raps/telemetry.py +6 −13 Original line number Diff line number Diff line Loading @@ -44,38 +44,31 @@ class Telemetry: except: print("WARNING: Failed to load dataloader") def save_snapshot(self, jobs: list, accounts: dict, filename: str): """Saves a snapshot of the jobs to a compressed file. """ np.savez_compressed(filename, jobs=jobs, accounts=accounts) def load_snapshot(self, snapshot: str) -> (list, dict): """Reads a snapshot from a compressed file and returns the jobs.""" jobs, accounts_dict = np.load(snapshot, allow_pickle=True, mmap_mode='r') return jobs['jobs'].tolist(), Accounts.initialize_accounts_from_dict(accounts_dict) def load_data(self, files): """Load telemetry data using custom data loaders.""" return self.dataloader.load_data(files, **self.kwargs) def load_data_from_df(self, *args, **kwargs): """Load telemetry data using custom data loaders.""" return self.dataloader.load_data_from_df(*args, **kwargs) def node_index_to_name(self, index: int): """ Convert node index into a name""" return self.dataloader.node_index_to_name(index, config=self.config) def cdu_index_to_name(self, index: int): """ Convert cdu index into a name""" return self.dataloader.cdu_index_to_name(index, config=self.config) def cdu_pos(self, index: int) -> tuple[int, int]: """ Return (row, col) tuple for a cdu index """ return self.dataloader.cdu_pos(index, config=self.config) Loading @@ -88,7 +81,6 @@ if __name__ == "__main__": args_dict['config'] = config td = Telemetry(**args_dict) if args.replay[0].endswith(".npz"): print(f"Loading {args.replay[0]}...") jobs = td.load_snapshot(args.replay[0]) Loading @@ -115,7 +107,8 @@ if __name__ == "__main__": dt = job.submit_time - last dt_list.append(dt) last = job.submit_time if args.verbose: print(job) if args.verbose: print(job) print(f'Simulation will run for {timesteps} seconds') print(f'Average job arrival time is: {np.mean(dt_list):.2f}s') Loading raps/ui.py +1 −1 File changed.Contains only whitespace changes. Show changes Loading
raps/account.py +3 −3 Original line number Diff line number Diff line Loading @@ -80,7 +80,7 @@ class Account: } @classmethod def from_dict(acct, account_dict): # name, priority, jobs_enqueue, jobs_completed, time_allocated, energy_allocated, avg_power, fugaku_points): def from_dict(acct, account_dict): acct = Account(account_dict["name"], priority=account_dict["priority"]) acct.name = account_dict["name"] acct.priority = account_dict["priority"] Loading Loading @@ -251,10 +251,10 @@ class Accounts: # Already added above pass # update all uers -> then update average user -> then fugagku points for all users (order is important!) # Update all users -> then update average user -> then fugagku points for all users (order is important!) merged_accounts.all_users = Account.merge(accounts1.all_users,accounts2.all_users) merged_accounts.update_average_user() # update to average user is needed before fugaku points can be caluculated. # Update to average user is needed before fugaku points can be caluculated. if merged_accounts.all_users.jobs_completed != 0: merged_accounts.all_users.update_fugaku_points(merged_accounts.average_user.energy_allocated, merged_accounts.average_user.avg_power) Loading
raps/job.py +3 −3 Original line number Diff line number Diff line Loading @@ -42,7 +42,7 @@ class Job: _id_counter = 0 def __init__(self, job_dict, current_time, state=JobState.PENDING, account=None): # initializations # Initializations: self.start_time = None self.end_time = None self.running_time = 0 Loading @@ -51,10 +51,10 @@ class Job: self.power_history = [] self._state = state self.account = account # if a job dict was given, override the values from the job_dict: # If a job dict was given, override the values from the job_dict: for key, value in job_dict.items(): setattr(self, key, value) # in any case: provide a job_id! # In any case: provide a job_id! if not self.id: self.id = Job._get_next_id() Loading
raps/schedulers/default.py +2 −2 Original line number Diff line number Diff line from enum import Enum #from ..job import Job, JobState # Unused from ..utils import summarize_ranges from ..workload import MAX_PRIORITY class PolicyType(Enum): """Supported scheduling policies.""" FCFS = 'fcfs' Loading Loading @@ -124,7 +124,7 @@ class Scheduler: priority_triple_list = [] for job in queue: fugaku_priority = accounts.account_dict[job.account].fugaku_points # create a tuple of the job and the priority # Create a tuple of the job and the priority priority = job.priority priority_triple_list.append((fugaku_priority,priority,job)) # Sort everythin according to fugaku_points Loading
raps/telemetry.py +6 −13 Original line number Diff line number Diff line Loading @@ -44,38 +44,31 @@ class Telemetry: except: print("WARNING: Failed to load dataloader") def save_snapshot(self, jobs: list, accounts: dict, filename: str): """Saves a snapshot of the jobs to a compressed file. """ np.savez_compressed(filename, jobs=jobs, accounts=accounts) def load_snapshot(self, snapshot: str) -> (list, dict): """Reads a snapshot from a compressed file and returns the jobs.""" jobs, accounts_dict = np.load(snapshot, allow_pickle=True, mmap_mode='r') return jobs['jobs'].tolist(), Accounts.initialize_accounts_from_dict(accounts_dict) def load_data(self, files): """Load telemetry data using custom data loaders.""" return self.dataloader.load_data(files, **self.kwargs) def load_data_from_df(self, *args, **kwargs): """Load telemetry data using custom data loaders.""" return self.dataloader.load_data_from_df(*args, **kwargs) def node_index_to_name(self, index: int): """ Convert node index into a name""" return self.dataloader.node_index_to_name(index, config=self.config) def cdu_index_to_name(self, index: int): """ Convert cdu index into a name""" return self.dataloader.cdu_index_to_name(index, config=self.config) def cdu_pos(self, index: int) -> tuple[int, int]: """ Return (row, col) tuple for a cdu index """ return self.dataloader.cdu_pos(index, config=self.config) Loading @@ -88,7 +81,6 @@ if __name__ == "__main__": args_dict['config'] = config td = Telemetry(**args_dict) if args.replay[0].endswith(".npz"): print(f"Loading {args.replay[0]}...") jobs = td.load_snapshot(args.replay[0]) Loading @@ -115,7 +107,8 @@ if __name__ == "__main__": dt = job.submit_time - last dt_list.append(dt) last = job.submit_time if args.verbose: print(job) if args.verbose: print(job) print(f'Simulation will run for {timesteps} seconds') print(f'Average job arrival time is: {np.mean(dt_list):.2f}s') Loading