Loading raps/dataloaders/lassen.py +55 −9 Original line number Diff line number Diff line Loading @@ -29,16 +29,20 @@ import math import numpy as np import os import pandas as pd from tqdm import tqdm try: from ..config import load_config_variables from ..job import job_dict from ..utils import power_to_utilization, next_arrival from tqdm import tqdm load_config_variables(['TRACE_QUANTA', 'CPUS_PER_NODE', 'GPUS_PER_NODE', 'POWER_GPU_IDLE', 'POWER_GPU_MAX', 'POWER_CPU_IDLE', 'POWER_CPU_MAX', 'POWER_MEM', 'POWER_NIC', 'POWER_NVME', 'POWER_CDU', 'POWER_SWITCH', 'CORES_PER_CPU', 'NICS_PER_NODE'], globals()) except: pass def load_data(path, **kwargs): Loading Loading @@ -113,6 +117,13 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): # Only Node Energy and GPU Energy is reported! # total_energy = node_data['energy'].sum() # Joules # Network utilization lambda_poisson = 0.3 ib_tx, ib_rx = node_data['ib_tx'], node_data['ib_rx'] net_tx, net_rx = generate_network_sequences(ib_tx, ib_rx, samples, lambda_poisson) print(net_tx) print(net_rx) if reschedule: # Let the scheduler reschedule the jobs scheduled_nodes = None time_offset = next_arrival() Loading @@ -126,7 +137,7 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): job_info = job_dict(nodes_required, row['hashed_user_id'], cpu_trace, gpu_trace, wall_time, cpu_trace, gpu_trace, net_tx, net_rx, wall_time, row['exit_status'], scheduled_nodes, time_offset, Loading Loading @@ -163,3 +174,38 @@ def compute_time_offset(begin_time, reference_time): """ time_offset = pd.to_datetime(begin_time) - reference_time return int(time_offset.total_seconds()) def generate_network_sequences(total_tx, total_rx, intervals, lambda_poisson): # Generate sporadic bursts using a Poisson distribution (shared for both tx and rx) burst_intervals = np.random.poisson(lam=lambda_poisson, size=intervals) print('**', burst_intervals) # Ensure some intervals have no traffic (both tx and rx will share zero intervals) burst_intervals = np.where(burst_intervals > 0, burst_intervals, 0) # For tx tx_bursts = burst_intervals / np.sum(burst_intervals) * total_tx tx_bursts = np.round(tx_bursts).astype(int) tx_adjustment = total_tx - np.sum(tx_bursts) tx_bursts[0] += tx_adjustment # Adjust for rounding # For rx rx_bursts = burst_intervals / np.sum(burst_intervals) * total_rx rx_bursts = np.round(rx_bursts).astype(int) rx_adjustment = total_rx - np.sum(rx_bursts) rx_bursts[0] += rx_adjustment # Adjust for rounding return tx_bursts, rx_bursts if __name__ == "__main__": # Example usage total_ib_tx = 720 # total transmitted bytes total_ib_rx = 480 # total received bytes intervals = 20 # number of 20-second intervals lambda_poisson = 0.3 # control sporadicity tx_sequence, rx_sequence = generate_ib_tx_rx_sequences(total_ib_tx, total_ib_rx, intervals, lambda_poisson) print(tx_sequence, rx_sequence) raps/job.py +2 −0 Original line number Diff line number Diff line Loading @@ -8,6 +8,8 @@ def job_dict(nodes_required, name, cpu_trace, gpu_trace, wall_time, \ 'name': name, 'cpu_trace': cpu_trace, 'gpu_trace': gpu_trace, 'ntx_trace': net_trace, 'nrx_trace': net_trace, 'wall_time': wall_time, 'end_state': end_state, 'requested_nodes': scheduled_nodes, Loading Loading
raps/dataloaders/lassen.py +55 −9 Original line number Diff line number Diff line Loading @@ -29,16 +29,20 @@ import math import numpy as np import os import pandas as pd from tqdm import tqdm try: from ..config import load_config_variables from ..job import job_dict from ..utils import power_to_utilization, next_arrival from tqdm import tqdm load_config_variables(['TRACE_QUANTA', 'CPUS_PER_NODE', 'GPUS_PER_NODE', 'POWER_GPU_IDLE', 'POWER_GPU_MAX', 'POWER_CPU_IDLE', 'POWER_CPU_MAX', 'POWER_MEM', 'POWER_NIC', 'POWER_NVME', 'POWER_CDU', 'POWER_SWITCH', 'CORES_PER_CPU', 'NICS_PER_NODE'], globals()) except: pass def load_data(path, **kwargs): Loading Loading @@ -113,6 +117,13 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): # Only Node Energy and GPU Energy is reported! # total_energy = node_data['energy'].sum() # Joules # Network utilization lambda_poisson = 0.3 ib_tx, ib_rx = node_data['ib_tx'], node_data['ib_rx'] net_tx, net_rx = generate_network_sequences(ib_tx, ib_rx, samples, lambda_poisson) print(net_tx) print(net_rx) if reschedule: # Let the scheduler reschedule the jobs scheduled_nodes = None time_offset = next_arrival() Loading @@ -126,7 +137,7 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): job_info = job_dict(nodes_required, row['hashed_user_id'], cpu_trace, gpu_trace, wall_time, cpu_trace, gpu_trace, net_tx, net_rx, wall_time, row['exit_status'], scheduled_nodes, time_offset, Loading Loading @@ -163,3 +174,38 @@ def compute_time_offset(begin_time, reference_time): """ time_offset = pd.to_datetime(begin_time) - reference_time return int(time_offset.total_seconds()) def generate_network_sequences(total_tx, total_rx, intervals, lambda_poisson): # Generate sporadic bursts using a Poisson distribution (shared for both tx and rx) burst_intervals = np.random.poisson(lam=lambda_poisson, size=intervals) print('**', burst_intervals) # Ensure some intervals have no traffic (both tx and rx will share zero intervals) burst_intervals = np.where(burst_intervals > 0, burst_intervals, 0) # For tx tx_bursts = burst_intervals / np.sum(burst_intervals) * total_tx tx_bursts = np.round(tx_bursts).astype(int) tx_adjustment = total_tx - np.sum(tx_bursts) tx_bursts[0] += tx_adjustment # Adjust for rounding # For rx rx_bursts = burst_intervals / np.sum(burst_intervals) * total_rx rx_bursts = np.round(rx_bursts).astype(int) rx_adjustment = total_rx - np.sum(rx_bursts) rx_bursts[0] += rx_adjustment # Adjust for rounding return tx_bursts, rx_bursts if __name__ == "__main__": # Example usage total_ib_tx = 720 # total transmitted bytes total_ib_rx = 480 # total received bytes intervals = 20 # number of 20-second intervals lambda_poisson = 0.3 # control sporadicity tx_sequence, rx_sequence = generate_ib_tx_rx_sequences(total_ib_tx, total_ib_rx, intervals, lambda_poisson) print(tx_sequence, rx_sequence)
raps/job.py +2 −0 Original line number Diff line number Diff line Loading @@ -8,6 +8,8 @@ def job_dict(nodes_required, name, cpu_trace, gpu_trace, wall_time, \ 'name': name, 'cpu_trace': cpu_trace, 'gpu_trace': gpu_trace, 'ntx_trace': net_trace, 'nrx_trace': net_trace, 'wall_time': wall_time, 'end_state': end_state, 'requested_nodes': scheduled_nodes, Loading