Commit 155b3ed7 authored by Brewer, Wes's avatar Brewer, Wes
Browse files

hetero.py working now for random and peak workloads

parent 69e16d0b
Loading
Loading
Loading
Loading
+7 −5
Original line number Diff line number Diff line
@@ -17,9 +17,7 @@ from raps.workload import Workload
from raps.utils import convert_to_seconds

config1 = ConfigManager(system_name='setonix-cpu').get_config()
print(config1['system_name'])
config2 = ConfigManager(system_name='setonix-gpu').get_config()
print(config2['system_name'])

pm1 = PowerManager(compute_node_power, **config1)
pm2 = PowerManager(compute_node_power, **config2)
@@ -36,8 +34,13 @@ sc2 = Scheduler(power_manager=pm1, flops_manager=fm2, cooling_model=None, **args
layout_manager1 = LayoutManager(args.layout, scheduler=sc1, debug=args.debug, **config1)
layout_manager2 = LayoutManager(args.layout, scheduler=sc2, debug=args.debug, **config2)

wl = Workload(**config1)
print(config1)
print(config2)
configs = [config1, config2]
wl = Workload(*configs)

jobs = getattr(wl, args.workload)(num_jobs=args.numjobs)
print(jobs)

# Separate jobs based on partition
jobs1 = [job for job in jobs if job['partition'] == 'setonix-cpu']
@@ -52,8 +55,7 @@ if args.time:
else:
    timesteps = 88200 # 24 hours

if args.verbose:
    print(jobs)
if args.verbose: print(jobs)

# Create generator objects for both partitions
gen1 = layout_manager1.run_nonblocking(jobs1, timesteps=timesteps)
+1 −1
Original line number Diff line number Diff line
@@ -106,7 +106,7 @@ if args.replay:
    time.sleep(1)

else:
    wl = Workload(**config)
    wl = Workload(config)
    jobs = getattr(wl, args.workload)(num_jobs=args.numjobs)

    if args.verbose:
+3 −1
Original line number Diff line number Diff line
@@ -91,6 +91,7 @@ class Scheduler:
        self.jobs_completed = 0
        self.current_time = 0
        self.cooling_model = cooling_model
        self.sys_power = 0
        self.power_manager = power_manager
        self.flops_manager = flops_manager
        self.debug = kwargs.get('debug')
@@ -283,6 +284,7 @@ class Scheduler:
            total_power_kw = sum(row[-1] for row in rack_power) + self.config['NUM_CDUS'] * self.config['POWER_CDU'] / 1000.0
            total_loss_kw = sum(row[-1] for row in rack_loss)
            self.power_manager.history.append((self.current_time, total_power_kw))
            self.sys_power = total_power_kw
            self.power_manager.loss_history.append((self.current_time, total_loss_kw))
            output_df = self.power_manager.get_power_df(rack_power, rack_loss)
            pflops = self.flops_manager.get_system_performance() / 1E15
@@ -355,7 +357,7 @@ class Scheduler:
            # Print the current timestep for this partition
            if timestep % self.config['UI_UPDATE_FREQ'] == 0:
                sys_util = self.sys_util_history[-1][1] if self.sys_util_history else 0
                print(f"[DEBUG] {self.config['system_name']} - Timestep {timestep} - Jobs in queue: {len(self.queue)} - Utilization: {sys_util:.1f}%")
                print(f"[DEBUG] {self.config['system_name']} - Timestep {timestep} - Jobs in queue: {len(self.queue)} - Utilization: {sys_util:.1f}% - Power: {self.sys_power:.1f} kW")

            while self.current_time >= last_submit_time and jobs:
                job = jobs.pop(0)
+62 −33
Original line number Diff line number Diff line
@@ -43,40 +43,47 @@ MAX_PRIORITY = 500000
from .utils import truncated_normalvariate, determine_state, next_arrival


# NOTE(review): this span is a raw commit-diff capture — the removed (old) and
# added (new) definitions of Workload appear back-to-back with no +/- markers.
# The first class/__init__ pair below is the pre-change single-config version;
# the second pair is the current multi-partition version. Reconcile (keep only
# the second) before treating this as runnable source.
class Workload(object):
    """ This class is responsible for generating random workload traces and jobs. """

    def __init__(self, **config):
        self.config = config
# Current (post-change) definition: one config object per partition.
class Workload:
    def __init__(self, *configs):
        """
        Initialize Workload with multiple configurations.
        Args:
            *configs: Variable number of configurations for each partition.
        """
        #self.partitions = [config['system_name'] for config in configs]  # Extract system names
        #self.configs = configs
        # 'system_name' doubles as the partition identifier throughout this class.
        self.partitions = [config['system_name'] for config in configs]
        # Fast lookup: partition name -> full config dict for that partition.
        # NOTE(review): assumes 'system_name' values are unique across configs —
        # duplicates would silently overwrite entries; confirm with callers.
        self.config_map = {config['system_name']: config for config in configs}

    # NOTE(review): diff capture — both the old and the new signature of
    # compute_traces appear below. The second def (taking trace_quanta as a
    # parameter, so each partition's quantum can differ) is the current one;
    # keep only it when reconciling.
    def compute_traces(self, cpu_util: float, gpu_util: float, wall_time: int) -> tuple[np.ndarray, np.ndarray]:
    def compute_traces(self, cpu_util: float, gpu_util: float, wall_time: int, trace_quanta: int) -> tuple[np.ndarray, np.ndarray]:
        """ Compute CPU and GPU traces based on mean CPU & GPU utilizations and wall time. """
        # Old (removed) lines: read TRACE_QUANTA from self.config, which no
        # longer exists in the multi-config __init__.
        cpu_trace = cpu_util * np.ones(int(wall_time) // self.config['TRACE_QUANTA'])
        gpu_trace = gpu_util * np.ones(int(wall_time) // self.config['TRACE_QUANTA'])
        # New (current) lines: constant-utilization trace, one sample per quantum.
        # Note integer division — a wall_time shorter than trace_quanta yields
        # empty traces.
        cpu_trace = cpu_util * np.ones(int(wall_time) // trace_quanta)
        gpu_trace = gpu_util * np.ones(int(wall_time) // trace_quanta)
        return (cpu_trace, gpu_trace)


    def generate_random_jobs(self, num_jobs: int) -> list[list[any]]:
        """ Generate random jobs with specified number of jobs. """
        # NOTE(review): diff capture — old lines (self.config[...]) and new
        # lines (per-partition config[...]) are interleaved below; keep only
        # the config[...] variants when reconciling.
        jobs = []
        for job_index in range(num_jobs):
            # Old (removed): sized the job from the single global config.
            nodes_required = random.randint(1, self.config['MAX_NODES_PER_JOB'])
            # Randomly select a partition
            partition = random.choice(self.partitions)
            # Get the corresponding config for the selected partition
            config = self.config_map[partition]

            nodes_required = random.randint(1, config['MAX_NODES_PER_JOB'])
            name = random.choice(JOB_NAMES)
            # Old (removed) block — per-global-config utilizations and wall time.
            cpu_util = random.random() * self.config['CPUS_PER_NODE']
            gpu_util = random.random() * self.config['GPUS_PER_NODE']
            mu = (self.config['MAX_WALL_TIME'] + self.config['MIN_WALL_TIME']) / 2
            sigma = (self.config['MAX_WALL_TIME'] - self.config['MIN_WALL_TIME']) / 6
            wall_time = truncated_normalvariate(mu, sigma, self.config['MIN_WALL_TIME'], self.config['MAX_WALL_TIME']) // 3600 * 3600
            end_state = determine_state(self.config['JOB_END_PROBS'])
            cpu_trace, gpu_trace = self.compute_traces(cpu_util, gpu_util, wall_time)
            # New (current) block — per-partition utilizations; wall time is a
            # truncated normal snapped down to whole hours (// 3600 * 3600).
            cpu_util = random.random() * config['CPUS_PER_NODE']
            gpu_util = random.random() * config['GPUS_PER_NODE']
            mu = (config['MAX_WALL_TIME'] + config['MIN_WALL_TIME']) / 2
            sigma = (config['MAX_WALL_TIME'] - config['MIN_WALL_TIME']) / 6
            wall_time = truncated_normalvariate(mu, sigma, config['MIN_WALL_TIME'], config['MAX_WALL_TIME']) // 3600 * 3600
            end_state = determine_state(config['JOB_END_PROBS'])
            cpu_trace, gpu_trace = self.compute_traces(cpu_util, gpu_util, wall_time, config['TRACE_QUANTA'])
            priority = random.randint(0, MAX_PRIORITY)
            net_tx, net_rx = [], []

            # Jobs arrive according to Poisson process
            # Old (removed): global arrival rate plus a hard-coded partition
            # list that the new code replaces with self.partitions above.
            time_to_next_job = next_arrival(1 / self.config['JOB_ARRIVAL_TIME'])

            VALID_PARTITIONS = ['setonix-cpu', 'setonix-gpu']
            partition = random.choice(VALID_PARTITIONS)
            # New (current): exponential inter-arrival from the partition's config.
            time_to_next_job = next_arrival(1 / config['JOB_ARRIVAL_TIME'])

            jobs.append(job_dict(nodes_required, name, cpu_trace, gpu_trace, net_tx, net_rx, \
                        wall_time, end_state, None, time_to_next_job, None, priority, partition))
@@ -89,19 +96,41 @@ class Workload(object):
        num_jobs = kwargs.get('num_jobs', 0)
        return self.generate_random_jobs(num_jobs=num_jobs)


    # NOTE(review): diff capture — the old single-job body and the new
    # per-partition body of peak() are fused below. The old body (first
    # docstring through the first `return jobs`) was removed; the for-loop
    # version is current. Reconcile before running.
    def peak(self, **kwargs):
        """Peak power test"""
        jobs = self.generate_random_jobs(num_jobs=0)
        # NOTE(review): old-version bug, removed by this commit — the trailing
        # comma made cpu_util a 1-tuple, not a scalar.
        cpu_util = self.config['CPUS_PER_NODE'], 
        gpu_util = self.config['GPUS_PER_NODE']
        cpu_trace, gpu_trace = self.compute_traces(cpu_util, gpu_util, 10800)
        """Peak power test for multiple partitions"""
        jobs = []

        # Iterate through each partition and get its configuration
        for partition in self.partitions:
            # Fetch the config for the current partition
            config = self.config_map[partition]

            # Generate traces based on partition-specific configuration
            cpu_util = config['CPUS_PER_NODE']
            gpu_util = config['GPUS_PER_NODE']
            # 10800 s = 3 h test window; quantum comes from the partition config.
            cpu_trace, gpu_trace = self.compute_traces(cpu_util, gpu_util, 10800, config['TRACE_QUANTA'])
            net_tx, net_rx = [], []
        # Old (removed) tail: single job_dict built from the global config,
        # prepended to the generated jobs.
        job_info = job_dict(self.config['AVAILABLE_NODES'], "Max Test", cpu_trace, gpu_trace, net_tx, net_rx, \
                    len(gpu_trace)*self.config['TRACE_QUANTA'], 'COMPLETED', None, 100, None)
        jobs.insert(0, job_info)
        return jobs

            # Create job info for this partition
            job_info = job_dict(
                config['AVAILABLE_NODES'],       # Nodes required
                f"Max Test {partition}",         # Name with partition label
                cpu_trace,                       # CPU trace
                gpu_trace,                       # GPU trace
                net_tx,                          # Network transmit trace
                net_rx,                          # Network receive trace
                len(gpu_trace) * config['TRACE_QUANTA'],  # Wall time
                'COMPLETED',                     # End state
                None,                            # Scheduled nodes
                0,                               # Time to next job
                1234,                            # Job ID
                100,                             # Priority
                partition                        # Partition name
            )
            print(job_info)
            jobs.append(job_info)  # Add job to the list

        return jobs

    def idle(self, **kwargs):
        """Idle power test"""