Commit 94c84694 authored by kevinmenear
Browse files

Vectorize update flop state and update power state

parent d93b054f
Loading
Loading
Loading
Loading
+20 −5
Original line number Diff line number Diff line
@@ -86,6 +86,10 @@ class Engine:
                              - len(self.resource_manager.down_nodes)

        # Update running time for all running jobs
        scheduled_nodes = []
        cpu_utils = []
        gpu_utils = []
        net_utils = []
        for job in self.running:
            if job.end_time == self.current_time:
                job.state = JobState.COMPLETED
@@ -101,12 +105,23 @@ class Engine:
                    net_tx = self.get_utilization(job.ntx_trace, time_quanta_index)
                    net_rx = self.get_utilization(job.nrx_trace, time_quanta_index)
                    net_util = network_utilization(net_tx, net_rx)
                    net_utils.append(net_util)
                else:
                    net_utils.append(0)

                self.flops_manager.update_flop_state(job.scheduled_nodes, cpu_util, gpu_util)
                job.power = self.power_manager.update_power_state(job.scheduled_nodes, cpu_util, gpu_util, net_util)
                scheduled_nodes.append(job.scheduled_nodes)
                cpu_utils.append(cpu_util)
                gpu_utils.append(gpu_util)
        
        if len(scheduled_nodes) > 0:
            self.flops_manager.update_flop_state(scheduled_nodes, cpu_utils, gpu_utils)
            jobs_power = self.power_manager.update_power_state(scheduled_nodes, cpu_utils, gpu_utils, net_utils)
            job_index = 0
            for job in self.running:
                if job.state == JobState.RUNNING:
                    if job.running_time % self.config['TRACE_QUANTA'] == 0:
                    job.power_history.append(job.power)
                        job.power_history.append(jobs_power[job_index] * len(job.scheduled_nodes))
                    job_index += len(job.scheduled_nodes)

        for job in completed_jobs:
            self.running.remove(job)
+26 −8
Original line number Diff line number Diff line
@@ -9,17 +9,35 @@ class FLOPSManager():
        self.flop_state = np.zeros(self.config['SC_SHAPE'])

    def update_flop_state(self, scheduled_nodes, cpu_util, gpu_util):
        """Write the per-node FLOPS estimate into ``self.flop_state``.

        All jobs are handled in one vectorized pass: each job's utilization
        value is repeated once per node of that job, the node ids of all jobs
        are concatenated, and the resulting values are assigned in a single
        indexed write.

        Parameters
        ----------
        scheduled_nodes : sequence
            Either one sequence of node ids per job, or a single flat
            sequence of node ids, which is treated as one job.
        cpu_util : scalar or sequence
            One value per job. When ``self.validate`` is set this is in fact
            node power in watts rather than a CPU utilization.
        gpu_util : scalar or sequence
            One value per job. Not used when ``self.validate`` is set.

        Returns
        -------
        None
            ``self.flop_state`` is updated in place.
        """
        # Nothing scheduled: np.concatenate would raise on an empty sequence.
        if len(scheduled_nodes) == 0:
            return

        # A flat sequence of node ids is the single-job call form; wrap it so
        # both call forms share the vectorized path below.
        if np.ndim(scheduled_nodes[0]) == 0:
            scheduled_nodes = [scheduled_nodes]

        job_lengths = np.array([len(nodes) for nodes in scheduled_nodes])
        flattened_nodes = np.concatenate(scheduled_nodes, axis=0)
        node_indices = linear_to_3d_index(flattened_nodes, self.config['SC_SHAPE'])

        # Expand the per-job value to one entry per node of that job.
        cpu_util_flat = np.repeat(np.asarray(cpu_util), job_lengths)

        if self.validate:   # cpu_util is in fact node_Watts in this case
            total_peak = (
                self.config['CPU_FP_RATIO'] * self.config['CPU_PEAK_FLOPS'] +
                self.config['GPU_FP_RATIO'] * self.config['GPU_PEAK_FLOPS']
            )
            denominator = (
                self.config['POWER_CPU_MAX'] * self.config['CPUS_PER_NODE'] +
                self.config['POWER_GPU_MAX'] * self.config['GPUS_PER_NODE'] +
                self.config['POWER_NIC'] * self.config['NICS_PER_NODE'] +
                self.config['POWER_NVME']
            )
            # Scale the combined peak by the fraction of the power budget in use.
            self.flop_state[node_indices] = total_peak * (cpu_util_flat / denominator)
        else:
            gpu_util_flat = np.repeat(np.asarray(gpu_util), job_lengths)
            self.flop_state[node_indices] = (
                self.config['CPU_FP_RATIO'] * cpu_util_flat * self.config['CPU_PEAK_FLOPS'] +
                self.config['GPU_FP_RATIO'] * gpu_util_flat * self.config['GPU_PEAK_FLOPS']
            )

    def get_rpeak(self):
        node_peak_flops = self.config['CPUS_PER_NODE'] * self.config['CPU_PEAK_FLOPS'] \
                        + self.config['GPUS_PER_NODE'] * self.config['GPU_PEAK_FLOPS']
+19 −4
Original line number Diff line number Diff line
@@ -58,6 +58,9 @@ def compute_node_power(cpu_util, gpu_util, net_util, config):
        power_nic = config['POWER_NIC_IDLE'] + \
                    (config['POWER_NIC_MAX'] - config['POWER_NIC_IDLE']) * net_util
    except:
        if isinstance(net_util, np.ndarray):
            power_nic = config['POWER_NIC'] * np.ones(net_util.shape)
        else:
            power_nic = config['POWER_NIC']

    power_total = power_cpu + power_gpu + config['POWER_MEM'] + \
@@ -260,11 +263,23 @@ class PowerManager:
        float
            Total power consumption of the scheduled nodes.
        """
        node_indices = linear_to_3d_index(scheduled_nodes, self.sc_shape)
        power_value, sivoc_loss = self.power_func(cpu_util, gpu_util, net_util, self.config)
        cpu_util = np.asarray(cpu_util)
        gpu_util = np.asarray(gpu_util)
        net_util = np.asarray(net_util)
        job_lengths = np.array([len(job) for job in scheduled_nodes])
        flattened_nodes = np.concatenate(scheduled_nodes, axis=0)

        cpu_util_flat = np.repeat(cpu_util, job_lengths)
        gpu_util_flat = np.repeat(gpu_util, job_lengths)
        net_util_flat = np.repeat(net_util, job_lengths)

        node_indices = linear_to_3d_index(flattened_nodes, self.config['SC_SHAPE'])

        power_value, sivoc_loss = self.power_func(cpu_util_flat, gpu_util_flat, net_util_flat, self.config)
        self.power_state[node_indices] = power_value
        self.sivoc_loss[node_indices] = sivoc_loss
        return power_value * len(scheduled_nodes)
        return power_value
    

    def calculate_rectifiers_needed(self, power_state_summed):
        """