Commit 853055aa authored by Maiterth, Matthias's avatar Maiterth, Matthias
Browse files

Moved Fugaku_pts and other experimental scheduling strategies to raps/schedulers/experimental.py

This serves as a blueprint for other experimental schedulers and shows how to extend them
without the need to modify an existing scheduler; see ScheduleFlow or FastSim.

Updated lassen
- Threads per core in config
- cpu_util function from LAST dataset corrected

Updated Accounts:
- calculation of Fugaku points now handles an average power of 0.
- time_allocated for a user counts runtime * nodes allocated.

Updated engine to deal with traces that are missing data at the front.

Default Scheduler:
- does not contain FUGAKU_PTS anymore

Experimental Scheduler:
- Contains
	ACCT_FUGAKU_PTS = 'acct_fugaku_pts'
	ACCT_AVG_P = 'acct_avg_power'
	ACCT_AVG_PW4LJ = 'acct_avg_power_w4lj'
	ACCT_EDP = 'acct_edp'
	ACCT_ED2P = 'acct_ed2p'
	ACCT_PDP = 'acct_pdp'
parent 14112a0b
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -46,10 +46,10 @@ choices = ['random', 'benchmark', 'peak', 'idle']
parser.add_argument('-w', '--workload', type=str, choices=choices, default=choices[0], help='Type of synthetic workload')

# Scheduling options
choices = ['default', 'scheduleflow', 'nrel', 'anl', 'flux']
choices = ['default', 'scheduleflow', 'nrel', 'anl', 'flux', 'experimental']
parser.add_argument('--scheduler', type=str, choices=choices, default=choices[0], help='Name of scheduler')
choices = [policy.value for policy in PolicyType]
parser.add_argument('--policy', type=str, choices=choices, default=None, help='Schedule policy to use')
parser.add_argument('--policy', type=str, default=None, help='Schedule policy to use, e.g.:' + str(choices) + " or extended policies")
choices = [policy.value for policy in BackfillType]
parser.add_argument('--backfill', type=str, choices=choices, default=None, help='Backfill Policy')

+1 −0
Original line number Diff line number Diff line
@@ -13,6 +13,7 @@
    "DOWN_NODES": [],
    "CPUS_PER_NODE": 2,
    "CORES_PER_CPU": 22,
    "THREADS_PER_CORE": 4,
    "CPU_FREQUENCY": 2400000000,
    "GPUS_PER_NODE": 4,
    "CPU_PEAK_FLOPS": 396.8E9,
+4 −3
Original line number Diff line number Diff line
@@ -40,12 +40,13 @@ class Account:

    def update_fugaku_points(self, average_energy, average_power):
        if average_power == 0:
            return
            self.fugaku_points = 0
        else:
            self.fugaku_points = (average_energy - self.energy_allocated) / average_power

    def update_statistics(self, jobstats, average_user):
        self.jobs_completed += 1
        self.time_allocated += jobstats.run_time
        self.time_allocated += jobstats.run_time * jobstats.num_nodes
        self.energy_allocated += jobstats.energy
        if self.time_allocated == 0:
            self.avg_power = 0
+6 −3
Original line number Diff line number Diff line
@@ -154,10 +154,12 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
            cpu_node_usage[cpu_node_usage < 0] = 0.0
            cpu_node_usage[cpu_node_usage == np.NaN] = 0.0
            if wall_time > 0:
                cpu_util = cpu_node_usage.sum() / nodes_required / wall_time / config['CPU_FREQUENCY'] / config['CORES_PER_CPU']
                threads_per_core = config['THREADS_PER_CORE']
                cpu_util = cpu_node_usage.sum() / 10e9 / nodes_required / wall_time / threads_per_core
            else:
                cpu_util = 0.0
            assert cpu_util >= 0, f"{cpu_util} = {cpu_node_usage.sum()} / {nodes_required} / {wall_time} / {config['CPU_FREQUENCY']} / {config['CORES_PER_CPU']}"
            assert cpu_util >= 0, f"{cpu_util} = {cpu_node_usage.sum()} / 10e9 / {nodes_required} / {wall_time} / {threads_per_core}"

            # cpu_util should be between 0 and 2 (2 CPUs)

            cpu_trace = cpu_util
@@ -169,7 +171,8 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
        ib_tx = 4 * node_data['ib_tx'].sum() if node_data['ib_tx'].values.size > 0 else []
        ib_rx = 4 * node_data['ib_rx'].sum() if node_data['ib_rx'].values.size > 0 else []

        net_tx, net_rx = generate_network_sequences(ib_tx, ib_rx, samples, lambda_poisson=0.3)
        #net_tx, net_rx = generate_network_sequences(ib_tx, ib_rx, samples, lambda_poisson=0.3)
        net_tx, net_rx = [],[]  # generate_network_sequences generates errors (e.g. -ff 800d -t 1d )

        # no priorities defined!
        priority = row.get('priority', 0)
+6 −1
Original line number Diff line number Diff line
@@ -184,7 +184,12 @@ class Engine:
                # job ended before.
                # For every other error condition trace_start_ and
                # _end_time are used!
                #print(type(job.cpu_trace))
                # #print(type(job.cpu_trace))
                if time_quanta_index < 0:
                    time_quanta_index = 0
                # Similarly for the first time_quanta index: if the job started
                # in the past and no trace data is there, read index 0 until values
                # are available.
                if isinstance(job.cpu_trace,list) or isinstance(job.cpu_trace,np.ndarray):
                    if time_quanta_index < len(job.cpu_trace):
                        cpu_util = get_utilization(job.cpu_trace, time_quanta_index)
Loading