Commit f97c81a3 authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Merge branch 'main' into test-scheduling

parents 0cff1144 bde550d8
Loading
Loading
Loading
Loading
+20 −17
Original line number Diff line number Diff line
@@ -22,28 +22,11 @@ def load_data(files, **kwargs):
    """
    Reads job and job profile data from parquet files and parses them.

    Parameters
    ----------
    jobs_path : str
        The path to the jobs parquet file.
    jobprofile_path : str
        The path to the job profile parquet file.
    min_time: Value to use as zero time. Setting this to None will
              autocompute the min value.

    Returns
    -------
    list
        The list of parsed jobs.
    """
    encrypt_bool = kwargs.get('encrypt')
    reschedule = kwargs.get('reschedule')
    validate = kwargs.get('validate')
    jid = kwargs.get('jid')

    #min_time = kwargs.get('min_time', None)
    min_time = None

    assert(len(files) == 2), "Frontier dataloader requires two files: joblive and jobprofile"

    jobs_path = files[0]
@@ -52,6 +35,26 @@ def load_data(files, **kwargs):
    jobprofile_path = files[1]
    jobprofile_df = pd.read_parquet(jobprofile_path, engine='pyarrow')

    return load_data_from_df(jobs_df, jobprofile_df, **kwargs)


def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwargs):
    """
    Reads job and job profile data from dataframes files and parses them.

    Returns
    -------
    list
        The list of parsed jobs.
    """
    encrypt_bool = kwargs.get('encrypt')
    reschedule = kwargs.get('reschedule')
    validate = kwargs.get('validate')
    jid = kwargs.get('jid', '*')

    #min_time = kwargs.get('min_time', None)
    min_time = None

    # Sort jobs dataframe based on values in time_start column, adjust indices after sorting
    jobs_df = jobs_df[jobs_df['time_start'].notna()]
    jobs_df = jobs_df.drop_duplicates(subset='job_id', keep='last').reset_index()
+14 −3
Original line number Diff line number Diff line
@@ -28,6 +28,19 @@ def load_data(jobs_path, **kwargs):
    jobs_path : str
        The path to the jobs parquet file.

    Returns
    -------
    list
        The list of parsed jobs.
    """
    jobs_df = pd.read_parquet(jobs_path, engine='pyarrow')
    return load_data_from_df(jobs_df, **kwargs)


def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
    """
    Reads job and job profile data from parquet files and parses them.

    Returns
    -------
    list
@@ -36,9 +49,7 @@ def load_data(jobs_path, **kwargs):
    min_time = None
    reschedule = kwargs.get('reschedule')
    validate = kwargs.get('validate')
    jid = kwargs.get('jid')

    jobs_df = pd.read_parquet(jobs_path, engine='pyarrow')
    jid = kwargs.get('jid', '*')

    # Sort jobs dataframe based on values in time_start column, adjust indices after sorting
    jobs_df = jobs_df.sort_values(by='start_time')
