Unverified commit 231f144a, authored by Brewer, Wes and committed by GitHub
Browse files

Merge pull request #2 from ExaDigiT/refactor-layout-manager

Refactor layout manager
parents f7bf727d e9bd67ab
Loading
Loading
Loading
Loading
+0 −1
Original line number Diff line number Diff line
@@ -4,7 +4,6 @@
    "ZIP_CODE": 37831,
    "COUNTRY_CODE": "US",
    "FMU_PATH": "models/Simulator_olcf5_base.fmu",
    "FMU_UPDATE_FREQ": 15,
    "FMU_COLUMN_MAPPING": {
        "T_sec_r_C": "Rack Return Temperature (\u00b0C)",
        "T_sec_s_C": "Rack Supply Temperature (\u00b0C)",
+4 −3
Original line number Diff line number Diff line
@@ -97,13 +97,13 @@ else:
        power_manager = PowerManager(compute_node_power, **config)

flops_manager = FLOPSManager(**config)
layout_manager = LayoutManager(args.layout, args.debug, **config)
args_dict['config'] = config
sc = Scheduler(
    power_manager = power_manager, flops_manager = flops_manager, layout_manager = layout_manager,
    power_manager = power_manager, flops_manager = flops_manager,
    cooling_model = cooling_model,
    **args_dict,
)
layout_manager = LayoutManager(args.layout, scheduler = sc, debug = args.debug, **config)

if args.replay:

@@ -173,7 +173,8 @@ if args.plot or args.output:
if args.verbose:
    print(jobs)

sc.run_simulation_blocking(jobs, timesteps=timesteps)
layout_manager.run(jobs, timesteps=timesteps)

output_stats = sc.get_stats()
# Following b/c we get the following error when we use PM100 telemetry dataset
# TypeError: Object of type int64 is not JSON serializable
+25 −41
Original line number Diff line number Diff line
@@ -31,7 +31,6 @@ Config parameters used:
- MAX_TIME: Maximum simulation time.
- POWER_UPDATE_FREQ: Frequency of updating power-related metrics.
- POWER_DF_HEADER: Header for the power related components of DataFrame.
- FMU_UPDATE_FREQ: Frequency of updating the FMU model.
- POWER_CDU: Power consumption of CDU.
- TOTAL_NODES: Total number of nodes in the system.
- COOLING_EFFICIENCY: Cooling efficiency factor.
@@ -56,14 +55,18 @@ from .utils import summarize_ranges, expand_ranges
class TickData:
    """ Represents the state output from the simulation each tick """
    current_time: int
    jobs: list[Job]
    completed: list[Job]
    running: list[Job]
    queue: list[Job]
    down_nodes: list[int]
    power_df: Optional[pd.DataFrame]
    p_flops: float
    g_flops_w: float
    p_flops: Optional[float]
    g_flops_w: Optional[float]
    system_util: float
    fmu_inputs: Optional[dict]
    fmu_outputs: Optional[dict]
    num_active_nodes: int
    num_free_nodes: int


def get_utilization(trace, time_quanta_index):
@@ -77,7 +80,7 @@ def get_utilization(trace, time_quanta_index):

class Scheduler:
    """Job scheduler and simulation manager."""
    def __init__(self, *, power_manager, flops_manager, layout_manager, cooling_model=None, config, **kwargs):
    def __init__(self, *, power_manager, flops_manager, cooling_model=None, config, **kwargs):
        self.config = config
        self.down_nodes = summarize_ranges(self.config['DOWN_NODES'])
        self.available_nodes = list(set(range(self.config['TOTAL_NODES'])) - set(self.config['DOWN_NODES']))
@@ -88,7 +91,6 @@ class Scheduler:
        self.jobs_completed = 0
        self.current_time = 0
        self.cooling_model = cooling_model
        self.layout_manager = layout_manager
        self.power_manager = power_manager
        self.flops_manager = flops_manager
        self.debug = kwargs.get('debug')
@@ -288,44 +290,30 @@ class Scheduler:
        else:    
            pflops, gflop_per_watt = None, None

        if self.current_time % self.config['POWER_UPDATE_FREQ'] == 0:
            if self.cooling_model:

            if self.current_time % self.config['FMU_UPDATE_FREQ'] == 0:
                # Power for NUM_CDUS (25 for Frontier)
                cdu_power = rack_power.T[-1] * 1000
                runtime_values = self.cooling_model.generate_runtime_values(cdu_power, self)
                
                # FMU inputs are N powers and the wetbulb temp
                fmu_inputs = self.cooling_model.generate_fmu_inputs(runtime_values, \
                fmu_inputs = self.cooling_model.generate_fmu_inputs(runtime_values,
                                uncertainties=self.power_manager.uncertainties)
                cooling_inputs, cooling_outputs =\
                    self.cooling_model.step(self.current_time, fmu_inputs, self.config['FMU_UPDATE_FREQ'])
                cooling_inputs, cooling_outputs = (
                    self.cooling_model.step(self.current_time, fmu_inputs, self.config['POWER_UPDATE_FREQ'])
                )
                
                # Get a dataframe of the power data
                power_df = self.power_manager.get_power_df(rack_power, rack_loss)

                if self.layout_manager:
                    self.layout_manager.update_powertemp_array(power_df, \
                               cooling_outputs, pflops, gflop_per_watt, \
                               system_util, uncertainties=self.power_manager.uncertainties)
                    self.layout_manager.update_pressflow_array(cooling_outputs)

        if self.current_time % self.config['UI_UPDATE_FREQ'] == 0:
            else:
                # Get a dataframe of the power data
                power_df = self.power_manager.get_power_df(rack_power, rack_loss)

            if self.layout_manager and not self.debug:
                self.layout_manager.update_scheduled_jobs(self.running + self.queue)
                self.layout_manager.update_status(self.current_time, len(self.running),
                                              len(self.queue), self.num_active_nodes,
                                              self.num_free_nodes, self.down_nodes[1:])
                self.layout_manager.update_power_array(power_df, pflops, gflop_per_watt, \
                                    system_util, uncertainties=self.power_manager.uncertainties)
                self.layout_manager.render()

        tick_data = TickData(
            current_time = self.current_time,
            jobs = completed_jobs + self.running + self.queue,
            completed = completed_jobs,
            running = self.running,
            queue =  self.queue,
            down_nodes = expand_ranges(self.down_nodes[1:]),
            power_df = power_df,
            p_flops = pflops,
@@ -333,6 +321,8 @@ class Scheduler:
            system_util = system_util,
            fmu_inputs = cooling_inputs,
            fmu_outputs = cooling_outputs,
            num_active_nodes =  self.num_active_nodes,
            num_free_nodes = self.num_free_nodes,
        )

        self.current_time += 1
@@ -361,7 +351,7 @@ class Scheduler:
            limits = self.get_gauge_limits()
            print(limits)
        
        for _ in range(timesteps):
        for timestep in range(timesteps):
            while self.current_time >= last_submit_time and jobs:
                job = jobs.pop(0)
                self.schedule([job])
@@ -375,15 +365,9 @@ class Scheduler:
            if not self.queue and not self.running and not self.replay:
                print("stopping simulation at time", self.current_time)
                break
            if self.debug:
                if _ % self.config['UI_UPDATE_FREQ'] == 0:
            if self.debug and timestep % self.config['UI_UPDATE_FREQ'] == 0:
                    print(".", end="", flush=True)

    def run_simulation_blocking(self, jobs, timesteps):
        """ Calls run_simulation and blocks until it is complete """
        for _ in self.run_simulation(jobs, timesteps):
            pass

    def get_stats(self):
        """ Return output statistics """
        sum_values = lambda values : sum(x[1] for x in values)
+30 −1
Original line number Diff line number Diff line
@@ -7,10 +7,12 @@ from rich.panel import Panel
from rich.table import Table
from .utils import summarize_ranges, convert_seconds
from .constants import ELLIPSES
from .scheduler import TickData, Scheduler


class LayoutManager:
    def __init__(self, layout_type, debug, **config):
    def __init__(self, layout_type, scheduler: Scheduler, debug, **config):
        self.scheduler = scheduler
        self.config = config
        self.console = Console()
        self.layout = Layout()
@@ -369,7 +371,34 @@ class LayoutManager:

            self.layout["lower"].update(Panel(Align(total_table, align="center"), title="Power and Performance"))

    def update(self, data: TickData):
        """Forward one tick of simulation output to the layout's update methods.

        Args:
            data: Tick state produced by the scheduler. The fields read here are
                ``power_df``, ``fmu_outputs``, ``p_flops``, ``g_flops_w``,
                ``system_util``, ``running``, ``queue``, ``current_time``,
                ``num_active_nodes``, ``num_free_nodes`` and ``down_nodes``.

        The ``uncertainties`` value passed along is taken from the scheduler's
        power manager. This method only updates layout content; drawing is done
        separately by ``render``.
        """
        scheduler = self.scheduler
        uncertainties = scheduler.power_manager.uncertainties
        # These three values are passed positionally, in this order, to both
        # power-related update calls below.
        perf_args = (data.p_flops, data.g_flops_w, data.system_util)

        # The cooling-related arrays are refreshed only when the scheduler
        # was given a cooling model.
        if scheduler.cooling_model:
            cooling_outputs = data.fmu_outputs
            self.update_powertemp_array(
                data.power_df, cooling_outputs, *perf_args,
                uncertainties=uncertainties,
            )
            self.update_pressflow_array(cooling_outputs)

        self.update_scheduled_jobs(data.running + data.queue)

        running_count = len(data.running)
        queued_count = len(data.queue)
        self.update_status(
            data.current_time,
            running_count,
            queued_count,
            data.num_active_nodes,
            data.num_free_nodes,
            data.down_nodes,
        )

        self.update_power_array(data.power_df, *perf_args, uncertainties=uncertainties)

    def render(self):
        """Clear the console and print the current layout.

        Does nothing when ``self.debug`` is truthy.
        """
        if self.debug:
            return

        console = self.console
        console.clear()
        console.print(self.layout)

    def run(self, jobs, timesteps):
        """ Runs the UI, blocking until the simulation is complete

        Iterates over ``self.scheduler.run_simulation(jobs, timesteps)`` and, for
        each yielded tick whose ``current_time`` is a multiple of
        ``self.config['UI_UPDATE_FREQ']``, calls ``update`` and then ``render``.
        Other ticks are consumed without touching the layout.
        """
        ticks = self.scheduler.run_simulation(jobs, timesteps)
        for tick in ticks:
            # The refresh interval is looked up on every tick, not cached.
            if tick.current_time % self.config['UI_UPDATE_FREQ'] != 0:
                continue
            self.update(tick)
            self.render()