# raps/dataloaders/lassen.py -- network-sequence helpers (excerpt).
#
# Caller context visible in this excerpt: load_data_from_df() multiplies the
# per-job ib_tx / ib_rx counters by 4 to convert them to bytes, substitutes []
# when a counter column is empty, and then calls
#     generate_network_sequences(ib_tx, ib_rx, samples, lambda_poisson=0.3)
# printing the inputs and the generated sequences when kwargs['verbose'] is set.


def _has_traffic(total):
    """Return True if *total* is a finite, strictly positive scalar.

    The caller passes ``[]`` when no counter is available, so empty
    containers, ``None``, zero, negative values and NaN all count as
    "no traffic".
    """
    if total is None or np.size(total) == 0:
        return False
    value = float(total)
    return bool(np.isfinite(value) and value > 0)


def adjust_bursts(burst_intervals, total, intervals):
    """Split *total* into non-negative integer bursts proportional to weights.

    Uses largest-remainder apportionment in integer arithmetic, so the result
    always sums exactly to ``total``, never contains negative entries, and
    leaves zero-weight intervals at zero.

    Args:
        burst_intervals: Per-interval weights (e.g. Poisson draws). Negative
            weights are treated as zero.
        total: Amount to distribute. Non-integer values are rounded to the
            nearest integer; values <= 0 yield an all-zero result.
        intervals: Unused; kept for backward compatibility. The output length
            is ``len(burst_intervals)``.

    Returns:
        ``numpy.ndarray`` of dtype int64 with one entry per interval.
    """
    weights = np.clip(np.asarray(burst_intervals, dtype=np.int64), 0, None)
    if weights.size == 0:
        return np.zeros(0, dtype=np.int64)

    # Keep integer totals exact; only round genuinely fractional inputs.
    if isinstance(total, (int, np.integer)):
        total = int(total)
    else:
        total = int(round(float(total)))
    if total <= 0:
        return np.zeros(weights.size, dtype=np.int64)

    weight_sum = int(weights.sum())
    if weight_sum == 0:
        # No burst was drawn at all: dividing by the sum would produce NaN.
        # Fall back to an even spread so the total is still conserved.
        weights = np.ones_like(weights)
        weight_sum = weights.size

    quotas = weights * total
    bursts = quotas // weight_sum
    remainders = quotas % weight_sum

    # Hand the units lost to flooring to the largest remainders. The number of
    # leftover units is always smaller than the number of non-zero remainders,
    # so zero-weight intervals are never touched.
    leftover = total - int(bursts.sum())
    if leftover:
        order = np.argsort(-remainders, kind="stable")
        bursts[order[:leftover]] += 1
    return bursts


def generate_network_sequences(total_tx, total_rx, intervals, lambda_poisson):
    """Generate sporadic tx/rx traffic sequences that sum to the given totals.

    A single Poisson draw per interval is shared by both directions, so tx and
    rx are idle in the same intervals.

    Args:
        total_tx: Total transmitted amount, or ``[]`` when unavailable.
        total_rx: Total received amount, or ``[]`` when unavailable.
        intervals: Number of intervals (samples) to generate.
        lambda_poisson: Rate of the Poisson distribution used for the bursts.

    Returns:
        ``(tx_bursts, rx_bursts)`` as integer ``numpy.ndarray`` objects of
        length ``intervals``, or ``([], [])`` if either total is missing,
        zero, negative or NaN.
    """
    # NOTE(review): as in the original, both directions are dropped when only
    # one of them is missing -- confirm this is what callers want.
    if not (_has_traffic(total_tx) and _has_traffic(total_rx)):
        return [], []

    # Poisson draws are already >= 0, so zero entries mark idle intervals.
    burst_intervals = np.random.poisson(lam=lambda_poisson, size=intervals)

    tx_bursts = adjust_bursts(burst_intervals, total_tx, intervals)
    rx_bursts = adjust_bursts(burst_intervals, total_rx, intervals)
    return tx_bursts, rx_bursts