Commit 5f0d7a64 authored by Maiterth, Matthias's avatar Maiterth, Matthias
Browse files

Update to refactor of accounts

1. Added accounts into main and accounts generation from jobs.
2. Updated Accounts fugaku points computation
3. Introduced Account and Accounts merging functionality for loading
   (TODO: merge previous changes and test -o flag and subsequent --accounts-json flag)
4. Reverted account-specific sorting, but added sorted=False flag to scheduler
parent 5e978691
Loading
Loading
Loading
Loading
+8 −3
Original line number Diff line number Diff line
@@ -125,8 +125,12 @@ if args.replay:
else:  # Synthetic jobs
    wl = Workload(config)
    jobs = getattr(wl, args.workload)(num_jobs=args.numjobs)
    accounts = Accounts(jobs)
    sc.accounts = accounts
    job_accounts = Accounts(jobs)
    if args.accounts_json:
        loaded_accounts = Accounts.initialize_accounts_from_json(args.accounts_json)
        accounts = loaded_accounts.merge(loaded_accounts,job_accounts)
    else:
        accounts = job_accounts

    if args.verbose:
        for job_vector in jobs:
@@ -144,6 +148,7 @@ else: # Synthetic jobs
OPATH = OUTPUT_PATH / DIR_NAME
print("Output directory is: ", OPATH)
sc.opath = OPATH
sc.accounts = accounts

if args.plot or args.output:
    try:
+46 −1
Original line number Diff line number Diff line
@@ -39,6 +39,11 @@ class Account:
            self.avg_power = self.time_allocated / self.energy_allocated
        self.fugaku_points = fugaku_points

    def update_fugaku_points(self, average_energy, average_power):
        """Recompute ``self.fugaku_points`` relative to the given averages.

        The points are the difference between ``average_energy`` and this
        account's ``energy_allocated``, divided by ``average_power``.

        Raises:
            ValueError: if ``average_power`` equals zero, since the division
                would be undefined.
        """
        if average_power == 0:
            message = f"{average_power} is zero"
            raise ValueError(message)
        # Positive when this account used less energy than the average.
        energy_delta = average_energy - self.energy_allocated
        self.fugaku_points = energy_delta / average_power

    def update_statistics(self, jobstats, average_user):
        self.total_jobs_completed += 1
        self.time_allocated += jobstats.run_time
