Commit 5e978691 authored by Maiterth, Matthias's avatar Maiterth, Matthias
Browse files

Initial refactor to derive accounts from jobs.

The intended ways to obtain accounts are:
1: Load account information from an account file (intended for a continued run that tracks account statistics).
2: Derive the accounts from job information. (If job information is stored in an .npz file, the accounts should also be stored in that .npz file,
so that both can simply be loaded and no conversion/derivation is necessary.)

This is not yet fully tested.
parent a717d5c9
Loading
Loading
Loading
Loading
+11 −6
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@ from raps.engine import Engine
from raps.job import Job
from raps.telemetry import Telemetry
from raps.workload import Workload
from raps.account import Accounts
from raps.weather import Weather
from raps.utils import create_casename, convert_to_seconds, write_dict_to_file, next_arrival

@@ -86,9 +87,9 @@ if args.replay:
        DIR_NAME = create_casename()

    # Read telemetry data (either npz file or via custom data loader)
    if args.replay[0].endswith(".npz"): # replay .npz file
    if args.replay[0].endswith(".npz"):  # Replay .npz file
        print(f"Loading {args.replay[0]}...")
        jobs = td.load_snapshot(args.replay[0])
        jobs, accounts = td.load_snapshot(args.replay[0])

        if args.scale:
            for job in tqdm(jobs, desc=f"Scaling jobs to {args.scale} nodes"):
@@ -103,11 +104,13 @@ if args.replay:
        elif args.reschedule == 'submit-time':
            raise NotImplementedError


    else:  # custom data loader
        print(*args.replay)
        jobs = td.load_data(args.replay)
        td.save_snapshot(jobs, filename=DIR_NAME)
        accounts = Accounts(jobs)
        sc.accounts = accounts
        accounts_dict = accounts.to_dict()
        td.save_snapshot(jobs, accounts, filename=DIR_NAME)

    # Set number of timesteps based on the last job running which we assume
    # is the maximum value of submit_time + wall_time of all the jobs
@@ -119,9 +122,11 @@ if args.replay:
    print(f'Simulating {len(jobs)} jobs for {timesteps} seconds')
    time.sleep(1)

else: # synthetic jobs
else:  # Synthetic jobs
    wl = Workload(config)
    jobs = getattr(wl, args.workload)(num_jobs=args.numjobs)
    accounts = Accounts(jobs)
    sc.accounts = accounts

    if args.verbose:
        for job_vector in jobs:
+67 −39
Original line number Diff line number Diff line
@@ -18,18 +18,29 @@ class Account:
    RUNNING, COMPLETED, CANCELLED, FAILED, or TIMEOUT.
    """

    def __init__(self, id, name, priority):
    def __init__(self, id, name,
                 priority=0,
                 total_jobs_enqueued=0,
                 total_jobs_completed=0,
                 time_allocated=0,
                 energy_allocated=0,
                 avg_power=0,
                 fugaku_points=0
                 ):
        self.id = id
        self.name = name
        self.priority = priority
        self.total_jobs = 0
        self.time_allocated = 0
        self.energy_allocated = 0
        self.avg_power = 0
        self.fugaku_points = 0
        self.total_jobs_enqueued = total_jobs_enqueued
        self.total_jobs_completed = total_jobs_completed
        self.time_allocated = time_allocated
        self.energy_allocated = energy_allocated
        self.avg_power = avg_power
        if self.avg_power == 0 and self.energy_allocated != 0:
            self.avg_power = self.time_allocated / self.energy_allocated
        self.fugaku_points = fugaku_points

    def update_statistics(self, jobstats, average_user):
        self.total_jobs += 1
        self.total_jobs_completed += 1
        self.time_allocated += jobstats.run_time
        self.energy_allocated += jobstats.energy
        if self.time_allocated == 0:
@@ -44,7 +55,8 @@ class Account:
    def __repr__(self):
        return (f"Account(id={self.id}, name={self.name}), "
                f"priority: {self.priority}, "
                f"total_jobs: {self.total_jobs}, "
                f"total_jobs_enqueued: {self.total_jobs_enqueued}, "
                f"total_jobs_completed: {self.total_jobs_completed}, "
                f"time_allocated: {self.time_allocated}, "
                f"energy_allocated: {self.energy_allocated}, "
                f"avg_power: {self.avg_power}, "
@@ -56,7 +68,8 @@ class Account:
            "id": self.id,
            "name": self.name,
            "priority": self.priority,
            "total_jobs":self.total_jobs,
            "total_jobs_enqueued": self.total_jobs_enqueued,
            "total_jobs_completed": self.total_jobs_completed,
            "time_allocated": self.time_allocated,
            "energy_allocated": self.energy_allocated,
            "avg_power": self.avg_power,
@@ -65,7 +78,7 @@ class Account:

    @classmethod
    def init_from_dict(acct, account_dict):  # id ,name, priority, total_jobs, time_allocated, energy_allocated, avg_power, fugaku_points):
        acct = Account(account_dict["id"],account_dict["name"],account_dict["priority"])
        acct = Account(account_dict["id"], account_dict["name"], priority=account_dict["priority"])
        acct.id = account_dict["id"]
        acct.name = account_dict["name"]
        acct.priority = account_dict["priority"]
@@ -81,49 +94,64 @@ class Accounts:

    def update_average_user(self):
        """Recompute ``self.average_user`` from the ``self.all_users`` totals.

        Every statistic of ``all_users`` is divided by the number of entries
        in ``self.account_dict`` and stored on ``average_user``.

        Returns:
            This ``Accounts`` object.

        Raises:
            ZeroDivisionError: If ``self.account_dict`` is empty.
        """
        total_accounts = len(self.account_dict)
        totals = self.all_users
        average = self.average_user
        average.total_jobs_enqueued = totals.total_jobs_enqueued / total_accounts
        average.total_jobs_completed = totals.total_jobs_completed / total_accounts
        average.time_allocated = totals.time_allocated / total_accounts
        average.energy_allocated = totals.energy_allocated / total_accounts
        average.avg_power = totals.avg_power / total_accounts
        # Store on the average user, not on the Accounts container itself.
        average.fugaku_points = totals.fugaku_points / total_accounts  # this should be 0
        return self

    def __init__(self, jobs=None):
        """Create the account registry, optionally derived from a list of jobs.

        Args:
            jobs: Optional list of job dicts. Each dict must carry an
                ``"account"`` key; one ``Account`` is registered per distinct
                value and its ``total_jobs_enqueued`` counts the jobs seen.
                A falsy value (``None``, empty list) creates an empty registry.

        Raises:
            TypeError: If ``jobs`` is not a list or one of its items is not a dict.
            KeyError: If a job dict has no ``"account"`` key.
        """
        self._account_id = 0
        self.account_dict = dict()
        self.all_users = Account(-2, "All_Users")
        self.average_user = Account(-1, "Avg_User")
        if jobs:
            if not isinstance(jobs, list):
                raise TypeError(
                    f"jobs must be a list of job dicts, got {type(jobs).__name__}")
            for job_dict in jobs:
                if not isinstance(job_dict, dict):
                    raise TypeError(
                        f"each job must be a dict, got {type(job_dict).__name__}")
                account_name = job_dict["account"]
                account = self.account_dict.get(account_name)
                if account is None:
                    # First job of this account: register it with a fresh id.
                    self.account_dict[account_name] = Account(
                        self._account_id, account_name, total_jobs_enqueued=1)
                    self._account_id += 1
                else:
                    account.total_jobs_enqueued += 1

    def initialize_accounts_from_dict(self, dictionary):
        """Load registry state from a plain dictionary.

        Recognised keys are ``'_account_id'``, ``'account_dict'``,
        ``'all_users'`` and ``'average_user'``. A key that is absent leaves
        the corresponding attribute untouched. Account entries are rebuilt
        with ``Account.init_from_dict``.
        """
        if '_account_id' in dictionary:
            self._account_id = dictionary['_account_id']

        if 'account_dict' in dictionary:
            self.account_dict = {
                name: Account.init_from_dict(entry)
                for name, entry in dictionary['account_dict'].items()
            }

        # The two summary accounts are restored the same way.
        for key in ('all_users', 'average_user'):
            if key in dictionary:
                setattr(self, key, Account.init_from_dict(dictionary[key]))

    def initialize_accounts_from_json(self, filename):
        """Load registry state from a JSON file.

        The file is parsed with ``json.load`` and the resulting object is
        handed to ``initialize_accounts_from_dict``.

        Args:
            filename: Path of the JSON file to read.

        Raises:
            ValueError: If the file content cannot be decoded by ``json.load``.
                The original decoding error is chained as the cause.
        """
        # Keep the try body minimal so only read/parse failures are reported
        # as such; errors from applying the data propagate unchanged.
        try:
            with open(filename, 'r', encoding='utf-8') as file:
                json_object = json.load(file)
        except ValueError as err:
            # Report the path, not the file object.
            raise ValueError(f"{filename} could not be read using json.load()") from err
        self.initialize_accounts_from_dict(json_object)

    def update_account_statistics(self, jobstats):
        """Fold the statistics of one job into the account bookkeeping.

        The account named by ``jobstats.account``, the summary account
        ``all_users`` and the derived ``average_user`` are all updated.
        Arguments that are not ``JobStatistics`` instances are ignored.

        Args:
            jobstats: Statistics of the job.

        Raises:
            ValueError: If ``jobstats.account`` is not a registered account.
        """
        if not isinstance(jobstats, JobStatistics):
            return
        if jobstats.account not in self.account_dict:
            # Accounts are expected to be registered up front; they are no
            # longer created on the fly here.
            raise ValueError(f"Account {jobstats.account} not registered in Accounts object {self}")
        # Update specific account associated with job
        self.account_dict[jobstats.account].update_statistics(jobstats, self.average_user)
        # Update the summary account (all_users) and the average_user account
        self.all_users.update_statistics(jobstats, self.average_user)
        self.update_average_user()

+1 −3
Original line number Diff line number Diff line
@@ -41,9 +41,7 @@ class Engine:
        # Initialize running and queue, etc.
        self.running = []
        self.queue = []
        self.accounts = Accounts()
        if 'accounts_json' in kwargs and kwargs['accounts_json']:
            self.accounts.initialize_accounts_from_json(kwargs.get('accounts_json'))
        self.accounts = None
        self.jobs_completed = 0
        self.current_time = 0
        self.cooling_model = cooling_model
+6 −5
Original line number Diff line number Diff line
@@ -27,6 +27,7 @@ from tqdm import tqdm

from .config import ConfigManager
from .job import Job
from .account import Accounts
from .plotting import plot_submit_times, plot_nodes_histogram
from .utils import next_arrival

@@ -44,15 +45,15 @@ class Telemetry:
            print("WARNING: Failed to load dataloader")


    def save_snapshot(self, jobs: list, accounts: dict, filename: str):
        """Saves a snapshot of the jobs and accounts to a compressed file.

        Args:
            jobs: Jobs to store under the ``jobs`` key of the archive.
            accounts: Account information to store under the ``accounts`` key.
            filename: Target path, passed to ``np.savez_compressed``.
        """
        np.savez_compressed(filename, jobs=jobs, accounts=accounts)


    def load_snapshot(self, snapshot: str) -> tuple:
        """Reads a snapshot from a compressed file and returns the jobs and accounts.

        Args:
            snapshot: Path of the ``.npz`` archive to read.

        Returns:
            A ``(jobs, accounts)`` tuple: the job list stored under the
            ``jobs`` key and an ``Accounts`` object. If the archive has no
            ``accounts`` entry, the accounts are derived from the jobs.
        """
        # NOTE(review): allow_pickle=True unpickles arbitrary objects, so only
        # snapshots from trusted sources should be loaded.
        with np.load(snapshot, allow_pickle=True, mmap_mode='r') as data:
            # The archive is keyed by name; it must be indexed, not unpacked.
            jobs = data['jobs'].tolist()
            stored = data['accounts'].item() if 'accounts' in data.files else None

        if stored is None:
            accounts = Accounts(jobs)
        elif isinstance(stored, Accounts):
            accounts = stored
        else:
            # Plain-dict form: the initializer is an instance method and
            # returns nothing, so build the object first and then fill it.
            accounts = Accounts()
            accounts.initialize_accounts_from_dict(stored)
        return jobs, accounts


    def load_data(self, files):
+2 −2

File changed.

Contains only whitespace changes.

+1 −1

File changed.

Contains only whitespace changes.

Loading