Commit c588da57 authored by Brewer, Wes's avatar Brewer, Wes
Browse files

hetero.py working now

parent f768f17b
Loading
Loading
Loading
Loading
+31 −234
Original line number Diff line number Diff line
""" Shortest-job first (SJF) job schedule simulator """
from raps.helpers import check_python_version
check_python_version()

import argparse
import json
import numpy as np
import random
import pandas as pd
import os
import re
import sys
import time
import threading

from tqdm import tqdm
from raps.policy import PolicyType

# Check for the required Python version
required_major, required_minor = 3, 9

if sys.version_info < (required_major, required_minor):
    sys.stderr.write(f"Error: RAPS requires Python {required_major}.{required_minor} or greater\n")
    sys.exit(1)

parser = argparse.ArgumentParser(description='Resource Allocator & Power Simulator (RAPS)')
parser.add_argument('-c', '--cooling', action='store_true', help='Include FMU cooling model')
parser.add_argument('--start', type=str, help='ISO8061 string for start of simulation')
parser.add_argument('--end', type=str, help='ISO8061 string for end of simulation')
parser.add_argument('-d', '--debug', action='store_true', help='Enable debug mode and disable rich layout')
parser.add_argument('-e', '--encrypt', action='store_true', help='Encrypt any sensitive data in telemetry')
parser.add_argument('-n', '--numjobs', type=int, default=1000, help='Number of jobs to schedule')
parser.add_argument('-t', '--time', type=str, default=None, help='Length of time to simulate, e.g., 123, 123s, 27m, 3h, 7d')
parser.add_argument('-ff', '--fastforward', type=str, default=None, help='Fast-forward by time amount (uses same units as -t)')
parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output')
parser.add_argument('--seed', action='store_true', help='Set random number seed for deterministic simulation')
parser.add_argument('-f', '--replay', nargs='+', type=str, help='Either: path/to/joblive path/to/jobprofile' + \
                                                                ' -or- filename.npz (overrides --workload option)')
parser.add_argument('--reschedule', action='store_true', help='Reschedule the telemetry workload')
parser.add_argument('-u', '--uncertainties', action='store_true',
                    help='Change from floating point units to floating point units with uncertainties.' + \
                                                                ' Very expensive w.r.t simulation time!')
parser.add_argument('--jid', type=str, default='*', help='Replay job id')
parser.add_argument('--validate', action='store_true', help='Use node power instead of CPU/GPU utilizations')
parser.add_argument('-o', '--output', action='store_true', help='Output power, cooling, and loss models for later analysis')
parser.add_argument('-p', '--plot', nargs='+', choices=['power', 'loss', 'pue', 'temp', 'util'],
                    help='Specify one or more types of plots to generate: power, loss, pue, util, temp')
choices = ['png', 'svg', 'jpg', 'pdf', 'eps']
parser.add_argument('--imtype', type=str, choices=choices, default=choices[0], help='Plot image type')
parser.add_argument('--system', type=str, default='frontier', help='System config to use')
choices = [policy.value for policy in PolicyType]
parser.add_argument('-s', '--schedule', type=str, choices=choices, default=choices[0], help='Schedule policy to use')
choices = ['random', 'benchmark', 'peak', 'idle']
parser.add_argument('-w', '--workload', type=str, choices=choices, default=choices[0], help='Type of synthetic workload')
choices = ['layout1', 'layout2']
parser.add_argument('--layout', type=str, choices=choices, default=choices[0], help='Layout of UI')
args = parser.parse_args()
args_dict1 = vars(args)
args_dict2 = vars(args)
from args import args
import copy
args_dict1 = copy.deepcopy(vars(args))
args_dict2 = copy.deepcopy(vars(args))
print(args_dict1)
print(args_dict2)

from raps.config import ConfigManager
from raps.constants import OUTPUT_PATH
from raps.cooling import ThermoFluidsModel
from raps.ui import LayoutManager
from raps.flops import FLOPSManager
from raps.plotting import Plotter
from raps.power import PowerManager, compute_node_power, compute_node_power_validate
from raps.power import compute_node_power_uncertainties, compute_node_power_validate_uncertainties
from raps.scheduler import Scheduler, Job
from raps.telemetry import Telemetry
from raps.power import PowerManager, compute_node_power
from raps.scheduler import Scheduler
from raps.workload import Workload
from raps.weather import Weather
from raps.utils import create_casename, convert_to_seconds, write_dict_to_file, next_arrival
from raps.utils import convert_to_seconds