@@ -50,7 +55,7 @@ class Account:
        if average_user.avg_power == 0:  # If this is the first job use own power
            average_user.avg_power = self.avg_power
        if average_user.avg_power != 0:  # If no energy was computed no points can be computed.
            self.fugaku_points = (average_user.energy_allocated - self.energy_allocated) / average_user.avg_power
            self.update_fugaku_points(average_user.energy_allocated, average_user.avg_power)

    def __repr__(self):
        return (f"Account(id={self.id}, name={self.name}), "
@@ -90,6 +95,30 @@ class Account:
        return acct


def merge_account_of_same_id(account1:Account, account2:Account, new_id) -> Account:
    """Combine two records of the same account into one new ``Account``.

    Counters, allocated time and allocated energy are summed; ``avg_power``
    is recomputed from the summed values; ``fugaku_points`` is reset to
    ``None`` because it depends on averages that are not known here.

    Args:
        account1: First record of the account.
        account2: Second record of the account; must have the same ``name``.
        new_id: Id to assign to the merged account. It has to be relative to
            the owning Accounts object and cannot be derived from the
            individual Account objects.

    Returns:
        The newly created, merged ``Account``.

    Raises:
        KeyError: if the two accounts have different names.
        ValueError: if both accounts carry different non-zero priorities, so
            no single priority can be chosen.
    """
    if account1.name != account2.name:
        raise KeyError(f"{account1.name} != {account2.name}. Input arguments missmatch.")

    merged_account = Account()
    merged_account.name = account1.name
    merged_account.id = new_id

    # Compare by value: ``is 0`` tests object identity, which is an
    # implementation detail for ints and is always False for 0.0.
    if account1.priority == 0:
        merged_account.priority = account2.priority
    elif account2.priority == 0 or account1.priority == account2.priority:
        # Identical priorities are unambiguous, so they do not need to raise.
        merged_account.priority = account1.priority
    else:
        raise ValueError("Priority Cannot be derived!")

    merged_account.total_jobs_enqueued = account1.total_jobs_enqueued + account2.total_jobs_enqueued
    merged_account.total_jobs_completed = account1.total_jobs_completed + account2.total_jobs_completed
    merged_account.time_allocated = account1.time_allocated + account2.time_allocated
    merged_account.energy_allocated = account1.energy_allocated + account2.energy_allocated
    if merged_account.energy_allocated != 0:
        # NOTE(review): time / energy mirrors the formula used in
        # Account.__init__; physically, power would be energy / time - confirm.
        merged_account.avg_power = merged_account.time_allocated / merged_account.energy_allocated
    else:
        merged_account.avg_power = 0
    merged_account.fugaku_points = None  # Needs to be invalidated, as averages are not known!

    # The original fell off the end and implicitly returned None.
    return merged_account


class Accounts:

    def update_average_user(self):
@@ -165,3 +194,19 @@ class Accounts:
        ret_dict['all_users'] = self.all_users.to_dict()
        ret_dict['average_user'] = self.average_user.to_dict()
        return ret_dict


def merge_accounts(accounts1: Accounts,accounts2: Accounts) -> Accounts:
    """Merge two account collections into a new ``Accounts`` object.

    Accounts present in both inputs are combined with
    ``merge_account_of_same_id`` and keep the id they have in ``accounts1``.
    Accounts present in only one input are shallow-copied into the result,
    so neither input collection is modified.

    Args:
        accounts1: Base collection; ids of its accounts are preserved.
        accounts2: Collection merged into the base.

    Returns:
        A new ``Accounts`` object holding the merged accounts, the merged
        ``all_users`` aggregate and a recomputed average user.
    """
    import copy  # local import so the block does not depend on file-level imports

    merged_accounts = Accounts()
    # Copy instead of aliasing accounts1.account_dict, otherwise the merge
    # (and the later fugaku point updates) would mutate accounts1.
    merged_accounts.account_dict = {key: copy.copy(acct) for key, acct in accounts1.account_dict.items()}

    # NOTE(review): assumes ids are handed out sequentially starting at 0,
    # as the original `_account_id = len(...)` bookkeeping implies - confirm.
    next_id = len(accounts1.account_dict)
    for ac2_k, ac2_v in accounts2.account_dict.items():
        if ac2_k in accounts1.account_dict:
            ac1_v = accounts1.account_dict[ac2_k]
            merged_accounts.account_dict[ac2_k] = merge_account_of_same_id(ac1_v, ac2_v, ac1_v.id)
        else:
            # Accounts only known to accounts2 must not be dropped; they get
            # a fresh id relative to the merged collection.
            new_account = copy.copy(ac2_v)
            new_account.id = next_id
            next_id += 1
            merged_accounts.account_dict[ac2_k] = new_account
    # Count each account once, also when it appears in both inputs.
    merged_accounts._account_id = len(merged_accounts.account_dict)

    # Update all users -> then update average user -> then fugaku points for
    # all users (order is important!)
    merged_accounts.all_users = merge_account_of_same_id(
        accounts1.all_users, accounts2.all_users, accounts1.all_users.id)
    merged_accounts.update_average_user()

    average_user = merged_accounts.average_user
    # If no energy was computed no points can be computed; skip instead of
    # letting update_fugaku_points raise on a zero average power.
    if average_user.avg_power != 0:
        merged_accounts.all_users.update_fugaku_points(
            average_user.energy_allocated, average_user.avg_power)
        for account in merged_accounts.account_dict.values():
            account.update_fugaku_points(average_user.energy_allocated, average_user.avg_power)
    return merged_accounts
+1 −1
Original line number Diff line number Diff line
@@ -220,7 +220,7 @@ class Engine:
            #sort the queue according to the policy
            self.queue = self.scheduler.sort_jobs(self.queue, self.accounts)
            # Schedule jobs that are now in the queue.
            self.scheduler.schedule(self.queue, self.running, self.current_time)
            self.scheduler.schedule(self.queue, self.running, self.current_time, sorted = True)

            # Stop the simulation if no more jobs are running or in the queue.
            if autoshutdown and not self.queue and not self.running and not self.replay:
+4 −16
Original line number Diff line number Diff line
@@ -30,25 +30,13 @@ class Scheduler:
        elif self.policy == PolicyType.SJF:
            return sorted(queue, key=lambda job: job.wall_time)
        elif self.policy == PolicyType.PRIORITY:
            return self.sort_by_job_and_account_priority(queue, accounts)
            return sorted(queue, key=lambda job: job.priority, reverse=True)
        else:
            raise ValueError(f"Unknown policy type: {self.policy}")

    def sort_by_job_and_account_priority(self, queue, accounts=None):
        """Return the queued jobs ordered by combined job and account priority.

        A job's effective priority is its own ``priority``; when its
        ``account`` is found in ``accounts``, the account's priority is added
        and the sum is capped at ``MAX_PRIORITY``.

        Args:
            queue: Iterable of jobs exposing ``priority`` and ``account``.
            accounts: Optional container supporting ``in`` and ``[]`` lookup
                by account; when ``None`` only job priorities are used.

        Returns:
            A new list of the jobs, highest effective priority first. Jobs
            with equal priority keep their relative queue order.
        """
        def effective_priority(job):
            priority = job.priority
            if accounts is not None and job.account in accounts:
                # Cap (not floor) the boosted priority at MAX_PRIORITY.
                priority = min(priority + accounts[job.account].priority, MAX_PRIORITY)
            return priority

        # Sort on the computed priority rather than on the job objects, and
        # return a list (also for an empty queue) like the other policies.
        return sorted(queue, key=effective_priority, reverse=True)

    def schedule(self, queue, running, current_time, debug=False):
    def schedule(self, queue, running, current_time, sorted=False, debug=False):
        # Sort the queue in place.
        if not sorted:
            queue[:] = self.sort_jobs(queue)

        # Iterate over a copy of the queue since we might remove items