diff --git a/README.md b/README.md index e49e7ce4221982a31466684cd4f6ceb4d206d2e0..a9d1d98f71467e3f0fa0275980950ddc6cdf38e7 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ This will simulate synthetic workloads on two partitions as defined in `config/s This will dump a .npz file with a randomized name, e.g. ac23db.npz. Let's rename this file to pm100.npz for clarity. Note: can control-C when the simulation starts. Now, this pm100.npz file can be used with `multi-part-sim.py` as follows: - python multi-part-sim.py -x setonix/* -f pm100.npz --reschedule --scale 192 + python multi-part-sim.py -x setonix/* -f pm100.npz --reschedule poisson --scale 192 The `--reschedule` flag will use the internal scheduler to determine what nodes to schedule for each job, and the `--scale` flag will specify the maximum number of nodes for each job (generally set this to the max number of nodes of the smallest partition). diff --git a/args.py b/args.py index 2623eb62cdffd255f05e958975f887d5b1ca6929..710f632b3b689030f4a66281ea006cf8611e5ed9 100644 --- a/args.py +++ b/args.py @@ -14,7 +14,8 @@ parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose parser.add_argument('--seed', action='store_true', help='Set random number seed for deterministic simulation') parser.add_argument('-f', '--replay', nargs='+', type=str, help='Either: path/to/joblive path/to/jobprofile' + \ ' -or- filename.npz (overrides --workload option)') -parser.add_argument('--reschedule', action='store_true', help='Reschedule the telemetry workload') +choices = ['false','poisson', 'submit-time'] +parser.add_argument('--reschedule', type=str, choices=choices, default=choices[0], help='Reschedule the telemetry workload') parser.add_argument('-u', '--uncertainties', action='store_true', help='Change from floating point units to floating point units with uncertainties.' + \ ' Very expensive w.r.t simulation time!') diff --git a/main.py b/main.py index 7338964dbdd7d261053fe7b6700382e1a009f6db..f70aeaff78d2c2b40f182b327768e50c5a39cc0a 100644 --- a/main.py +++ b/main.py @@ -95,13 +95,16 @@ if args.replay: job['nodes_required'] = random.randint(1, args.scale) job['requested_nodes'] = None # Setting to None triggers scheduler to assign nodes - if args.reschedule: + if args.reschedule == 'poisson': print("available nodes:", config['AVAILABLE_NODES']) for job in tqdm(jobs, desc="Rescheduling jobs"): job['requested_nodes'] = None job['submit_time'] = next_arrival(1 / config['JOB_ARRIVAL_TIME']) + elif args.reschedule == 'submit-time': + raise NotImplementedError - else: # custom data loader + + else: # custom data loader print(*args.replay) jobs = td.load_data(args.replay) td.save_snapshot(jobs, filename=DIR_NAME) diff --git a/multi-part-sim.py b/multi-part-sim.py index 946164a9cb379d57d407295362ca7d464bd9c440..36218377c22fb99505375c025ee7e026f2458cd3 100644 --- a/multi-part-sim.py +++ b/multi-part-sim.py @@ -32,7 +32,7 @@ args_dicts = [{**vars(args), 'config': config} for config in configs] # Initialize Workload if args.replay: - # Currently this assumes that an .npz file has already been created + # Currently this assumes that an .npz file has already been created # e.g., python main.py --system marconi100 -f ~/data/marconi100/job_table.parquet td = Telemetry(**args_dicts[0]) print(f"Loading {args.replay[0]}...") @@ -41,7 +41,7 @@ if args.replay: print("available nodes:", available_nodes) # Randomly assign partition - for job in jobs: + for job in jobs: job['partition'] = random.choices(partition_names, weights=available_nodes, k=1)[0] if args.scale: @@ -49,14 +49,16 @@ if args.replay: job['nodes_required'] = random.randint(1, args.scale) job['requested_nodes'] = None # Setting to None triggers scheduler to assign nodes - if args.reschedule: + if args.reschedule == 'poisson': for job in tqdm(jobs, desc="Rescheduling jobs"): partition = job['partition'] partition_config = configs[partition_names.index(partition)] job['requested_nodes'] = None job['submit_time'] = next_arrival(1 / partition_config['JOB_ARRIVAL_TIME']) + elif args.reschedule == 'submit-time': + raise NotImplementedError -else: # Synthetic workload +else: # Synthetic workload wl = Workload(*configs) # Generate jobs based on workload type diff --git a/raps/dataloaders/adastraMI250.py b/raps/dataloaders/adastraMI250.py index 1f20bafddb3d7e3b316061aad90ae053fc87d415..58eaec700adf2463c4c841ced1ef5df7f15c6f12 100644 --- a/raps/dataloaders/adastraMI250.py +++ b/raps/dataloaders/adastraMI250.py @@ -8,7 +8,7 @@ python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastra # to reschedule - python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastra --reschedule + python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastra --reschedule poisson # to fast-forward 60 days and replay for 1 day python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastra -ff 60d -t 1d @@ -85,8 +85,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): account = jobs_df.loc[jidx, 'user_id'] # or 'group_id' job_id = jobs_df.loc[jidx, 'job_id'] - if not jid == '*': - if int(jid) == int(job_id): + if not jid == '*': + if int(jid) == int(job_id): print(f'Extracting {job_id} profile') else: continue @@ -101,7 +101,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): print("error nodes_required",nodes_required) continue #wall_time = gpu_trace.size * TRACE_QUANTA # seconds - + if validate: node_power = jobs_df.loc[jidx, 'node_power_consumption'] @@ -109,8 +109,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): node_watts = sum(node_power_array) / (wall_time*nodes_required) cpu_trace = node_watts gpu_trace = 0.0 # should contain stddev_node_power when --validate flag is used - - else: + + else: cpu_power = jobs_df.loc[jidx, 'cpu_power_consumption'] cpu_power_array = cpu_power.tolist() cpu_watts = sum(cpu_power_array) / (wall_time*nodes_required) @@ -120,8 +120,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): cpu_util = (cpu_watts/float(config['POWER_CPU_IDLE']) - config['CPUS_PER_NODE']) / ((float(config['POWER_CPU_MAX']) / float(config['POWER_CPU_IDLE'])) -1.0) #power_to_utilization(cpu_power_array, cpu_min_power, cpu_max_power) # print("cpu_watts",cpu_watts,"cpu_util",cpu_util) - cpu_trace = np.maximum(0, cpu_util) - + cpu_trace = np.maximum(0, cpu_util) + node_power = (jobs_df.loc[jidx, 'node_power_consumption']).tolist() mem_power = (jobs_df.loc[jidx, 'mem_power_consumption']).tolist() # Find the minimum length among the three lists @@ -130,7 +130,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): node_power = node_power[:min_length] cpu_power = cpu_power[:min_length] mem_power = mem_power[:min_length] - + gpu_power = (node_power - cpu_power - mem_power - ([config['NICS_PER_NODE'] * config['POWER_NIC']])) gpu_power_array = gpu_power.tolist() @@ -140,29 +140,32 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): gpu_util = (gpu_watts/float(config['POWER_GPU_IDLE']) - config['GPUS_PER_NODE']) / ((float(config['POWER_GPU_MAX']) / float(config['POWER_GPU_IDLE'])) -1.0) #power_to_utilization(cpu_power_array, cpu_min_power, cpu_max_power) # print("gpu_watts",gpu_watts,"gpu_util",gpu_util) gpu_trace = np.maximum(0, gpu_util) #gpu_util * GPUS_PER_NODE - + priority = int(jobs_df.loc[jidx, 'priority']) - + end_state = jobs_df.loc[jidx, 'job_state'] time_start = jobs_df.loc[jidx, 'start_time'] time_end = jobs_df.loc[jidx, 'end_time'] diff = time_start - time_zero - if jid == '*': + if jid == '*': time_offset = max(diff.total_seconds(), 0) else: # When extracting out a single job, run one iteration past the end of the job time_offset = config['UI_UPDATE_FREQ'] - if fastforward: time_offset -= fastforward + if fastforward: + time_offset -= fastforward - if reschedule: # Let the scheduler reschedule the jobs + if reschedule == 'poisson': # Let the scheduler reschedule the jobs scheduled_nodes = None time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME']) - else: # Prescribed replay + elif reschedule == 'submit-time': + raise NotImplementedError + else: # Prescribed replay scheduled_nodes = (jobs_df.loc[jidx, 'nodes']).tolist() - if time_offset >= 0 and wall_time > 0: + if time_offset >= 0 and wall_time > 0: #print("start_time",time_start,"\tend_time",time_end,"\twall_time",wall_time,"\tquanta wall time",gpu_trace.size * TRACE_QUANTA ) job_info = job_dict(nodes_required, name, account, cpu_trace, gpu_trace, [],[],wall_time, end_state, scheduled_nodes, time_offset, job_id, priority) diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py index 2f4af1775966f6491b7b9049b891bb77e860b175..1b1ae66df0a90be7d43a952eeca2678e3f5c7876 100644 --- a/raps/dataloaders/frontier.py +++ b/raps/dataloaders/frontier.py @@ -22,7 +22,7 @@ def aging_boost(nnodes): https://docs.olcf.ornl.gov/systems/frontier_user_guide.html#job-priority-by-node-count """ if nnodes >= 5645: - return 8*24*3600 # seconds + return 8*24*3600 # seconds elif nnodes >= 1882: return 4*24*3600 else: @@ -38,7 +38,7 @@ def load_data(files, **kwargs): list The list of parsed jobs. """ - assert(len(files) == 2), "Frontier dataloader requires two files: joblive and jobprofile" + assert (len(files) == 2), "Frontier dataloader requires two files: joblive and jobprofile" jobs_path = files[0] jobs_df = pd.read_parquet(jobs_path, engine='pyarrow') @@ -65,7 +65,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar validate = kwargs.get('validate') jid = kwargs.get('jid', '*') - if fastforward: print(f"fast-forwarding {fastforward} seconds") + if fastforward: + print(f"fast-forwarding {fastforward} seconds") min_time = kwargs.get('min_time', None) @@ -102,7 +103,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar nodes_required = jobs_df.loc[jidx, 'node_count'] end_state = jobs_df.loc[jidx, 'state_current'] name = jobs_df.loc[jidx, 'name'] - if encrypt_bool: name = encrypt(name) + if encrypt_bool: + name = encrypt(name) if validate: cpu_power = jobprofile_df[jobprofile_df['allocation_id'] @@ -132,31 +134,42 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar cpu_trace[np.isnan(cpu_trace)] = 0 gpu_trace[np.isnan(gpu_trace)] = 0 - wall_time = gpu_trace.size * config['TRACE_QUANTA'] # seconds + wall_time = gpu_trace.size * config['TRACE_QUANTA'] # seconds time_start = jobs_df.loc[jidx+1, 'time_start'] diff = time_start - time_zero time_offset = max(diff.total_seconds(), 0) - if fastforward: time_offset -= fastforward + if fastforward: + time_offset -= fastforward xnames = jobs_df.loc[jidx, 'xnames'] # Don't replay any job with an empty set of xnames - if '' in xnames: continue + if '' in xnames: + continue - if reschedule: # Let the scheduler reschedule the jobs + if reschedule == 'poisson': # Let the scheduler reschedule the jobs scheduled_nodes = None time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME']) priority = aging_boost(nodes_required) - else: # Prescribed replay + + elif reschedule == 'submit-time': + scheduled_nodes = None + time_submit = jobs_df.loc[jidx, 'time_submission'] + diff = time_submit - time_zero + time_offset = max(diff.total_seconds(), 0) + priority = 0 # SIC + #raise NotImplementedError + + else: # Prescribed replay scheduled_nodes = [] - priority = 0 # not used for replay + priority = 0 # not used for replay for xname in xnames: indices = xname_to_index(xname, config) scheduled_nodes.append(indices) if gpu_trace.size > 0 and (jid == job_id or jid == '*') and time_offset > 0: - job_info = job_dict(nodes_required, name, account, cpu_trace, gpu_trace, [], [], wall_time, + job_info = job_dict(nodes_required, name, account, cpu_trace, gpu_trace, [], [], wall_time, end_state, scheduled_nodes, time_offset, job_id, priority) jobs.append(job_info) @@ -223,6 +236,7 @@ CDU_NAMES = [ 'x2609c1', ] + def cdu_index_to_name(index: int, config: dict): return CDU_NAMES[index - 1] diff --git a/raps/dataloaders/fugaku.py b/raps/dataloaders/fugaku.py index fcd4dbc799dbb3071e1d44e57d24c69a29c48c97..bc28ec325d6941f3751574536d101acd02da911a 100644 --- a/raps/dataloaders/fugaku.py +++ b/raps/dataloaders/fugaku.py @@ -1,16 +1,16 @@ """ Download parquet files from https://zenodo.org/records/11467483 - Note that F-Data doesn't give a list of nodes used, so we set 'scheduled_nodes' to None + Note that F-Data doesn't give a list of nodes used, so we set 'scheduled_nodes' to None which triggers the scheduler to schedule the nodes itself. - Also, power in F-Data is only given at node-level. We can use node-level power by + Also, power in F-Data is only given at node-level. We can use node-level power by adding the --validate option. - The --reschedule will compute submit times from Poisson distribution, instead of using + The '--reschedule poisson' will compute submit times from Poisson distribution, instead of using the submit times given in F-Data. - python main.py --system fugaku -f /path/to/21_04.parquet --reschedule --validate + python main.py --system fugaku -f /path/to/21_04.parquet --reschedule poisson --validate """ import pandas as pd @@ -25,7 +25,7 @@ def load_data(path, **kwargs): Parameters: path (str): Path to the Parquet file. - + Returns: list: List of job dictionaries. """ @@ -44,7 +44,7 @@ def load_data_from_df(df, **kwargs): Parameters: df (pd.DataFrame): DataFrame containing job information. - + Returns: list: List of job dictionaries. """ @@ -59,7 +59,7 @@ def load_data_from_df(df, **kwargs): if fastforward: print(f"fast-forwarding {fastforward} seconds") job_list = [] - + # Convert 'adt' (submit time) to datetime and find the earliest submission time df['adt'] = pd.to_datetime(df['adt'], errors='coerce') if not min_time: @@ -84,14 +84,16 @@ def load_data_from_df(df, **kwargs): #scheduled_nodes = row['nnuma'] if 'nnuma' in df.columns else 0 scheduled_nodes = None submit_time = row['adt'] if 'adt' in df.columns else min_time - if reschedule: # Let the scheduler reschedule the jobs + if reschedule == 'poisson': # Let the scheduler reschedule the jobs time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME']) + elif reschedule == 'submit-time': + raise NotImplementedError else: time_offset = (submit_time - min_time).total_seconds() # Compute time offset in seconds job_id = row['jid'] if 'jid' in df.columns else 'unknown' priority = row['pri'] if 'pri' in df.columns else 0 - + # Create job dictionary job_info = job_dict( nodes_required=nodes_required, @@ -99,8 +101,8 @@ def load_data_from_df(df, **kwargs): account=account, cpu_trace=cpu_trace, gpu_trace=gpu_trace, - ntx_trace=[], - nrx_trace=[], + ntx_trace=[], + nrx_trace=[], wall_time=wall_time, end_state=end_state, scheduled_nodes=scheduled_nodes, @@ -108,9 +110,9 @@ def load_data_from_df(df, **kwargs): job_id=job_id, priority=priority ) - + job_list.append(job_info) - + return job_list diff --git a/raps/dataloaders/lassen.py b/raps/dataloaders/lassen.py index ebdf348277d6d759f2c1a4aaf7824494564f6826..b14cabe16110d409d829a5c8c169f54f0854faf9 100644 --- a/raps/dataloaders/lassen.py +++ b/raps/dataloaders/lassen.py @@ -20,7 +20,7 @@ Usage Instructions: python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen # to reschedule - python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen --reschedule + python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen --reschedule poisson # to fast-forward 37 days and replay for 1 day python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen -ff 37d -t 1d @@ -121,9 +121,11 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): net_tx, net_rx = generate_network_sequences(ib_tx, ib_rx, samples, lambda_poisson=0.3) - if reschedule: # Let the scheduler reschedule the jobs + if reschedule == 'poisson': # Let the scheduler reschedule the jobs scheduled_nodes = None time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME']) + elif reschedule == 'submit-time': + raise NotImplementedError else: scheduled_nodes = get_scheduled_nodes(row['allocation_id'], node_df) time_offset = compute_time_offset(row['begin_time'], min_time) @@ -196,8 +198,8 @@ def adjust_bursts(burst_intervals, total, intervals): def generate_network_sequences(total_tx, total_rx, intervals, lambda_poisson): - - if not total_tx or not total_rx: + + if not total_tx or not total_rx: return [], [] # Generate sporadic bursts using a Poisson distribution (shared for both tx and rx) diff --git a/raps/dataloaders/marconi100.py b/raps/dataloaders/marconi100.py index 1348f23e9dc94e03e58c69195ef84c40de2f60e1..c6a97f80982a19d92559eb74e464c971a26cb955 100644 --- a/raps/dataloaders/marconi100.py +++ b/raps/dataloaders/marconi100.py @@ -1,8 +1,8 @@ """ # Reference - Antici, Francesco, et al. "PM100: A Job Power Consumption Dataset of a - Large-scale Production HPC System." Proceedings of the SC'23 Workshops - of The International Conference on High Performance Computing, + Antici, Francesco, et al. "PM100: A Job Power Consumption Dataset of a + Large-scale Production HPC System." Proceedings of the SC'23 Workshops + of The International Conference on High Performance Computing, Network, Storage, and Analysis. 2023. # get the data @@ -12,7 +12,7 @@ python main.py -f /path/to/job_table.parquet --system marconi100 # to reschedule - python main.py -f /path/to/job_table.parquet --system marconi100 --reschedule + python main.py -f /path/to/job_table.parquet --system marconi100 --reschedule poisson # to fast-forward 60 days and replay for 1 day python main.py -f /path/to/job_table.parquet --system marconi100 -ff 60d -t 1d @@ -88,28 +88,28 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): account = jobs_df.loc[jidx, 'user_id'] # or 'group_id' job_id = jobs_df.loc[jidx, 'job_id'] - if not jid == '*': - if int(jid) == int(job_id): + if not jid == '*': + if int(jid) == int(job_id): print(f'Extracting {job_id} profile') else: continue nodes_required = jobs_df.loc[jidx, 'num_nodes_alloc'] name = str(uuid.uuid4())[:6] - + if validate: cpu_power = jobs_df.loc[jidx, 'node_power_consumption']/jobs_df.loc[jidx, 'num_nodes_alloc'] cpu_trace = cpu_power gpu_trace = cpu_trace - else: + else: cpu_power = jobs_df.loc[jidx, 'cpu_power_consumption'] cpu_power_array = cpu_power.tolist() cpu_min_power = nodes_required * config['POWER_CPU_IDLE'] * config['CPUS_PER_NODE'] cpu_max_power = nodes_required * config['POWER_CPU_MAX'] * config['CPUS_PER_NODE'] cpu_util = power_to_utilization(cpu_power_array, cpu_min_power, cpu_max_power) cpu_trace = cpu_util * config['CPUS_PER_NODE'] - + node_power = (jobs_df.loc[jidx, 'node_power_consumption']).tolist() mem_power = (jobs_df.loc[jidx, 'mem_power_consumption']).tolist() # Find the minimum length among the three lists @@ -118,7 +118,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): node_power = node_power[:min_length] cpu_power = cpu_power[:min_length] mem_power = mem_power[:min_length] - + gpu_power = (node_power - cpu_power - mem_power - ([nodes_required * config['NICS_PER_NODE'] * config['POWER_NIC']] * len(node_power)) - ([nodes_required * config['POWER_NVME']] * len(node_power))) @@ -127,16 +127,16 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): gpu_max_power = nodes_required * config['POWER_GPU_MAX'] * config['GPUS_PER_NODE'] gpu_util = power_to_utilization(gpu_power_array, gpu_min_power, gpu_max_power) gpu_trace = gpu_util * config['GPUS_PER_NODE'] - + priority = int(jobs_df.loc[jidx, 'priority']) - + # wall_time = jobs_df.loc[i, 'run_time'] wall_time = gpu_trace.size * config['TRACE_QUANTA'] # seconds end_state = jobs_df.loc[jidx, 'job_state'] time_start = jobs_df.loc[jidx+1, 'start_time'] diff = time_start - time_zero - if jid == '*': + if jid == '*': time_offset = max(diff.total_seconds(), 0) else: # When extracting out a single job, run one iteration past the end of the job @@ -144,12 +144,14 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): if fastforward: time_offset -= fastforward - if reschedule: # Let the scheduler reschedule the jobs + if reschedule == 'poisson': # Let the scheduler reschedule the jobs scheduled_nodes = None time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME']) - else: # Prescribed replay + elif reschedule == 'submit-time': + raise NotImplementedError + else: # Prescribed replay scheduled_nodes = (jobs_df.loc[jidx, 'nodes']).tolist() - + if gpu_trace.size > 0 and time_offset >= 0: job_info = job_dict(nodes_required, name, account, cpu_trace, gpu_trace, [], [], wall_time, end_state, scheduled_nodes, time_offset, job_id, priority)