Commit a28d22da authored by Maiterth, Matthias's avatar Maiterth, Matthias
Browse files

Updated Accounts and Account class

- In Accounts, removed ID 'id' as the account name is used as an ID
- removed verbose wording 'total' from total_jobs_completed etc.
- Added merge classmethods to Account and Accounts, refactoring the previous module-level merge functions.
- Added add_account method to Accounts
- Refactored from_dict and from_json_filename

Engine:
- Re-added job history to work with the current version of job_dicts for -o
Therefore -o works again.

Tested with
-o
--accounts-json of previous output
smoketest
and normal runs without options.
parent 531ec149
Loading
Loading
Loading
Loading
+5 −4
Original line number Diff line number Diff line
@@ -127,8 +127,8 @@ else: # Synthetic jobs
    jobs = getattr(wl, args.workload)(num_jobs=args.numjobs)
    job_accounts = Accounts(jobs)
    if args.accounts_json:
        loaded_accounts = Accounts.initialize_accounts_from_json(args.accounts_json)
        accounts = loaded_accounts.merge(loaded_accounts,job_accounts)
        loaded_accounts = Accounts.from_json_filename(args.accounts_json)
        accounts = Accounts.merge(loaded_accounts,job_accounts)
    else:
        accounts = job_accounts

@@ -240,8 +240,8 @@ if args.output:
        df.to_parquet(OPATH / 'util.parquet', engine='pyarrow')

        # Schedule history
        schedule_history = pd.DataFrame(sc.get_history())
        schedule_history.to_csv(OPATH / "schedule_history.csv", index=False)
        job_history = pd.DataFrame(sc.get_job_history_dict())
        job_history.to_csv(OPATH / "job_history.csv", index=False)

        try:
            with open(OPATH / 'stats.out', 'w') as f:
@@ -255,3 +255,4 @@ if args.output:
                f.write(json_string)
        except TypeError:
            raise TypeError(f"{sc.accounts} could not be parsed by json.dump")
    print("Output directory is: ", OPATH)  # If output is enabled, the user wants this information as last output
+136 −81
Original line number Diff line number Diff line
@@ -18,20 +18,19 @@ class Account:
    RUNNING, COMPLETED, CANCELLED, FAILED, or TIMEOUT.
    """

    def __init__(self, id, name,
    def __init__(self, name,
                 priority=0,
                 total_jobs_enqueued=0,
                 total_jobs_completed=0,
                 jobs_enqueued=0,
                 jobs_completed=0,
                 time_allocated=0,
                 energy_allocated=0,
                 avg_power=0,
                 fugaku_points=0
                 ):
        self.id = id
        self.name = name
        self.priority = priority
        self.total_jobs_enqueued = total_jobs_enqueued
        self.total_jobs_completed = total_jobs_completed
        self.jobs_enqueued = jobs_enqueued
        self.jobs_completed = jobs_completed
        self.time_allocated = time_allocated
        self.energy_allocated = energy_allocated
        self.avg_power = avg_power
@@ -45,7 +44,7 @@ class Account:
        self.fugaku_points = (average_energy - self.energy_allocated) / average_power

    def update_statistics(self, jobstats, average_user):
        self.total_jobs_completed += 1
        self.jobs_completed += 1
        self.time_allocated += jobstats.run_time
        self.energy_allocated += jobstats.energy
        if self.time_allocated == 0:
@@ -58,10 +57,10 @@ class Account:
            self.update_fugaku_points(average_user.energy_allocated, average_user.avg_power)

    def __repr__(self):
        return (f"Account(id={self.id}, name={self.name}), "
        return (f"Account(name={self.name}), "
                f"priority: {self.priority}, "
                f"total_jobs_enqueued: {self.total_jobs_enqueued}, "
                f"total_jobs_completed: {self.total_jobs_completed}, "
                f"jobs_enqueued: {self.jobs_enqueued}, "
                f"jobs_completed: {self.jobs_completed}, "
                f"time_allocated: {self.time_allocated}, "
                f"energy_allocated: {self.energy_allocated}, "
                f"avg_power: {self.avg_power}, "
@@ -70,11 +69,10 @@ class Account:

    def to_dict(self):
        return {
            "id": self.id,
            "name": self.name,
            "priority": self.priority,
            "total_jobs_enqueued": self.total_jobs_enqueued,
            "total_jobs_completed": self.total_jobs_completed,
            "jobs_enqueued": self.jobs_enqueued,
            "jobs_completed": self.jobs_completed,
            "time_allocated": self.time_allocated,
            "energy_allocated": self.energy_allocated,
            "avg_power": self.avg_power,
@@ -82,34 +80,41 @@ class Account:
        }

    @classmethod
    def init_from_dict(acct, account_dict):  # id ,name, priority, total_jobs, time_allocated, energy_allocated, avg_power, fugaku_points):
        acct = Account(account_dict["id"], account_dict["name"], priority=account_dict["priority"])
        acct.id = account_dict["id"]
    def from_dict(acct, account_dict):  # name, priority, jobs_enqueue, jobs_completed, time_allocated, energy_allocated, avg_power, fugaku_points):
        acct = Account(account_dict["name"], priority=account_dict["priority"])
        acct.name = account_dict["name"]
        acct.priority = account_dict["priority"]
        acct.total_jobs = account_dict["total_jobs"]
        acct.jobs_enqueued = account_dict["jobs_enqueued"]
        acct.jobs_completed = account_dict["jobs_completed"]
        acct.time_allocated = account_dict["time_allocated"]
        acct.energy_allocated = account_dict["energy_allocated"]
        acct.avg_power = account_dict["avg_power"]
        acct.fugaku_points = account_dict["fugaku_points"]
        return acct

    @classmethod
    def merge(cls,account1:'Account', account2:'Account') -> 'Account':
        """
        Destructive merge

