Commit 836ed68e authored by Maiterth, Matthias's avatar Maiterth, Matthias
Browse files

Separated Accounts from Jobs and the Scheduler

parent 00382592
Loading
Loading
Loading
Loading
+8 −2
Original line number Diff line number Diff line
@@ -228,3 +228,9 @@ if args.output:
                json.dump(output_stats, f, indent=4)
        except:
            write_dict_to_file(output_stats, OPATH / 'stats.out')

        try:
            # Must open in write mode: without 'w' the file is opened for
            # reading and json.dump always fails, silently falling through
            # to the plain-text fallback below.
            with open(OPATH / 'account-stats.txt', 'w') as f:
                json.dump(sc.accounts, f, indent=4)
        except Exception:
            # Accounts may not be JSON-serializable; fall back to dumping
            # its attribute dict as plain text. (Narrowed from a bare
            # `except:` so Ctrl-C is not swallowed.)
            write_dict_to_file(vars(sc.accounts), OPATH / 'account-stats.out')
+31 −6
Original line number Diff line number Diff line
from enum import Enum

def job_dict(nodes_required, name, cpu_trace, gpu_trace, ntx_trace, nrx_trace, \
def job_dict(nodes_required, name, account, cpu_trace, gpu_trace, ntx_trace, nrx_trace, \
             wall_time, end_state, scheduled_nodes, time_offset, job_id, priority=0, partition=0):
    """ Return job info dictionary """
    return {
        'nodes_required': nodes_required,
        'name': name,
        'account': account,
        'cpu_trace': cpu_trace,
        'gpu_trace': gpu_trace,
        'ntx_trace': ntx_trace,
@@ -54,7 +55,8 @@ class Job:

    def __repr__(self):
        """Return a string representation of the job."""
        return (f"Job(id={self.id}, name={self.name}, nodes_required={self.nodes_required}, "
        return (f"Job(id={self.id}, name={self.name}, account={self.account}, "
                f"nodes_required={self.nodes_required}, "
                f"cpu_trace={self.cpu_trace}, gpu_trace={self.gpu_trace}, wall_time={self.wall_time}, "
                f"end_state={self.end_state}, requested_nodes={self.requested_nodes}, "
                f"submit_time={self.submit_time}, start_time={self.start_time}, "
@@ -90,3 +92,26 @@ class Job:
        """
        cls._id_counter += 1
        return cls._id_counter

    def statistics(self):
        """Build and return a JobStatistics summary for this job.

        Intended to be called once the job has finished, so that the
        reduced statistics object can outlive the full Job instance.
        """
        summary = JobStatistics(self)
        return summary



class JobStatistics:
    """
    Reduced class for handling statistics after the job has finished.

    Holds a lightweight summary (identity, node count, runtime, power and
    energy figures) so the full Job object can be discarded afterwards.
    """

    def __init__(self, job):
        """Derive summary statistics from a finished job.

        Parameters
        ----------
        job :
            Completed job object; must expose ``id``, ``name``, ``account``,
            ``scheduled_nodes``, ``running_time`` and ``power_history``.
        """
        self.id = job.id
        self.name = job.name
        self.account = job.account
        self.num_nodes = len(job.scheduled_nodes)
        self.run_time = job.running_time
        # Guard against jobs that never recorded power samples or were
        # never scheduled onto nodes: the original expressions raised
        # ZeroDivisionError (empty scheduled_nodes) or ValueError
        # (max() of empty power_history) for such jobs.
        if job.power_history and self.num_nodes:
            self.avg_node_power = (sum(job.power_history)
                                   / len(job.power_history)
                                   / self.num_nodes)
            self.max_node_power = max(job.power_history) / self.num_nodes
        else:
            self.avg_node_power = 0.0
            self.max_node_power = 0.0
        # Total energy = average per-node power * number of nodes * runtime.
        self.energy = self.run_time * self.avg_node_power * self.num_nodes
+21 −15
Original line number Diff line number Diff line
@@ -46,9 +46,10 @@ from scipy.stats import weibull_min
import pandas as pd

from .job import Job, JobState
from .account import Accounts
from .network import network_utilization
from .policy import Policy, PolicyType
from .utils import summarize_ranges, expand_ranges
from .utils import summarize_ranges, expand_ranges, write_dict_to_file


@dataclasses.dataclass
@@ -88,6 +89,7 @@ class Scheduler:
        self.num_active_nodes = self.config['TOTAL_NODES'] - self.num_free_nodes - len(self.config['DOWN_NODES'])
        self.running = []
        self.queue = []
        self.accounts = Accounts()
        self.jobs_completed = 0
        self.current_time = 0
        self.cooling_model = cooling_model
@@ -172,12 +174,11 @@ class Scheduler:
                    self.queue.append(job) # Note, this should be fixed. It shouldn't go to the end of the queue.
                break


    def tick(self):
        """Simulate a timestep."""
        completed_jobs = [job for job in self.running if job.end_time
                          is not None and job.end_time <= self.current_time]

        completed_job_stats = []
        # Simulate node failure
        newly_downed_nodes = self.node_failure(self.config['MTBF'])

@@ -258,11 +259,16 @@ class Scheduler:
            if self.debug:
                print(f"Released {scheduled_nodes}")
            self.jobs_completed += 1

            job_stats = job.statistics()
            if self.debug:
                print(job_stats)
            completed_job_stats.append(job_stats)
            self.accounts.update_account_statistics(job_stats)
            if self.output:
                # output power trace
                with open(self.opath / f'job-power-{job.id}.txt', 'w') as file:
                    print(*job.power_history, sep=', ', file=file)

                write_dict_to_file(vars(job_stats),self.opath / f'job-stats-{job.account}.json')
        # Ask scheduler to schedule any jobs waiting in queue
        self.schedule([])

+8 −7
Original line number Diff line number Diff line
@@ -86,7 +86,7 @@ class LayoutManager:
            Flag indicating whether to display node information (default is False).
        """
        # Define columns with header styles
        columns = ["JOBID", "WALL TIME", "NAME", "ST", "NODES", "NODE SEGMENTS"]
        columns = ["JOBID", "WALL TIME", "NAME", "ACCOUNT", "ST", "NODES", "NODE SEGMENTS"]
        if show_nodes:
            columns.append("NODELIST")
        columns.append("TIME")
@@ -111,6 +111,7 @@ class LayoutManager:
                str(job.id).zfill(5),
                convert_seconds(job.wall_time),
                str(job.name),
                str(job.account),
                job.state.value,
                str(job.nodes_required),
                nodes_display,
+13 −9
Original line number Diff line number Diff line
@@ -38,6 +38,9 @@ JOB_NAMES = ["LAMMPS", "GROMACS", "VASP", "Quantum ESPRESSO", "NAMD",\
             "TensorFlow", "PyTorch", "BLAST", "Spark", "GAMESS",\
             "ORCA", "Simulink", "MOOSE", "ELK"]

ACCT_NAMES = ["ACT01", "ACT02", "ACT03", "ACT04", "ACT05", "ACT06", "ACT07",\
              "ACT08", "ACT09", "ACT10", "ACT11", "ACT12", "ACT13", "ACT14"]

MAX_PRIORITY = 500000

from .utils import truncated_normalvariate, determine_state, next_arrival
@@ -66,6 +69,7 @@ class Workload:

            nodes_required = random.randint(1, config['MAX_NODES_PER_JOB'])
            name = random.choice(JOB_NAMES)
            account = random.choice(ACCT_NAMES)
            cpu_util = random.random() * config['CPUS_PER_NODE']
            gpu_util = random.random() * config['GPUS_PER_NODE']
            mu = (config['MAX_WALL_TIME'] + config['MIN_WALL_TIME']) / 2
@@ -79,7 +83,7 @@ class Workload:
            # Jobs arrive according to Poisson process
            time_to_next_job = next_arrival(1 / config['JOB_ARRIVAL_TIME'])

            jobs.append(job_dict(nodes_required, name, cpu_trace, gpu_trace, net_tx, net_rx, \
            jobs.append(job_dict(nodes_required, name, account, cpu_trace, gpu_trace, net_tx, net_rx, \
                        wall_time, end_state, None, time_to_next_job, None, priority, partition))

        return jobs
+4 −4

File changed.

Contains only whitespace changes.

Loading