+1 −1
Original line number Diff line number Diff line
@@ -345,7 +345,7 @@ class PowerManager:

        Returns:
        tuple
            Tuple containing rack power and rectifier losses.
            Tuple containing rack power (kW) and rectifier losses (kW).
        """
        shape = (self.sc_shape[0], self.sc_shape[1], CHASSIS_PER_RACK, -1)
        power_state_reshaped = np.reshape(self.power_state, shape)
+22 −16
Original line number Diff line number Diff line
@@ -211,6 +211,7 @@ class TickData:
    cooling_df: Optional[pd.DataFrame]
    p_flops: float
    g_flops_w: float
    system_util: float


class Scheduler:
@@ -353,6 +354,11 @@ class Scheduler:
        # Simulate node failure
        newly_downed_nodes = self.node_failure(MTBF)

        # Update active/free nodes
        self.num_free_nodes = len(self.available_nodes)
        self.num_active_nodes = TOTAL_NODES - self.num_free_nodes \
                              - len(expand_ranges(self.down_nodes))

        # Update running time for all running jobs
        for job in self.running:

@@ -436,18 +442,20 @@ class Scheduler:
        system_util = self.num_active_nodes / AVAILABLE_NODES * 100
        self.sys_util_history.append((self.current_time, system_util))

        # Render the updated layout
        output_df = None

        # Update power history every 15s
        pflops, gflop_per_watt = 0, 0
        if self.current_time % POWER_UPDATE_FREQ == 0:
            total_power_kw = sum(row[-1] for row in rack_power) + NUM_CDUS * POWER_CDU / 1000.0
            total_loss_kw = sum(row[-1] for row in rack_loss)
            self.power_manager.history.append((self.current_time, total_power_kw))
            self.power_manager.loss_history.append((self.current_time, total_loss_kw))
            output_df = self.power_manager.get_power_df(rack_power, rack_loss)
            pflops = self.flops_manager.get_system_performance() / 1E15
            gflop_per_watt = pflops * 1E6 / (total_power_kw * 1000)

        # Render the updated layout
        output_df = None
        else:    
            pflops, gflop_per_watt = None, None

        if self.cooling_model:

@@ -475,17 +483,11 @@ class Scheduler:
                    self.layout_manager.update_pressflow_array(cooling_df)

        if self.current_time % UI_UPDATE_FREQ == 0:

            # Get a dataframe of the power data
            power_df = self.power_manager.get_power_df(rack_power, rack_loss)

            if self.layout_manager:
                self.layout_manager.update_scheduled_jobs(self.running + self.queue)

                self.num_free_nodes = len(self.available_nodes)
                self.num_active_nodes = TOTAL_NODES - self.num_free_nodes - \
                        len(expand_ranges(self.down_nodes))
                
                self.layout_manager.update_status(self.current_time, len(self.running),
                                              len(self.queue), self.num_active_nodes,
                                              self.num_free_nodes, self.down_nodes[1:])
@@ -495,11 +497,12 @@ class Scheduler:

        tick_data = TickData(
            current_time = self.current_time,
            jobs = self.running + self.queue,
            jobs = completed_jobs + self.running + self.queue,
            down_nodes = expand_ranges(self.down_nodes[1:]),
            cooling_df = output_df,
            p_flops = pflops,
            g_flops_w = gflop_per_watt
            g_flops_w = gflop_per_watt,
            system_util = system_util
        )

        self.current_time += 1
@@ -529,12 +532,15 @@ class Scheduler:
            print(limits)
        
        for _ in range(timesteps):
            if self.current_time >= time_to_next_job:
                if jobs:
            while self.current_time >= time_to_next_job and jobs:
                job = jobs.pop(0)
                self.schedule([job])
                    time_to_next_job = job[7]
                if jobs:
                    time_to_next_job = job[7]  # Update time to next job based on the next job's scheduled time
                else:
                    time_to_next_job = float('inf')  # No more jobs, set to infinity or some large number to avoid triggering again
            yield self.tick()

            # Stop the simulation if no more jobs running or are in the queue
            if not self.queue and not self.running and not self.replay:
                print("stopping simulation at time", self.current_time)
+7 −1
Original line number Diff line number Diff line
@@ -51,10 +51,16 @@ class Telemetry:

    def load_data(self, files):
        """Load telemetry data from *files* using the system-specific data loader.

        Resolves the loader module ``.dataloaders.<system>`` relative to this
        package (robust against how the package itself is imported, unlike a
        hard-coded absolute ``raps.dataloaders.`` prefix) and delegates to its
        ``load_data`` function, forwarding this object's stored kwargs.

        Parameters
        ----------
        files : list
            Paths passed through to the system loader's ``load_data``.

        Returns
        -------
        list
            The list of parsed jobs produced by the system loader.
        """
        # NOTE: the previous absolute-import form
        # (importlib.import_module('raps.dataloaders.' + self.system))
        # was dead code immediately overwritten below; removed.
        module = importlib.import_module(f".dataloaders.{self.system}", package=__package__)
        return module.load_data(files, **self.kwargs)


    def load_data_from_df(self, *args, **kwargs):
        """Load telemetry data from already-loaded dataframes using the
        system-specific data loader.

        Imports ``.dataloaders.<self.system>`` relative to this package and
        delegates to that module's ``load_data_from_df``, passing *args* and
        **kwargs** through unchanged.  Mirrors ``load_data`` but skips the
        file-reading step — callers supply dataframes directly.
        """
        # Relative import keyed on self.system selects the per-system loader module.
        module = importlib.import_module(f".dataloaders.{self.system}", package=__package__)
        return module.load_data_from_df(*args, **kwargs)


if __name__ == "__main__":

    args_dict = vars(args)
Loading