def merge_account_of_same_id(account1:Account, account2:Account, new_id) -> Account:
    merged_account = Account()
        Priorities are only set if one is zero or both are equal.
        """
        if account1.name != account2.name:
            raise KeyError(f"{account1.name} != {account2.name}. Input arguments missmatch.")
    merged_account.name = account1.name
    merged_account.id = new_id  # This has to be relative to the Accounts Object and cannot be derived from the individual Account objects
    if account1.priority is 0:

        merged_account = cls(account1.name)

        if account1.priority == account2.priority:
            merged_account.priority = account1.priority
        elif account1.priority == 0:
            merged_account.priority = account2.priority
    elif account2.priority is 0:
        elif account2.priority == 0:
            merged_account.priority = account1.priority
        else:
            raise ValueError("Priority Cannot be derived!")

    merged_account.total_jobs_enqueued = account1.total_jobs_enqueued + account2.total_jobs_enqueued
    merged_account.total_jobs_completed = account1.total_jobs_completed + account2.total_jobs_completed
        merged_account.jobs_enqueued = account1.jobs_enqueued + account2.jobs_enqueued
        merged_account.jobs_completed = account1.jobs_completed + account2.jobs_completed
        merged_account.time_allocated = account1.time_allocated + account2.time_allocated
        merged_account.energy_allocated = account1.energy_allocated + account2.energy_allocated
        if merged_account.energy_allocated != 0:
@@ -118,21 +123,26 @@ def merge_account_of_same_id(account1:Account, account2:Account, new_id) -> Acco
            merged_account.avg_power = 0
        merged_account.fugaku_points = None  # Needs to be invalidated, as averages are not known!

        account1 = None
        account2 = None

        return merged_account


class Accounts:

    def update_average_user(self):
        total_accounts = len(self.account_dict)
        self.average_user.total_jobs_enqueued = self.all_users.total_jobs_enqueued / total_accounts
        self.average_user.total_jobs_completed = self.all_users.total_jobs_completed / total_accounts
        self.average_user.jobs_enqueued = self.all_users.jobs_enqueued / total_accounts
        self.average_user.jobs_completed = self.all_users.jobs_completed / total_accounts
        self.average_user.time_allocated = self.all_users.time_allocated / total_accounts
        self.average_user.energy_allocated = self.all_users.energy_allocated / total_accounts
        self.average_user.avg_power = self.all_users.avg_power / total_accounts
        self.fugaku_points = self.all_users.fugaku_points / total_accounts  # this should be 0
        if self.average_user.jobs_completed != 0.0:
            self.average_user.update_fugaku_points(self.average_user.energy_allocated,self.average_user.avg_power)
        return self

    def __init__(self, jobs=None):
        self._account_id = 0
        self.account_dict = dict()
        self.all_users = Account(-2, "All_Users")
        self.average_user = Account(-1, "Avg_User")
@@ -143,30 +153,58 @@ class Accounts:
                if not isinstance(job_dict,dict):
                    raise TypeError
                if job_dict["account"] not in self.account_dict:
                    self.account_dict[job_dict["account"]] = Account(self._account_id, job_dict["account"], total_jobs_enqueued=1)
                    self._account_id += 1
                else:
                    self.account_dict[job_dict["account"]].total_jobs_enqueued += 1

    def initialize_accounts_from_dict(self, dictionary):
        if '_account_id' in dictionary:
            self._account_id = dictionary['_account_id']
        if 'account_dict' in dictionary:
            dics_from_dictionary = dictionary['account_dict']
            self.account_dict = {}
            for account_name, account_dict in dics_from_dictionary.items():
                self.account_dict[account_name] = Account.init_from_dict(account_dict)

        if 'all_users' in dictionary:
            self.all_users = Account.init_from_dict(dictionary['all_users'])
        if 'average_user' in dictionary:
            self.average_user = Account.init_from_dict(dictionary['average_user'])

    def initialize_accounts_from_json(self, filename):
                    self.account_dict[job_dict["account"]] = Account(job_dict["account"], jobs_enqueued=0)
                self.account_dict[job_dict["account"]].jobs_enqueued += 1
                self.all_users.jobs_enqueued += 1
            self.update_average_user()
        pass

    def updates_all_users_by_account(self, account: 'Account'):
        """
        Fold one account's counters into the ``all_users`` aggregate.

        Adds the account's enqueued/completed job counts, allocated time and
        allocated energy to ``self.all_users``, recomputes the aggregate's
        average power, refreshes ``self.average_user`` and finally recomputes
        the fugaku points of ``all_users`` against that average.

        :param account: Account whose statistics are added to ``all_users``.
        """
        totals = self.all_users
        totals.jobs_enqueued += account.jobs_enqueued
        totals.jobs_completed += account.jobs_completed
        totals.time_allocated += account.time_allocated
        totals.energy_allocated += account.energy_allocated

        # Average power is derived from the aggregate's own totals (not from
        # attributes of this container). With no allocated time there is
        # nothing to average, so report 0 instead of dividing by zero.
        if totals.time_allocated == 0:
            totals.avg_power = 0
        else:
            totals.avg_power = totals.energy_allocated / totals.time_allocated

        # The average user must be refreshed before the fugaku points are
        # computed, because they are calculated relative to that average.
        self.update_average_user()
        # NOTE(review): update_fugaku_points appears to divide by the average
        # power it is given - confirm callers never reach this with 0.
        totals.update_fugaku_points(self.average_user.energy_allocated, self.average_user.avg_power)



    def add_account(self, account: 'Account'):
        """
        Register an account and fold its statistics into the aggregates.

        The account is stored in ``self.account_dict`` under its name; an
        existing entry with the same name is replaced.

        :param account: Account to register.
        """
        self.account_dict[account.name] = account
        # updates_all_users_by_account() also refreshes the average user, so
        # no separate update_average_user() call is needed here.
        self.updates_all_users_by_account(account)


    @classmethod
    def from_dict(cls, dictionary):
        """
        Rebuild an Accounts object from its dictionary representation.

        The dictionary must provide 'account_dict' (a mapping of account name
        to a per-account dictionary), 'all_users' and 'average_user'. Each
        per-account dictionary is restored through Account.from_dict.

        Raises KeyError if a required key is missing or 'account_dict' is
        not a dictionary.
        """
        restored = cls()

        if 'account_dict' not in dictionary:
            raise KeyError("'account_dict' not in dictionary. Failed to restore.")
        raw_accounts = dictionary['account_dict']
        if not isinstance(raw_accounts, dict):
            raise KeyError("'account_dict' is not a dictionary. Failed to restore.")
        restored.account_dict = {
            name: Account.from_dict(entry)
            for name, entry in raw_accounts.items()
        }

        # The two aggregate accounts are restored the same way, in this order.
        for aggregate in ('all_users', 'average_user'):
            if aggregate not in dictionary:
                raise KeyError(f"'{aggregate}' not in dictionary. Failed to restore.")
            setattr(restored, aggregate, Account.from_dict(dictionary[aggregate]))

        return restored

    @classmethod
    def from_json_filename(cls, filename):
        """
        Restore an Accounts object from a JSON file.

        :param filename: Path of the JSON file to read (opened as UTF-8).
        :return: The object built by ``cls.from_dict`` from the parsed JSON.
        :raises ValueError: If the file content cannot be parsed by
            json.load(); the message names the offending file.
        """
        # Only reading/parsing is guarded, so errors raised while rebuilding
        # the object in from_dict() are not mislabelled as JSON read errors.
        try:
            with open(filename, 'r', encoding='utf-8') as file:
                json_object = json.load(file)
        except ValueError as err:
            raise ValueError(f"{filename} could not be read using json.load()") from err
        return cls.from_dict(json_object)

@@ -175,8 +213,7 @@ class Accounts:
        if isinstance(jobstats, JobStatistics):
            if jobstats.account not in self.account_dict:
                raise ValueError(f"Account {jobstats.account} not registered in Accounts object {self}")
                # self.account_dict[jobstats.account] = Account(self._account_id, jobstats.account)
                # self._account_id += 1
                # self.account_dict[jobstats.account] = Account(jobstats.account)
            account = self.account_dict[jobstats.account]
            account.update_statistics(jobstats, self.average_user)
            self.account_dict[jobstats.account] = account
@@ -189,24 +226,42 @@ class Accounts:
        for account_name,account in self.account_dict.items():
            acct_dict[account_name] = account.to_dict()
        ret_dict = {}
        ret_dict['_account_id'] = self._account_id
        ret_dict['account_dict'] = acct_dict
        ret_dict['all_users'] = self.all_users.to_dict()
        ret_dict['average_user'] = self.average_user.to_dict()
        return ret_dict


def merge_accounts(accounts1: Accounts,accounts2: Accounts) -> Accounts:
    @classmethod
    def merge(cls, accounts1:'Accounts', accounts2:'Accounts') -> 'Accounts':
        """
        Destructive merge of accounts
        """
        merged_accounts = Accounts()
    merged_accounts._account_id = len(accounts1.account_dict) + len(accounts2.account_dict)
        merged_accounts.account_dict = accounts1.account_dict

        for ac2_k, ac2_v in accounts2.account_dict.items():
            if ac2_k in accounts1.account_dict:
            merged_accounts.account_dict[ac2_k] = merge_account_of_same_id(accounts1.account_dict[ac2_k], accounts2.account_dict[ac2_k])
                merged_accounts.account_dict[ac2_k] = Account.merge(accounts1.account_dict[ac2_k], accounts2.account_dict[ac2_k])
            else:
                merged_accounts.account_dict[ac2_k] = ac2_v
        for ac1_k, ac1_v in accounts1.account_dict.items():
            if ac1_k not in accounts2.account_dict:
                merged_accounts.account_dict[ac1_k] = ac1_v
            else:
                # Already added above
                pass

        # update all users -> then update average user -> then fugaku points for all users (order is important!)
    merged_accounts.all_users = merge_account_of_same_id(accounts1.all_users,accounts2.all_users)
    merged_accounts.all_users.update_fugaku_points(merged_accounts.average_user.energy_allocated, merged_accounts.average_user.avg_power)
        merged_accounts.all_users = Account.merge(accounts1.all_users,accounts2.all_users)
        merged_accounts.update_average_user()
        # The average user must be updated before fugaku points can be calculated.
        if merged_accounts.all_users.jobs_completed != 0:
            merged_accounts.all_users.update_fugaku_points(merged_accounts.average_user.energy_allocated, merged_accounts.average_user.avg_power)

        for ac_k, ac_v in merged_accounts.account_dict.items():
        merged_accounts[ac_k].update_fugaku_points(merged_accounts.average_user.energy_allocated, merged_accounts.average_user.avg_power)
            if merged_accounts.account_dict[ac_k].jobs_completed != 0:
                merged_accounts.account_dict[ac_k].update_fugaku_points(merged_accounts.average_user.energy_allocated, merged_accounts.average_user.avg_power)

        accounts1 = None
        accounts2 = None
        return merged_accounts
+5 −0
Original line number Diff line number Diff line
@@ -41,6 +41,7 @@ class Engine:
        self.running = []
        self.queue = []
        self.accounts = None
        self.job_history_dict = []
        self.jobs_completed = 0
        self.current_time = 0
        self.cooling_model = cooling_model
@@ -135,6 +136,7 @@ class Engine:
            self.jobs_completed += 1
            job_stats = job.statistics()
            self.accounts.update_account_statistics(job_stats)
            self.job_history_dict.append(job_stats.__dict__)
            # Free the nodes via the resource manager.
            self.resource_manager.free_nodes_from_job(job)

@@ -268,3 +270,6 @@ class Engine:
        }

        return stats

    def get_job_history_dict(self):
        """
        Return the job history collected by this engine.

        The returned object is the list stored in ``self.job_history_dict``
        itself (not a copy).
        """
        return self.job_history_dict
+3 −0
Original line number Diff line number Diff line
@@ -115,6 +115,9 @@ class JobStatistics:
        self.account = job.account
        self.num_nodes = len(job.scheduled_nodes)
        self.run_time = job.running_time
        self.start_time = job.start_time
        self.end_time = job.end_time
        self.state = job._state
        if len(job.power_history) == 0:
            self.avg_node_power = 0
            self.max_node_power = 0