Loading multi-part-sim.py +3 −2 Original line number Diff line number Diff line Loading @@ -49,13 +49,14 @@ if args.replay: job['nodes_required'] = random.randint(1, args.scale) job['requested_nodes'] = None # Setting to None triggers scheduler to assign nodes if args.reschedule == 'poisson': if args.arrival == 'poisson': for job in tqdm(jobs, desc="Rescheduling jobs"): partition = job['partition'] partition_config = configs[partition_names.index(partition)] job['requested_nodes'] = None job['submit_time'] = next_arrival(1 / partition_config['JOB_ARRIVAL_TIME']) elif args.reschedule == 'submit-time': elif args.arrival == 'prescribed': raise NotImplementedError else: # Synthetic workload Loading raps/dataloaders/adastraMI250.py +6 −7 Original line number Diff line number Diff line Loading @@ -7,8 +7,8 @@ # to simulate the dataset python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastra # to reschedule python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastra --reschedule poisson # to replay with different arrival distribution python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastra --arrival poisson # to fast-forward 60 days and replay for 1 day python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastra -ff 60d -t 1d Loading Loading @@ -56,7 +56,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): count_jobs_notOK = 0 config = kwargs.get('config') min_time = kwargs.get('min_time', None) reschedule = kwargs.get('reschedule') arrival = kwargs.get('arrival') fastforward = kwargs.get('fastforward') validate = kwargs.get('validate') jid = kwargs.get('jid', '*') Loading Loading @@ -157,21 +157,20 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): if fastforward: time_offset -= fastforward if reschedule == 'poisson': # Let the scheduler reschedule the jobs if arrival == 'poisson': # Modify the arrival times of the jobs according to Poisson distribution scheduled_nodes = None time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME']) else: # Prescribed replay scheduled_nodes = (jobs_df.loc[jidx, 'nodes']).tolist() if time_offset >= 0 and wall_time > 0: #print("start_time",time_start,"\tend_time",time_end,"\twall_time",wall_time,"\tquanta wall time",gpu_trace.size * TRACE_QUANTA ) job_info = job_dict(nodes_required, name, account, cpu_trace, gpu_trace, [],[],wall_time, end_state, scheduled_nodes, time_offset, job_id, priority) jobs.append(job_info) else: count_jobs_notOK = count_jobs_notOK + 1 count_jobs_notOK += 1 print("many jobs not OK !!!!!!!!!!!!!!! : ",count_jobs_notOK) print("jobs not added: ", count_jobs_notOK) return jobs def xname_to_index(xname: str, config: dict): Loading raps/dataloaders/frontier.py +2 −2 Original line number Diff line number Diff line Loading @@ -61,7 +61,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar config = kwargs.get('config') encrypt_bool = kwargs.get('encrypt') fastforward = kwargs.get('fastforward') reschedule = kwargs.get('reschedule') arrival = kwargs.get('arrival') validate = kwargs.get('validate') jid = kwargs.get('jid', '*') Loading Loading @@ -153,7 +153,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar if '' in xnames: continue if reschedule == 'poisson': # Let the scheduler reschedule the jobs if arrival == 'poisson': # Modify the arrival times of the jobs according to Poisson distribution scheduled_nodes = None time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME']) time_start = None Loading raps/dataloaders/fugaku.py +4 −4 Original line number Diff line number Diff line Loading @@ -7,10 +7,10 @@ Also, power in F-Data is only given at node-level. We can use node-level power by adding the --validate option. The '--reschedule poisson' will compute submit times from Poisson distribution, instead of using The '--arrival poisson' will compute submit times from Poisson distribution, instead of using the submit times given in F-Data. python main.py --system fugaku -f /path/to/21_04.parquet --reschedule poisson --validate python main.py --system fugaku -f /path/to/21_04.parquet --arrival poisson --validate """ import pandas as pd Loading Loading @@ -50,7 +50,7 @@ def load_data_from_df(df, **kwargs): """ encrypt_bool = kwargs.get('encrypt') fastforward = kwargs.get('fastforward') reschedule = kwargs.get('reschedule') arrival = kwargs.get('arrival') validate = kwargs.get('validate') jid = kwargs.get('jid', '*') config = kwargs.get('config') Loading Loading @@ -84,7 +84,7 @@ def load_data_from_df(df, **kwargs): #scheduled_nodes = row['nnuma'] if 'nnuma' in df.columns else 0 scheduled_nodes = None submit_time = row['adt'] if 'adt' in df.columns else min_time if reschedule == 'poisson': # Let the scheduler reschedule the jobs if arrival == 'poisson': # Modify the arrival times of according to Poisson distribution time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME']) else: time_offset = (submit_time - min_time).total_seconds() # Compute time offset in seconds Loading raps/dataloaders/lassen.py +16 −11 Original line number Diff line number Diff line Loading @@ -19,8 +19,8 @@ Usage Instructions: # to simulate the dataset as submitted python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen # to reschedule python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen --reschedule poisson # to modify the submit times of the telemetry according to Poisson distribution python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen --arrival poisson # to fast-forward 37 days and replay for 1 day python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen -ff 37d -t 1d Loading Loading @@ -56,7 +56,7 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): """ config = kwargs.get('config') jid = kwargs.get('jid', '*') reschedule = kwargs.get('reschedule') arrival = kwargs.get('arrival') fastforward = kwargs.get('fastforward') verbose = kwargs.get('verbose') min_time = kwargs.get('min_time', None) Loading @@ -64,6 +64,7 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): if fastforward: print(f"fast-forwarding {fastforward} seconds") allocation_df['job_submit_time'] = pd.to_datetime(allocation_df['job_submit_time'], format='mixed', errors='coerce') allocation_df['begin_time'] = pd.to_datetime(allocation_df['begin_time'], format='mixed', errors='coerce') allocation_df['end_time'] = pd.to_datetime(allocation_df['end_time'], format='mixed', errors='coerce') Loading Loading @@ -121,14 +122,17 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): net_tx, net_rx = generate_network_sequences(ib_tx, ib_rx, samples, lambda_poisson=0.3) if reschedule == 'poisson': # Let the scheduler reschedule the jobs if arrival == 'poisson': # Modify the submit times according to Poisson process scheduled_nodes = None time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME']) else: time_submit = next_arrival(1/config['JOB_ARRIVAL_TIME']) time_start = None # Scheduler will determine start time else: # Prescribed replay scheduled_nodes = get_scheduled_nodes(row['allocation_id'], node_df) time_offset = compute_time_offset(row['begin_time'], min_time) time_submit = compute_time_offset(row['job_submit_time'], min_time) time_start = compute_time_offset(row['begin_time'], min_time) if fastforward: time_offset -= fastforward time_submit -= fastforward time_start -= fastforward if verbose: print('ib_tx, ib_rx, samples:', ib_tx, ib_rx, samples) Loading @@ -136,7 +140,7 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): print('rx:', net_rx) print('scheduled_nodes:', nodes_required, scheduled_nodes) if time_offset >= 0: if time_submit >= 0: job_info = job_dict(nodes_required, row['hashed_user_id'], Loading @@ -144,9 +148,10 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): cpu_trace, gpu_trace, net_tx, net_rx, wall_time, row['exit_status'], scheduled_nodes, time_offset, time_submit, job_id, row.get('priority', 0)) row.get('priority', 0), time_start) job_list.append(job_info) Loading Loading
multi-part-sim.py +3 −2 Original line number Diff line number Diff line Loading @@ -49,13 +49,14 @@ if args.replay: job['nodes_required'] = random.randint(1, args.scale) job['requested_nodes'] = None # Setting to None triggers scheduler to assign nodes if args.reschedule == 'poisson': if args.arrival == 'poisson': for job in tqdm(jobs, desc="Rescheduling jobs"): partition = job['partition'] partition_config = configs[partition_names.index(partition)] job['requested_nodes'] = None job['submit_time'] = next_arrival(1 / partition_config['JOB_ARRIVAL_TIME']) elif args.reschedule == 'submit-time': elif args.arrival == 'prescribed': raise NotImplementedError else: # Synthetic workload Loading
raps/dataloaders/adastraMI250.py +6 −7 Original line number Diff line number Diff line Loading @@ -7,8 +7,8 @@ # to simulate the dataset python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastra # to reschedule python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastra --reschedule poisson # to replay with different arrival distribution python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastra --arrival poisson # to fast-forward 60 days and replay for 1 day python main.py -f /path/to/AdastaJobsMI250_15days.parquet --system adastra -ff 60d -t 1d Loading Loading @@ -56,7 +56,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): count_jobs_notOK = 0 config = kwargs.get('config') min_time = kwargs.get('min_time', None) reschedule = kwargs.get('reschedule') arrival = kwargs.get('arrival') fastforward = kwargs.get('fastforward') validate = kwargs.get('validate') jid = kwargs.get('jid', '*') Loading Loading @@ -157,21 +157,20 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): if fastforward: time_offset -= fastforward if reschedule == 'poisson': # Let the scheduler reschedule the jobs if arrival == 'poisson': # Modify the arrival times of the jobs according to Poisson distribution scheduled_nodes = None time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME']) else: # Prescribed replay scheduled_nodes = (jobs_df.loc[jidx, 'nodes']).tolist() if time_offset >= 0 and wall_time > 0: #print("start_time",time_start,"\tend_time",time_end,"\twall_time",wall_time,"\tquanta wall time",gpu_trace.size * TRACE_QUANTA ) job_info = job_dict(nodes_required, name, account, cpu_trace, gpu_trace, [],[],wall_time, end_state, scheduled_nodes, time_offset, job_id, priority) jobs.append(job_info) else: count_jobs_notOK = count_jobs_notOK + 1 count_jobs_notOK += 1 print("many jobs not OK !!!!!!!!!!!!!!! : ",count_jobs_notOK) print("jobs not added: ", count_jobs_notOK) return jobs def xname_to_index(xname: str, config: dict): Loading
raps/dataloaders/frontier.py +2 −2 Original line number Diff line number Diff line Loading @@ -61,7 +61,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar config = kwargs.get('config') encrypt_bool = kwargs.get('encrypt') fastforward = kwargs.get('fastforward') reschedule = kwargs.get('reschedule') arrival = kwargs.get('arrival') validate = kwargs.get('validate') jid = kwargs.get('jid', '*') Loading Loading @@ -153,7 +153,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar if '' in xnames: continue if reschedule == 'poisson': # Let the scheduler reschedule the jobs if arrival == 'poisson': # Modify the arrival times of the jobs according to Poisson distribution scheduled_nodes = None time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME']) time_start = None Loading
raps/dataloaders/fugaku.py +4 −4 Original line number Diff line number Diff line Loading @@ -7,10 +7,10 @@ Also, power in F-Data is only given at node-level. We can use node-level power by adding the --validate option. The '--reschedule poisson' will compute submit times from Poisson distribution, instead of using The '--arrival poisson' will compute submit times from Poisson distribution, instead of using the submit times given in F-Data. python main.py --system fugaku -f /path/to/21_04.parquet --reschedule poisson --validate python main.py --system fugaku -f /path/to/21_04.parquet --arrival poisson --validate """ import pandas as pd Loading Loading @@ -50,7 +50,7 @@ def load_data_from_df(df, **kwargs): """ encrypt_bool = kwargs.get('encrypt') fastforward = kwargs.get('fastforward') reschedule = kwargs.get('reschedule') arrival = kwargs.get('arrival') validate = kwargs.get('validate') jid = kwargs.get('jid', '*') config = kwargs.get('config') Loading Loading @@ -84,7 +84,7 @@ def load_data_from_df(df, **kwargs): #scheduled_nodes = row['nnuma'] if 'nnuma' in df.columns else 0 scheduled_nodes = None submit_time = row['adt'] if 'adt' in df.columns else min_time if reschedule == 'poisson': # Let the scheduler reschedule the jobs if arrival == 'poisson': # Modify the arrival times of according to Poisson distribution time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME']) else: time_offset = (submit_time - min_time).total_seconds() # Compute time offset in seconds Loading
raps/dataloaders/lassen.py +16 −11 Original line number Diff line number Diff line Loading @@ -19,8 +19,8 @@ Usage Instructions: # to simulate the dataset as submitted python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen # to reschedule python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen --reschedule poisson # to modify the submit times of the telemetry according to Poisson distribution python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen --arrival poisson # to fast-forward 37 days and replay for 1 day python main.py -f /path/to/LAST/Lassen-Supercomputer-Job-Dataset --system lassen -ff 37d -t 1d Loading Loading @@ -56,7 +56,7 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): """ config = kwargs.get('config') jid = kwargs.get('jid', '*') reschedule = kwargs.get('reschedule') arrival = kwargs.get('arrival') fastforward = kwargs.get('fastforward') verbose = kwargs.get('verbose') min_time = kwargs.get('min_time', None) Loading @@ -64,6 +64,7 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): if fastforward: print(f"fast-forwarding {fastforward} seconds") allocation_df['job_submit_time'] = pd.to_datetime(allocation_df['job_submit_time'], format='mixed', errors='coerce') allocation_df['begin_time'] = pd.to_datetime(allocation_df['begin_time'], format='mixed', errors='coerce') allocation_df['end_time'] = pd.to_datetime(allocation_df['end_time'], format='mixed', errors='coerce') Loading Loading @@ -121,14 +122,17 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): net_tx, net_rx = generate_network_sequences(ib_tx, ib_rx, samples, lambda_poisson=0.3) if reschedule == 'poisson': # Let the scheduler reschedule the jobs if arrival == 'poisson': # Modify the submit times according to Poisson process scheduled_nodes = None time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME']) else: time_submit = next_arrival(1/config['JOB_ARRIVAL_TIME']) time_start = None # Scheduler will determine start time else: # Prescribed replay scheduled_nodes = get_scheduled_nodes(row['allocation_id'], node_df) time_offset = compute_time_offset(row['begin_time'], min_time) time_submit = compute_time_offset(row['job_submit_time'], min_time) time_start = compute_time_offset(row['begin_time'], min_time) if fastforward: time_offset -= fastforward time_submit -= fastforward time_start -= fastforward if verbose: print('ib_tx, ib_rx, samples:', ib_tx, ib_rx, samples) Loading @@ -136,7 +140,7 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): print('rx:', net_rx) print('scheduled_nodes:', nodes_required, scheduled_nodes) if time_offset >= 0: if time_submit >= 0: job_info = job_dict(nodes_required, row['hashed_user_id'], Loading @@ -144,9 +148,10 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs): cpu_trace, gpu_trace, net_tx, net_rx, wall_time, row['exit_status'], scheduled_nodes, time_offset, time_submit, job_id, row.get('priority', 0)) row.get('priority', 0), time_start) job_list.append(job_info) Loading