#config = ConfigManager(system_name=args.system).get_config()
config1 = ConfigManager(system_name='setonix-cpu').get_config()
print(config1['system_name'])
config2 = ConfigManager(system_name='setonix-gpu').get_config()
print(config2['system_name'])


if args.seed:
    random.seed(SEED)
    np.random.seed(SEED)

if args.cooling:
    cooling_model = ThermoFluidsModel(**config)
    cooling_model.initialize()
    args.layout = "layout2"

    if args_dict['start']:
        cooling_model.weather = Weather(args_dict['start'], config = config)
else:
    cooling_model = None

cpu_power_manager = PowerManager(compute_node_power, **config1)
gpu_power_manager = PowerManager(compute_node_power, **config2)
pm1 = PowerManager(compute_node_power, **config1)
pm2 = PowerManager(compute_node_power, **config2)

args_dict1['config'] = config1
args_dict2['config'] = config2

cpu_flops_manager = FLOPSManager(**args_dict1)
gpu_flops_manager = FLOPSManager(**args_dict2)
fm1 = FLOPSManager(**args_dict1)
fm2 = FLOPSManager(**args_dict2)

sc1 = Scheduler(power_manager=cpu_power_manager, flops_manager=cpu_flops_manager, cooling_model=None, **args_dict1)
sc2 = Scheduler(power_manager=gpu_power_manager, flops_manager=gpu_flops_manager, cooling_model=None, **args_dict2)
sc1 = Scheduler(power_manager=pm1, flops_manager=fm1, cooling_model=None, **args_dict1)
sc2 = Scheduler(power_manager=pm1, flops_manager=fm2, cooling_model=None, **args_dict2)

layout_manager1 = LayoutManager(args.layout, scheduler=sc1, debug=args.debug, **config1)
layout_manager2 = LayoutManager(args.layout, scheduler=sc2, debug=args.debug, **config2)

if args.replay:

    if args.fastforward: args.fastforward = convert_to_seconds(args.fastforward)

    td = Telemetry(**args_dict)

    # Try to extract date from given name to use as case directory
    matched_date = re.search(r"\d{4}-\d{2}-\d{2}", args.replay[0])
    if matched_date:
        extracted_date = matched_date.group(0)
        DIR_NAME = "sim=" + extracted_date
    else:
        extracted_date = "Date not found"
        DIR_NAME = create_casename()

    # Read either npz file or telemetry parquet files
    if args.replay[0].endswith(".npz"):
        print(f"Loading {args.replay[0]}...")
        jobs = td.load_snapshot(args.replay[0])
        if args.reschedule:
            for job in tqdm(jobs, desc="Updating requested_nodes"):
                job['requested_nodes'] = None
                job['submit_time'] = next_arrival()
    else:
        print(*args.replay)
        jobs = td.load_data(args.replay)
        td.save_snapshot(jobs, filename=DIR_NAME)

    # Set number of timesteps based on the last job running which we assume
    # is the maximum value of submit_time + wall_time of all the jobs
    if args.time:
        timesteps = convert_to_seconds(args.time)
    else:
        timesteps = int(max(job['wall_time'] + job['submit_time'] for job in jobs)) + 1

    print(f'Simulating {len(jobs)} jobs for {timesteps} seconds')
    time.sleep(1)

else:
wl = Workload(**config1)
jobs = getattr(wl, args.workload)(num_jobs=args.numjobs)

    if args.verbose:
        for job_vector in jobs:
            job = Job(job_vector, 0)
            print('jobid:', job.id, '\tlen(gpu_trace):', len(job.gpu_trace), '\twall_time(s):', job.wall_time)
        time.sleep(2)
#print(jobs)
#exit()

if args.time:
    timesteps = convert_to_seconds(args.time)
else:
    timesteps = 88200 # 24 hours

    DIR_NAME = create_casename()

OPATH = OUTPUT_PATH / DIR_NAME
print("Output directory is: ", OPATH)
sc1.opath = OPATH
sc2.opath = OPATH

if args.plot or args.output:
    try:
        os.makedirs(OPATH)
    except OSError as error:
        print(f"Error creating directory: {error}")

if args.verbose:
    print(jobs)

layout_manager1.run(jobs, timesteps=timesteps)
layout_manager2.run(jobs, timesteps=timesteps)

