Loading config/fugaku/power.json +7 −7 Original line number Diff line number Diff line { "POWER_GPU_IDLE": 75, "POWER_GPU_MAX": 300, "POWER_CPU_IDLE": 47.25, "POWER_CPU_MAX": 300, "POWER_MEM": 74.26, "POWER_NIC": 21, "POWER_NVME": 45, "POWER_GPU_IDLE": 0, "POWER_GPU_MAX": 0, "POWER_CPU_IDLE": 30, "POWER_CPU_MAX": 150, "POWER_MEM": 10, "POWER_NIC": 0, "POWER_NVME": 0, "POWER_SWITCH": 0, "POWER_CDU": 0, "POWER_UPDATE_FREQ": 10, Loading config/fugaku/scheduler.json +1 −1 Original line number Diff line number Diff line { "SEED": 42, "JOB_ARRIVAL_TIME": 60, "JOB_ARRIVAL_TIME": 10, "MTBF": 11, "MAX_TIME": 88200, "TRACE_QUANTA": 10, Loading config/fugaku/system.json +5 −5 Original line number Diff line number Diff line { "NUM_CDUS": 50, "RACKS_PER_CDU": 2, "NODES_PER_RACK": 384, "NUM_CDUS": 24, "RACKS_PER_CDU": 18, "NODES_PER_RACK": 368, "RECTIFIERS_PER_RACK": 8, "CHASSIS_PER_RACK": 8, "NODES_PER_BLADE": 1, Loading @@ -13,8 +13,8 @@ "DOWN_NODES": [], "CPUS_PER_NODE": 1, "GPUS_PER_NODE": 0, "CPU_PEAK_FLOPS": 3.072E12, "CPU_PEAK_FLOPS": 3.379E12, "GPU_PEAK_FLOPS": 0, "CPU_FP_RATIO": 1.0, "CPU_FP_RATIO": 0.82, "GPU_FP_RATIO": 0.0 } raps/dataloaders/fugaku.py +42 −5 Original line number Diff line number Diff line """ Download parquet files from https://zenodo.org/records/11467483 Note that F-Data doesn't give a list of nodes used, so we set 'scheduled_nodes' to None which triggers the scheduler to schedule the nodes itself. Also, power in F-Data is only given at node-level. We can use node-level power by adding the --validate option. The --reschedule will compute submit times from Poisson distribution, instead of using the submit times given in F-Data. python main.py --system fugaku -f /path/to/21_04.parquet --reschedule --validate --reschedule """ import pandas as pd from tqdm import tqdm from ..job import job_dict from ..utils import next_arrival def load_data(path, **kwargs): Loading Loading @@ -31,6 +48,14 @@ def load_data_from_df(df, **kwargs): Returns: list: List of job dictionaries. """ encrypt_bool = kwargs.get('encrypt') fastforward = kwargs.get('fastforward') reschedule = kwargs.get('reschedule') validate = kwargs.get('validate') jid = kwargs.get('jid', '*') if fastforward: print(f"fast-forwarding {fastforward} seconds") job_list = [] # Convert 'adt' (submit time) to datetime and find the earliest submission time Loading @@ -38,16 +63,28 @@ def load_data_from_df(df, **kwargs): earliest_submit_time = df['adt'].min() # Loop through the DataFrame rows to extract job information for _, row in df.iterrows(): for _, row in tqdm(df.iterrows(), total=len(df), desc="Processing Jobs"): nodes_required = row['nnumr'] if 'nnumr' in df.columns else 0 name = row['jnam'] if 'jnam' in df.columns else 'unknown' if validate: cpu_trace = row['avgpcon'] gpu_trace = cpu_trace else: cpu_trace = row['perf1'] if 'perf1' in df.columns else 0 # Assuming some performance metric as cpu_trace gpu_trace = 0 # Set to 0 as GPU trace is not explicitly provided wall_time = row['duration'] if 'duration' in df.columns else 0 end_state = row['exit state'] if 'exit state' in df.columns else 'unknown' scheduled_nodes = row['nnuma'] if 'nnuma' in df.columns else 0 #scheduled_nodes = row['nnuma'] if 'nnuma' in df.columns else 0 scheduled_nodes = None submit_time = row['adt'] if 'adt' in df.columns else earliest_submit_time if reschedule: # Let the scheduler reschedule the jobs time_offset = next_arrival() else: time_offset = (submit_time - earliest_submit_time).total_seconds() # Compute time offset in seconds job_id = row['jid'] if 'jid' in df.columns else 'unknown' priority = row['pri'] if 'pri' in df.columns else 0 Loading raps/scheduler.py +12 −2 Original line number Diff line number Diff line Loading @@ -86,6 +86,15 @@ class TickData: pue: Optional[float] def get_utilization(trace, time_quanta_index): if isinstance(trace, (list, np.ndarray)): return trace[time_quanta_index] elif isinstance(trace, (int, float)): return float(trace) else: raise TypeError(f"Invalid type for utilization: {type(trace)}.") class Scheduler: """Job scheduler and simulation manager.""" def __init__(self, total_nodes, down_nodes, power_manager, flops_manager, \ Loading Loading @@ -249,8 +258,9 @@ class Scheduler: job.running_time = self.current_time - job.start_time time_quanta_index = (self.current_time - job.start_time) // TRACE_QUANTA cpu_util = job.cpu_trace[time_quanta_index] gpu_util = job.gpu_trace[time_quanta_index] cpu_util = get_utilization(job.cpu_trace, time_quanta_index) gpu_util = get_utilization(job.gpu_trace, time_quanta_index) self.flops_manager.update_flop_state(job.scheduled_nodes, cpu_util, gpu_util) job.power = self.power_manager.update_power_state(job.scheduled_nodes, Loading Loading
config/fugaku/power.json +7 −7 Original line number Diff line number Diff line { "POWER_GPU_IDLE": 75, "POWER_GPU_MAX": 300, "POWER_CPU_IDLE": 47.25, "POWER_CPU_MAX": 300, "POWER_MEM": 74.26, "POWER_NIC": 21, "POWER_NVME": 45, "POWER_GPU_IDLE": 0, "POWER_GPU_MAX": 0, "POWER_CPU_IDLE": 30, "POWER_CPU_MAX": 150, "POWER_MEM": 10, "POWER_NIC": 0, "POWER_NVME": 0, "POWER_SWITCH": 0, "POWER_CDU": 0, "POWER_UPDATE_FREQ": 10, Loading
config/fugaku/scheduler.json +1 −1 Original line number Diff line number Diff line { "SEED": 42, "JOB_ARRIVAL_TIME": 60, "JOB_ARRIVAL_TIME": 10, "MTBF": 11, "MAX_TIME": 88200, "TRACE_QUANTA": 10, Loading
config/fugaku/system.json +5 −5 Original line number Diff line number Diff line { "NUM_CDUS": 50, "RACKS_PER_CDU": 2, "NODES_PER_RACK": 384, "NUM_CDUS": 24, "RACKS_PER_CDU": 18, "NODES_PER_RACK": 368, "RECTIFIERS_PER_RACK": 8, "CHASSIS_PER_RACK": 8, "NODES_PER_BLADE": 1, Loading @@ -13,8 +13,8 @@ "DOWN_NODES": [], "CPUS_PER_NODE": 1, "GPUS_PER_NODE": 0, "CPU_PEAK_FLOPS": 3.072E12, "CPU_PEAK_FLOPS": 3.379E12, "GPU_PEAK_FLOPS": 0, "CPU_FP_RATIO": 1.0, "CPU_FP_RATIO": 0.82, "GPU_FP_RATIO": 0.0 }
raps/dataloaders/fugaku.py +42 −5 Original line number Diff line number Diff line """ Download parquet files from https://zenodo.org/records/11467483 Note that F-Data doesn't give a list of nodes used, so we set 'scheduled_nodes' to None which triggers the scheduler to schedule the nodes itself. Also, power in F-Data is only given at node-level. We can use node-level power by adding the --validate option. The --reschedule will compute submit times from Poisson distribution, instead of using the submit times given in F-Data. python main.py --system fugaku -f /path/to/21_04.parquet --reschedule --validate --reschedule """ import pandas as pd from tqdm import tqdm from ..job import job_dict from ..utils import next_arrival def load_data(path, **kwargs): Loading Loading @@ -31,6 +48,14 @@ def load_data_from_df(df, **kwargs): Returns: list: List of job dictionaries. """ encrypt_bool = kwargs.get('encrypt') fastforward = kwargs.get('fastforward') reschedule = kwargs.get('reschedule') validate = kwargs.get('validate') jid = kwargs.get('jid', '*') if fastforward: print(f"fast-forwarding {fastforward} seconds") job_list = [] # Convert 'adt' (submit time) to datetime and find the earliest submission time Loading @@ -38,16 +63,28 @@ def load_data_from_df(df, **kwargs): earliest_submit_time = df['adt'].min() # Loop through the DataFrame rows to extract job information for _, row in df.iterrows(): for _, row in tqdm(df.iterrows(), total=len(df), desc="Processing Jobs"): nodes_required = row['nnumr'] if 'nnumr' in df.columns else 0 name = row['jnam'] if 'jnam' in df.columns else 'unknown' if validate: cpu_trace = row['avgpcon'] gpu_trace = cpu_trace else: cpu_trace = row['perf1'] if 'perf1' in df.columns else 0 # Assuming some performance metric as cpu_trace gpu_trace = 0 # Set to 0 as GPU trace is not explicitly provided wall_time = row['duration'] if 'duration' in df.columns else 0 end_state = row['exit state'] if 'exit state' in df.columns else 'unknown' scheduled_nodes = row['nnuma'] if 'nnuma' in df.columns else 0 #scheduled_nodes = row['nnuma'] if 'nnuma' in df.columns else 0 scheduled_nodes = None submit_time = row['adt'] if 'adt' in df.columns else earliest_submit_time if reschedule: # Let the scheduler reschedule the jobs time_offset = next_arrival() else: time_offset = (submit_time - earliest_submit_time).total_seconds() # Compute time offset in seconds job_id = row['jid'] if 'jid' in df.columns else 'unknown' priority = row['pri'] if 'pri' in df.columns else 0 Loading
raps/scheduler.py +12 −2 Original line number Diff line number Diff line Loading @@ -86,6 +86,15 @@ class TickData: pue: Optional[float] def get_utilization(trace, time_quanta_index): if isinstance(trace, (list, np.ndarray)): return trace[time_quanta_index] elif isinstance(trace, (int, float)): return float(trace) else: raise TypeError(f"Invalid type for utilization: {type(trace)}.") class Scheduler: """Job scheduler and simulation manager.""" def __init__(self, total_nodes, down_nodes, power_manager, flops_manager, \ Loading Loading @@ -249,8 +258,9 @@ class Scheduler: job.running_time = self.current_time - job.start_time time_quanta_index = (self.current_time - job.start_time) // TRACE_QUANTA cpu_util = job.cpu_trace[time_quanta_index] gpu_util = job.gpu_trace[time_quanta_index] cpu_util = get_utilization(job.cpu_trace, time_quanta_index) gpu_util = get_utilization(job.gpu_trace, time_quanta_index) self.flops_manager.update_flop_state(job.scheduled_nodes, cpu_util, gpu_util) job.power = self.power_manager.update_power_state(job.scheduled_nodes, Loading