Commit 048784a2 authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Merge branch 'scheduler-queue-and-sort' into 'main'

First working version of Fugaku Points

See merge request !73
parents ac22834f 1dfed1d0
Loading
Loading
Loading
Loading
+21 −11
Original line number Diff line number Diff line
@@ -29,9 +29,9 @@ from raps.engine import Engine
from raps.job import Job
from raps.telemetry import Telemetry
from raps.workload import Workload
from raps.account import Accounts
from raps.weather import Weather
from raps.utils import create_casename, convert_to_seconds, write_dict_to_file, next_arrival
from raps.utils import toJSON

config = ConfigManager(system_name=args.system).get_config()

@@ -87,9 +87,9 @@ if args.replay:
        DIR_NAME = create_casename()

    # Read telemetry data (either npz file or via custom data loader)
    if args.replay[0].endswith(".npz"): # replay .npz file
    if args.replay[0].endswith(".npz"):  # Replay .npz file
        print(f"Loading {args.replay[0]}...")
        jobs = td.load_snapshot(args.replay[0])
        jobs, accounts = td.load_snapshot(args.replay[0])

        if args.scale:
            for job in tqdm(jobs, desc=f"Scaling jobs to {args.scale} nodes"):
@@ -104,11 +104,13 @@ if args.replay:
        elif args.reschedule == 'submit-time':
            raise NotImplementedError


    else:  # custom data loader
        print(*args.replay)
        jobs = td.load_data(args.replay)
        td.save_snapshot(jobs, filename=DIR_NAME)
        accounts = Accounts(jobs)
        sc.accounts = accounts
        accounts_dict = accounts.to_dict()
        td.save_snapshot(jobs, accounts, filename=DIR_NAME)

    # Set number of timesteps based on the last job running which we assume
    # is the maximum value of submit_time + wall_time of all the jobs
@@ -120,9 +122,15 @@ if args.replay:
    print(f'Simulating {len(jobs)} jobs for {timesteps} seconds')
    time.sleep(1)

else: # synthetic jobs
else:  # Synthetic jobs
    wl = Workload(config)
    jobs = getattr(wl, args.workload)(num_jobs=args.numjobs)
    job_accounts = Accounts(jobs)
    if args.accounts_json:
        loaded_accounts = Accounts.from_json_filename(args.accounts_json)
        accounts = Accounts.merge(loaded_accounts,job_accounts)
    else:
        accounts = job_accounts

    if args.verbose:
        for job_vector in jobs:
@@ -140,6 +148,7 @@ else: # synthetic jobs
OPATH = OUTPUT_PATH / DIR_NAME
print("Output directory is: ", OPATH)
sc.opath = OPATH
sc.accounts = accounts

if args.plot or args.output:
    try:
@@ -231,8 +240,8 @@ if args.output:
        df.to_parquet(OPATH / 'util.parquet', engine='pyarrow')

        # Schedule history
        schedule_history = pd.DataFrame(sc.get_history())
        schedule_history.to_csv(OPATH / "schedule_history.csv", index=False)
        job_history = pd.DataFrame(sc.get_job_history_dict())
        job_history.to_csv(OPATH / "job_history.csv", index=False)

        try:
            with open(OPATH / 'stats.out', 'w') as f:
@@ -246,3 +255,4 @@ if args.output:
                f.write(json_string)
        except TypeError:
            raise TypeError(f"{sc.accounts} could not be parsed by json.dump")
    print("Output directory is: ", OPATH)  # If output is enabled, the user wants this information as last output