output_stats = sc.get_stats()
# Following b/c we get the following error when we use PM100 telemetry dataset
# TypeError: Object of type int64 is not JSON serializable
try:
    print(json.dumps(output_stats, indent=4))
except:
    print(output_stats)

if args.plot:
    if 'power' in args.plot:
        pl = Plotter('Time (s)', 'Power (kW)', 'Power History', \
                     OPATH / f'power.{args.imtype}', \
                     uncertainties=args.uncertainties)
        x, y = zip(*power_manager.history)
        pl.plot_history(x, y)

    if 'util' in args.plot:
        pl = Plotter('Time (s)', 'System Utilization (%)', \
                     'System Utilization History', OPATH / f'util.{args.imtype}')
        x, y = zip(*sc.sys_util_history)
        pl.plot_history(x, y)

    if 'loss' in args.plot:
        pl = Plotter('Time (s)', 'Power Losses (kW)', 'Power Loss History', \
                     OPATH / f'loss.{args.imtype}', \
                     uncertainties=args.uncertainties)
        x, y = zip(*power_manager.loss_history)
        pl.plot_history(x, y)

        pl = Plotter('Time (s)', 'Power Losses (%)', 'Power Loss History', \
                     OPATH / f'loss_pct.{args.imtype}', \
                     uncertainties=args.uncertainties)
        x, y = zip(*power_manager.loss_history_percentage)
        pl.plot_history(x, y)

    if 'pue' in args.plot:
        if cooling_model:
            ylabel = 'PUE_Out[1]'
            title = 'FMU ' + ylabel + 'History'
            pl = Plotter('Time (s)', ylabel, title, OPATH / f'pue.{args.imtype}', \
                         uncertainties=args.uncertainties)
            df = pd.DataFrame(cooling_model.fmu_history)
            df.to_parquet('cooling_model.parquet', engine='pyarrow')
            pl.plot_history(df['time'], df[ylabel])
        else:
            print('Cooling model not enabled... skipping output of plot')

    if 'temp' in args.plot:
        if cooling_model:
            ylabel = 'Tr_pri_Out[1]'
            title = 'FMU ' + ylabel + 'History'
            pl = Plotter('Time (s)', ylabel, title, OPATH / 'temp.svg')
            df = pd.DataFrame(cooling_model.fmu_history)
            df.to_parquet('cooling_model.parquet', engine='pyarrow')
            pl.plot_compare(df['time'], df[ylabel])
        else:
            print('Cooling model not enabled... skipping output of plot')

if args.output:

    if args.uncertainties:
        # Parquet cannot handle annotated ufloat format AFAIK
        print('Data dump not implemented using uncertainties!')  
    else:
        if cooling_model:
            df = pd.DataFrame(cooling_model.fmu_history)
            df.to_parquet(OPATH / 'cooling_model.parquet', engine='pyarrow')

        df = pd.DataFrame(power_manager.history)
        df.to_parquet(OPATH / 'power_history.parquet', engine='pyarrow')

        df = pd.DataFrame(power_manager.loss_history)
        df.to_parquet(OPATH / 'loss_history.parquet', engine='pyarrow')

        df = pd.DataFrame(sc.sys_util_history)
        df.to_parquet(OPATH / 'util.parquet', engine='pyarrow')
# Create generator objects for both partitions
gen1 = layout_manager1.run_nonblocking(jobs, timesteps=timesteps)
gen2 = layout_manager2.run_nonblocking(jobs, timesteps=timesteps)

        try:
            with open(OPATH / 'stats.out', 'w') as f:
                json.dump(output_stats, f, indent=4)
        except:
            write_dict_to_file(output_stats, OPATH / 'stats.out')
# Step through both generators in lockstep
for _ in range(timesteps):
    next(gen1)  # Advance first scheduler
    next(gen2)  # Advance second scheduler
+6 −44
Original line number Diff line number Diff line
""" Shortest-job first (SJF) job schedule simulator """

import argparse
import json
import numpy as np
import random
import pandas as pd
import os
import re
import sys
import time

from tqdm import tqdm
from raps.policy import PolicyType

# Check for the required Python version
required_major, required_minor = 3, 9

if sys.version_info < (required_major, required_minor):
    sys.stderr.write(f"Error: RAPS requires Python {required_major}.{required_minor} or greater\n")
    sys.exit(1)

parser = argparse.ArgumentParser(description='Resource Allocator & Power Simulator (RAPS)')
parser.add_argument('-c', '--cooling', action='store_true', help='Include FMU cooling model')
parser.add_argument('--start', type=str, help='ISO8061 string for start of simulation')
parser.add_argument('--end', type=str, help='ISO8061 string for end of simulation')
parser.add_argument('-d', '--debug', action='store_true', help='Enable debug mode and disable rich layout')
parser.add_argument('-e', '--encrypt', action='store_true', help='Encrypt any sensitive data in telemetry')
parser.add_argument('-n', '--numjobs', type=int, default=1000, help='Number of jobs to schedule')
parser.add_argument('-t', '--time', type=str, default=None, help='Length of time to simulate, e.g., 123, 123s, 27m, 3h, 7d')
parser.add_argument('-ff', '--fastforward', type=str, default=None, help='Fast-forward by time amount (uses same units as -t)')
parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output')
parser.add_argument('--seed', action='store_true', help='Set random number seed for deterministic simulation')
parser.add_argument('-f', '--replay', nargs='+', type=str, help='Either: path/to/joblive path/to/jobprofile' + \
                                                                ' -or- filename.npz (overrides --workload option)')
parser.add_argument('--reschedule', action='store_true', help='Reschedule the telemetry workload')
parser.add_argument('-u', '--uncertainties', action='store_true',
                    help='Change from floating point units to floating point units with uncertainties.' + \
                                                                ' Very expensive w.r.t simulation time!')
parser.add_argument('--jid', type=str, default='*', help='Replay job id')
parser.add_argument('--validate', action='store_true', help='Use node power instead of CPU/GPU utilizations')
parser.add_argument('-o', '--output', action='store_true', help='Output power, cooling, and loss models for later analysis')
parser.add_argument('-p', '--plot', nargs='+', choices=['power', 'loss', 'pue', 'temp', 'util'],
                    help='Specify one or more types of plots to generate: power, loss, pue, util, temp')
choices = ['png', 'svg', 'jpg', 'pdf', 'eps']
parser.add_argument('--imtype', type=str, choices=choices, default=choices[0], help='Plot image type')
parser.add_argument('--system', type=str, default='frontier', help='System config to use')
choices = [policy.value for policy in PolicyType]
parser.add_argument('-s', '--schedule', type=str, choices=choices, default=choices[0], help='Schedule policy to use')
choices = ['random', 'benchmark', 'peak', 'idle']
parser.add_argument('-w', '--workload', type=str, choices=choices, default=choices[0], help='Type of synthetic workload')
choices = ['layout1', 'layout2']
parser.add_argument('--layout', type=str, choices=choices, default=choices[0], help='Layout of UI')
args = parser.parse_args()

from raps.helpers import check_python_version
check_python_version()

from args import args
args_dict = vars(args)
print(args_dict)

from raps.config import ConfigManager
from raps.constants import OUTPUT_PATH
from raps.constants import OUTPUT_PATH, SEED
from raps.cooling import ThermoFluidsModel
from raps.ui import LayoutManager
from raps.flops import FLOPSManager
+2 −0
Original line number Diff line number Diff line
@@ -10,6 +10,7 @@ class ConfigManager:
    def __init__(self, system_name: str):
        self.config: Dict[str, Any] = {}
        self.load_system_config(system_name)
        self.system_name = system_name
        self.derive_values()

    def load_system_config(self, system_name: str) -> None:
@@ -45,6 +46,7 @@ class ConfigManager:
        self.config['SC_SHAPE'] = [num_cdus, racks_per_cdu, nodes_per_rack]
        self.config['TOTAL_NODES'] = num_cdus * racks_per_cdu * nodes_per_rack
        self.config['BLADES_PER_CHASSIS'] = int(nodes_per_rack / chassis_per_rack / nodes_per_blade)
        self.config['system_name'] = self.system_name

        # Generate POWER_DF_HEADER
        power_df_header = ["CDU"]
+1 −0
Original line number Diff line number Diff line
@@ -5,3 +5,4 @@ from pathlib import Path

ELLIPSES = '\u2026'
OUTPUT_PATH = Path('simulation_results')
SEED = 42

raps/helpers.py

0 → 100644
+9 −0
Original line number Diff line number Diff line
import sys

def check_python_version():
    # Check for the required Python version
    required_major, required_minor = 3, 9

    if sys.version_info < (required_major, required_minor):
        sys.stderr.write(f"Error: RAPS requires Python {required_major}.{required_minor} or greater\n")
        sys.exit(1)
Loading