+176 −48
Original line number Diff line number Diff line
@@ -18,18 +18,33 @@ class Account:
    RUNNING, COMPLETED, CANCELLED, FAILED, or TIMEOUT.
    """

    def __init__(self, id, name, priority):
        self.id = id
    def __init__(self, name,
                 priority=0,
                 jobs_enqueued=0,
                 jobs_completed=0,
                 time_allocated=0,
                 energy_allocated=0,
                 avg_power=0,
                 fugaku_points=0
                 ):
        self.name = name
        self.priority = priority
        self.total_jobs = 0
        self.time_allocated = 0
        self.energy_allocated = 0
        self.avg_power = 0
        self.fugaku_points = 0
        self.jobs_enqueued = jobs_enqueued
        self.jobs_completed = jobs_completed
        self.time_allocated = time_allocated
        self.energy_allocated = energy_allocated
        self.avg_power = avg_power
        if self.avg_power == 0 and self.energy_allocated != 0:
            self.avg_power = self.time_allocated / self.energy_allocated
        self.fugaku_points = fugaku_points

    def update_fugaku_points(self, average_energy, average_power):
        if average_power == 0:
            raise ValueError(f"{average_power} is zero")
        self.fugaku_points = (average_energy - self.energy_allocated) / average_power

    def update_statistics(self, jobstats, average_user):
        self.total_jobs += 1
        self.jobs_completed += 1
        self.time_allocated += jobstats.run_time
        self.energy_allocated += jobstats.energy
        if self.time_allocated == 0:
@@ -39,12 +54,13 @@ class Account:
        if average_user.avg_power == 0:  # If this is the first job use own power
            average_user.avg_power = self.avg_power
        if average_user.avg_power != 0:  # If no energy was computed no points can be computed.
            self.fugaku_points = (average_user.energy_allocated - self.energy_allocated) / average_user.avg_power
            self.update_fugaku_points(average_user.energy_allocated, average_user.avg_power)

    def __repr__(self):
        return (f"Account(id={self.id}, name={self.name}), "
        return (f"Account(name={self.name}), "
                f"priority: {self.priority}, "
                f"total_jobs: {self.total_jobs}, "
                f"jobs_enqueued: {self.jobs_enqueued}, "
                f"jobs_completed: {self.jobs_completed}, "
                f"time_allocated: {self.time_allocated}, "
                f"energy_allocated: {self.energy_allocated}, "
                f"avg_power: {self.avg_power}, "
@@ -53,10 +69,10 @@ class Account:

    def to_dict(self):
        return {
            "id": self.id,
            "name": self.name,
            "priority": self.priority,
            "total_jobs":self.total_jobs,
            "jobs_enqueued": self.jobs_enqueued,
            "jobs_completed": self.jobs_completed,
            "time_allocated": self.time_allocated,
            "energy_allocated": self.energy_allocated,
            "avg_power": self.avg_power,
@@ -64,66 +80,144 @@ class Account:
        }

    @classmethod
    def init_from_dict(acct, account_dict):  # id ,name, priority, total_jobs, time_allocated, energy_allocated, avg_power, fugaku_points):
        acct = Account(account_dict["id"],account_dict["name"],account_dict["priority"])
        acct.id = account_dict["id"]
    def from_dict(acct, account_dict):
        acct = Account(account_dict["name"], priority=account_dict["priority"])
        acct.name = account_dict["name"]
        acct.priority = account_dict["priority"]
        acct.total_jobs = account_dict["total_jobs"]
        acct.jobs_enqueued = account_dict["jobs_enqueued"]
        acct.jobs_completed = account_dict["jobs_completed"]
        acct.time_allocated = account_dict["time_allocated"]
        acct.energy_allocated = account_dict["energy_allocated"]
        acct.avg_power = account_dict["avg_power"]
        acct.fugaku_points = account_dict["fugaku_points"]
        return acct

    @classmethod
    def merge(cls,account1:'Account', account2:'Account') -> 'Account':
        """
        Destructive merge

        Priorities are only set if one is zero or both are equal.
        """
        if account1.name != account2.name:
            raise KeyError(f"{account1.name} != {account2.name}. Input arguments missmatch.")

        merged_account = cls(account1.name)

        if account1.priority == account2.priority:
            merged_account.priority = account1.priority
        elif account1.priority == 0:
            merged_account.priority = account2.priority
        elif account2.priority == 0:
            merged_account.priority = account1.priority
        else:
            raise ValueError("Priority Cannot be derived!")

        merged_account.jobs_enqueued = account1.jobs_enqueued + account2.jobs_enqueued
        merged_account.jobs_completed = account1.jobs_completed + account2.jobs_completed
        merged_account.time_allocated = account1.time_allocated + account2.time_allocated
        merged_account.energy_allocated = account1.energy_allocated + account2.energy_allocated
        if merged_account.energy_allocated != 0:
            merged_account.avg_power = merged_account.time_allocated / merged_account.energy_allocated
        else:
            merged_account.avg_power = 0
        merged_account.fugaku_points = None  # Needs to be invalidated, as averages are not known!

        account1 = None
        account2 = None

        return merged_account


class Accounts:

    def update_average_user(self):
        total_accounts = len(self.account_dict)
        self.average_user.total_jobs = self.all_users.total_jobs / total_accounts
        self.average_user.jobs_enqueued = self.all_users.jobs_enqueued / total_accounts
        self.average_user.jobs_completed = self.all_users.jobs_completed / total_accounts
        self.average_user.time_allocated = self.all_users.time_allocated / total_accounts
        self.average_user.energy_allocated = self.all_users.energy_allocated / total_accounts
        self.average_user.avg_power = self.all_users.avg_power / total_accounts
        self.fugaku_points = self.all_users.fugaku_points / total_accounts  # this should be 0
        if self.average_user.jobs_completed != 0.0:
            self.average_user.update_fugaku_points(self.average_user.energy_allocated,self.average_user.avg_power)
        return self

    def __init__(self):
        self._account_id = 0
    def __init__(self, jobs=None):
        self.account_dict = dict()
        self.all_users = Account(-2,"All_Users",0)
        self.average_user = Account(-1,"Avg_User",0)
        self.all_users = Account(-2, "All_Users")
        self.average_user = Account(-1, "Avg_User")
        if jobs:
            if not isinstance(jobs,list):
                raise TypeError
            for job_dict in jobs:
                if not isinstance(job_dict,dict):
                    raise TypeError
                if job_dict["account"] not in self.account_dict:
                    self.account_dict[job_dict["account"]] = Account(job_dict["account"], jobs_enqueued=0)
                self.account_dict[job_dict["account"]].jobs_enqueued += 1
                self.all_users.jobs_enqueued += 1
            self.update_average_user()
        pass

    def updates_all_users_by_account(self, account: Account):
        """Fold a single account's statistics into the all_users aggregate.

        After the totals are updated, average_user is refreshed (the Fugaku
        point computation depends on it), and all_users' Fugaku points are
        recomputed.
        """
        self.all_users.jobs_enqueued += account.jobs_enqueued
        self.all_users.jobs_completed += account.jobs_completed
        self.all_users.time_allocated += account.time_allocated
        self.all_users.energy_allocated += account.energy_allocated
        # Bug fix: the totals live on self.all_users, not on the Accounts
        # container itself — the old self.energy_allocated / self.time_allocated
        # always raised AttributeError.
        # NOTE(review): this computes energy / time while Account.__init__ and
        # Account.merge derive avg_power as time / energy — confirm which
        # orientation is intended.
        self.all_users.avg_power = self.all_users.energy_allocated / self.all_users.time_allocated
        # average_user must be refreshed before Fugaku points can be computed,
        # so this is always called here.
        self.update_average_user()
        self.all_users.update_fugaku_points(self.average_user.energy_allocated, self.average_user.avg_power)



    def initialize_accounts_from_json(self,filename):
    def add_account(self, account: Account) -> None:
        """Register *account* under its name and fold its stats into the aggregates.

        NOTE(review): add_user_stats_to_all_users is not defined anywhere in
        this diff; updates_all_users_by_account looks like the intended call —
        confirm against the full file.
        """
        self.account_dict[account.name] = account
        self.add_user_stats_to_all_users(account)
        # update_average_user() is already called (inside the call above)


    @classmethod
    def from_dict(cls, dictionary):
        """Rebuild an Accounts collection from its to_dict() representation.

        Expects the keys 'account_dict', 'all_users' and 'average_user';
        raises KeyError with a descriptive message when any of them is
        missing or malformed.
        """
        restored = cls()

        if 'account_dict' not in dictionary:
            raise KeyError("'account_dict' not in dictionary. Failed to restore.")
        raw_accounts = dictionary['account_dict']
        restored.account_dict = {}
        if not isinstance(raw_accounts, dict):
            raise KeyError("'account_dict' is not a dictionary. Failed to restore.")
        restored.account_dict = {name: Account.from_dict(payload)
                                 for name, payload in raw_accounts.items()}

        if 'all_users' not in dictionary:
            raise KeyError("'all_users' not in dictionary. Failed to restore.")
        restored.all_users = Account.from_dict(dictionary['all_users'])

        if 'average_user' not in dictionary:
            raise KeyError("'average_user' not in dictionary. Failed to restore.")
        restored.average_user = Account.from_dict(dictionary['average_user'])
        return restored

    @classmethod
    def from_json_filename(cls, filename):
        try:
            with open(filename, 'r', encoding='utf-8') as file:
                json_object = json.load(file)
            if '_account_id' in json_object:
                self._account_id = json_object['_account_id']
            if 'account_dict' in json_object:
                json_dict = json_object['account_dict']
                self.account_dict = {}
                for account_name,account_dict in json_dict.items():
                    self.account_dict[account_name] = Account.init_from_dict(account_dict)
                #self.account_dict = json_object['account_dict']

            if 'all_users' in json_object:
                self.all_users = Account.init_from_dict(json_object['all_users'])
            if 'average_user' in json_object:
                self.average_user = Account.init_from_dict(json_object['average_user'])
            return cls.from_dict(json_object)
        except ValueError:
            raise ValueError(f"{file} could not be read using json.load()")

    def update_account_statistics(self, jobstats):
        #update specific account associated with job
        # Update specific account associated with job
        if isinstance(jobstats, JobStatistics):
            if jobstats.account not in self.account_dict:
                self.account_dict[jobstats.account] = Account(self._account_id,jobstats.account,0)
                self._account_id += 1
                raise ValueError(f"Account {jobstats.account} not registered in Accounts object {self}")
                # self.account_dict[jobstats.account] = Account(jobstats.account)
            account = self.account_dict[jobstats.account]
            account.update_statistics(jobstats, self.average_user)
            self.account_dict[jobstats.account] = account
            #update the average_user account and the summary account
            # Update the  summary account (all_users) and the average_user account
            self.all_users.update_statistics(jobstats,self.average_user)
            self.update_average_user()

@@ -132,8 +226,42 @@ class Accounts:
        for account_name,account in self.account_dict.items():
            acct_dict[account_name] = account.to_dict()
        ret_dict = {}
        ret_dict['_account_id'] = self._account_id
        ret_dict['account_dict'] = acct_dict
        ret_dict['all_users'] = self.all_users.to_dict()
        ret_dict['average_user'] = self.average_user.to_dict()
        return ret_dict

    @classmethod
    def merge(cls, accounts1: 'Accounts', accounts2: 'Accounts') -> 'Accounts':
        """Merge two Accounts collections into a new one.

        Per-account entries present in both inputs are combined with
        Account.merge; entries unique to either input are carried over.
        all_users is rebuilt first, then average_user, and only then are
        Fugaku points recomputed — the order matters because each step feeds
        the next.

        Bug fix: the result previously aliased accounts1.account_dict and
        mutated it in place while merging; a shallow copy is taken so the
        inputs are left intact. The old second loop over accounts1 (a no-op
        under that alias) is folded into the copy.
        """
        merged_accounts = cls()
        merged_accounts.account_dict = dict(accounts1.account_dict)

        for name in accounts2.account_dict:
            if name in accounts1.account_dict:
                merged_accounts.account_dict[name] = Account.merge(
                    accounts1.account_dict[name], accounts2.account_dict[name])
            else:
                merged_accounts.account_dict[name] = accounts2.account_dict[name]

        # Update all users -> then average user -> then Fugaku points for all
        # users (order is important!).
        merged_accounts.all_users = Account.merge(accounts1.all_users, accounts2.all_users)
        merged_accounts.update_average_user()
        # average_user must be current before Fugaku points can be calculated.
        if merged_accounts.all_users.jobs_completed != 0:
            merged_accounts.all_users.update_fugaku_points(
                merged_accounts.average_user.energy_allocated,
                merged_accounts.average_user.avg_power)

        for account in merged_accounts.account_dict.values():
            if account.jobs_completed != 0:
                account.update_fugaku_points(
                    merged_accounts.average_user.energy_allocated,
                    merged_accounts.average_user.avg_power)

        return merged_accounts
+25 −23
Original line number Diff line number Diff line
@@ -3,7 +3,6 @@ import dataclasses
import pandas as pd

from .job import Job, JobState
from .account import Accounts
from .network import network_utilization
from .utils import summarize_ranges, expand_ranges, get_utilization
from .resmgr import ResourceManager
@@ -30,6 +29,7 @@ class TickData:

class Engine:
    """Job scheduling simulation engine."""

    def __init__(self, *, power_manager, flops_manager, cooling_model=None, config, **kwargs):
        self.config = config
        self.down_nodes = summarize_ranges(self.config['DOWN_NODES'])
@@ -41,9 +41,8 @@ class Engine:
        # Initialize running and queue, etc.
        self.running = []
        self.queue = []
        self.accounts = Accounts()
        if 'accounts_json' in kwargs and kwargs['accounts_json']:
            self.accounts.initialize_accounts_from_json(kwargs.get('accounts_json'))
        self.accounts = None
        self.job_history_dict = []
        self.jobs_completed = 0
        self.current_time = 0
        self.cooling_model = cooling_model
@@ -64,11 +63,13 @@ class Engine:
        )
        print(f"Using scheduler: {scheduler_type}")


    def add_job(self, job):
        """Append *job* to the queue and re-sort the queue via the scheduler policy.

        NOTE(review): sort_jobs is called here without the accounts argument
        that run_simulation passes (sort_jobs(self.queue, self.accounts)) —
        confirm the scheduler accepts both call forms.
        """
        self.queue.append(job)
        self.queue = self.scheduler.sort_jobs(self.queue)

    def eligible_jobs(self, jobs_to_submit):
        """Remove jobs whose submit_time has arrived and wrap them as Job objects.

        Args:
            jobs_to_submit: list of job dicts sorted by 'submit_time'. The
                eligible prefix is removed from this list in place — callers
                rely on that mutation.

        Returns:
            A list of Job instances for every entry with
            submit_time <= self.current_time.
        """
        # Count the eligible prefix first, then drop it with one slice
        # deletion: the old repeated list.pop(0) is O(n) per call and O(n^2)
        # overall.
        eligible = 0
        for job_info in jobs_to_submit:
            if job_info['submit_time'] > self.current_time:
                break
            eligible += 1
        eligible_jobs_list = [Job(job_info, self.current_time)
                              for job_info in jobs_to_submit[:eligible]]
        del jobs_to_submit[:eligible]
        return eligible_jobs_list

    def tick(self):
        """Simulate a timestep."""
@@ -130,11 +131,12 @@ class Engine:
            self.jobs_completed += 1
            job_stats = job.statistics()
            self.accounts.update_account_statistics(job_stats)
            self.job_history_dict.append(job_stats.__dict__)
            # Free the nodes via the resource manager.
            self.resource_manager.free_nodes_from_job(job)

        # Ask scheduler to schedule any jobs waiting in queue
        self.scheduler.schedule(self.queue, self.running, self.current_time)
        self.scheduler.schedule(self.queue, self.running, self.current_time, self.accounts)

        # Update the power array UI component
        rack_power, rect_losses = self.power_manager.compute_rack_power()
@@ -199,7 +201,6 @@ class Engine:
        self.current_time += 1
        return tick_data


    def run_simulation(self, jobs, timesteps, autoshutdown=False):
        """Generator that yields after each simulation tick."""
        self.timesteps = timesteps
@@ -208,14 +209,13 @@ class Engine:
        jobs_to_submit = sorted(jobs, key=lambda j: j['submit_time'])

        for timestep in range(timesteps):
            # Submit jobs whose submit_time is <= current_time
            while jobs_to_submit and jobs_to_submit[0]['submit_time'] <= self.current_time:
                job_info = jobs_to_submit.pop(0)
                job = Job(job_info, self.current_time)
                self.add_job(job)

            # Identify eligible jobs and add them to the queue.
            self.queue += self.eligible_jobs(jobs_to_submit)
            # Sort the queue according to the policy
            self.queue = self.scheduler.sort_jobs(self.queue, self.accounts)
            # Schedule jobs that are now in the queue.
            self.scheduler.schedule(self.queue, self.running, self.current_time)
            self.scheduler.schedule(self.queue, self.running, self.current_time, sorted=True)

            # Stop the simulation if no more jobs are running or in the queue.
            if autoshutdown and not self.queue and not self.running and not self.replay:
@@ -227,7 +227,6 @@ class Engine:

            yield self.tick()


    def get_stats(self):
        """ Return output statistics """
        sum_values = lambda values: sum(x[1] for x in values) if values else 0
@@ -264,3 +263,6 @@ class Engine:
        }

        return stats

    def get_job_history_dict(self):
        """Return the accumulated list of per-job statistics dicts (one per completed job)."""
        return self.job_history_dict
+11 −5
Original line number Diff line number Diff line
@@ -42,9 +42,7 @@ class Job:
    _id_counter = 0

    def __init__(self, job_dict, current_time, state=JobState.PENDING, account=None):
        for key, value in job_dict.items(): setattr(self, key, value)
        if not self.id: self.id = Job._get_next_id()
        # initializations
        # Initializations:
        self.start_time = None
        self.end_time = None
        self.running_time = 0
@@ -53,6 +51,12 @@ class Job:
        self.power_history = []
        self._state = state
        self.account = account
        # If a job dict was given, override the values from the job_dict:
        for key, value in job_dict.items():
            setattr(self, key, value)
        # In any case: provide a job_id!
        if not self.id:
            self.id = Job._get_next_id()

    def __repr__(self):
        """Return a string representation of the job."""
@@ -100,7 +104,6 @@ class Job:
        return JobStatistics(self)



class JobStatistics:
    """
    Reduced class for handling statistics after the job has finished.
@@ -112,6 +115,9 @@ class JobStatistics:
        self.account = job.account
        self.num_nodes = len(job.scheduled_nodes)
        self.run_time = job.running_time
        self.start_time = job.start_time
        self.end_time = job.end_time
        self.state = job._state
        if len(job.power_history) == 0:
            self.avg_node_power = 0
            self.max_node_power = 0
+45 −11

File changed.

Preview size limit exceeded, changes collapsed.

Loading