config/marconi100/system.json (+3 −3)

```diff
@@ -1,8 +1,8 @@
 {
-    "NUM_CDUS": 17,
+    "NUM_CDUS": 1,
     "RACKS_PER_CDU": 3,
-    "NODES_PER_RACK": 20,
+    "NODES_PER_RACK": 10,
     "RECTIFIERS_PER_RACK": 5,
     "CHASSIS_PER_RACK": 1,
     "NODES_PER_BLADE": 1,
@@ -9,7 +9,7 @@
     "NICS_PER_NODE": 2,
     "RECTIFIERS_PER_CHASSIS": 5,
     "NODES_PER_RECTIFIER": 4,
-    "MISSING_RACKS": [49, 50],
+    "MISSING_RACKS": [0],
     "DOWN_NODES": [],
     "CPUS_PER_NODE": 2,
     "GPUS_PER_NODE": 4,
```
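The edit shrinks the modeled system so experiments run quickly. As a sanity check on the arithmetic, here is a hedged sketch of the sizing rule, assuming RAPS derives the node count as NUM_CDUS × RACKS_PER_CDU racks, minus MISSING_RACKS, times NODES_PER_RACK (the actual derivation lives elsewhere in raps/config.py, which this diff does not touch):

```python
import json

def total_nodes_sketch(path: str) -> int:
    """Assumed sizing rule: (cdus * racks_per_cdu - missing racks) * nodes_per_rack."""
    with open(path) as f:
        cfg = json.load(f)
    racks = cfg["NUM_CDUS"] * cfg["RACKS_PER_CDU"] - len(cfg["MISSING_RACKS"])
    return racks * cfg["NODES_PER_RACK"]

# Old config: (17 * 3 - 2) * 20 = 980 nodes, Marconi100's actual size.
# New config: (1 * 3 - 1) * 10 = 20 nodes, a toy system for fast iteration.
```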
main.py (+27 −2)

```diff
@@ -9,7 +9,7 @@
 import os
 import re
 import sys
-import time
+import csv

 from tqdm import tqdm
 from raps.policy import PolicyType
@@ -26,7 +26,7 @@ parser.add_argument('--start', type=str, help='ISO8601 string for start of simulation')
 parser.add_argument('--end', type=str, help='ISO8601 string for end of simulation')
 parser.add_argument('-d', '--debug', action='store_true', help='Enable debug mode and disable rich layout')
 parser.add_argument('-e', '--encrypt', action='store_true', help='Encrypt any sensitive data in telemetry')
-parser.add_argument('-n', '--numjobs', type=int, default=1000, help='Number of jobs to schedule')
+parser.add_argument('-n', '--numjobs', type=int, default=10, help='Number of jobs to schedule')
 parser.add_argument('-t', '--time', type=str, default=None, help='Length of time to simulate, e.g., 123, 123s, 27m, 3h, 7d')
 parser.add_argument('-ff', '--fastforward', type=str, default=None, help='Fast-forward by time amount (uses same units as -t)')
 parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output')
@@ -105,6 +105,7 @@
 else:
     power_manager = PowerManager(SC_SHAPE, DOWN_NODES, power_func=compute_node_power)
+print(f"printing Schedule from main {args.schedule}")
 flops_manager = FLOPSManager(SC_SHAPE)
 layout_manager = LayoutManager(args.layout, args.debug)
 sc = Scheduler(TOTAL_NODES, DOWN_NODES, power_manager, flops_manager, layout_manager,
@@ -180,6 +181,30 @@ if args.verbose:
 sc.run_simulation_blocking(jobs, timesteps=timesteps)

 output_stats = sc.get_stats()
+
+"""
+keys_to_include = ["num_samples", "jobs completed", "throughput", "average_turnaround_time",
+                   "average_waiting_time", "total_job_power", "fairness_index",
+                   "node_efficiency", "energy_delay_product"]
+fileI = open(f"Time-{args.time}-policy-{args.schedule}.csv", "w")
+writer = csv.DictWriter(fileI, fieldnames=keys_to_include)
+filtered_stats = {key: output_stats[key] for key in keys_to_include}
+writer.writerow(filtered_stats)
+"""
+
+with open(f"Time-{args.time}-policy-{args.schedule}.csv", mode="w", newline="") as fileI:
+    writer = csv.writer(fileI)
+    for key, value in output_stats.items():
+        writer.writerow([key, value])
+print("csv written successfully")

 # Following b/c we get the following error when we use the PM100 telemetry dataset
 # TypeError: Object of type int64 is not JSON serializable
 try:
```
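The commented-out block gestures at a one-column-per-metric layout. A minimal sketch of that variant, assuming output_stats is a plain dict containing the listed keys; note the writeheader() call, which the commented-out version lacks, and the guard against missing keys:

```python
import csv

keys_to_include = ["num_samples", "jobs completed", "throughput",
                   "average_turnaround_time", "average_waiting_time",
                   "total_job_power", "fairness_index",
                   "node_efficiency", "energy_delay_product"]

def write_stats_csv(output_stats: dict, path: str) -> None:
    """Write one header row plus one value row for the selected metrics."""
    filtered = {k: output_stats[k] for k in keys_to_include if k in output_stats}
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(filtered))
        writer.writeheader()
        writer.writerow(filtered)
```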
raps/Autoencoder.py (new file, +86 −0)

```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split
#import wandb


class ImprovedAutoencoder(nn.Module):
    def __init__(self, input_dim, encoding_dim, attention_dim=32):
        super(ImprovedAutoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, encoding_dim),
            nn.ReLU(),
            nn.Linear(encoding_dim, attention_dim),
            nn.ReLU()
        )
        self.attention = nn.MultiheadAttention(embed_dim=attention_dim, num_heads=1)
        self.decoder = nn.Sequential(
            nn.Linear(attention_dim, encoding_dim),
            nn.ReLU(),
            nn.Linear(encoding_dim, input_dim),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = self.encoder(x)
        x = x.unsqueeze(0)  # Add a sequence dimension (seq_len=1) for the attention mechanism
        x, _ = self.attention(x, x, x)
        x = x.squeeze(0)  # Remove the sequence dimension
        x = self.decoder(x)
        return x


def train_improved_autoencoder_plain_data(data, input_dim, encoding_dim, attention_dim=32,
                                          epochs=50, batch_size=256, validation_split=0.2):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    train_size = int((1 - validation_split) * len(data))
    val_size = len(data) - train_size
    train_data, val_data = random_split(
        TensorDataset(torch.tensor(data, dtype=torch.float32)), [train_size, val_size])
    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)

    print('INPUT DIM OF FEATURES: ', input_dim, data.shape[1])
    # Move the model to the same device the inputs are sent to
    model = ImprovedAutoencoder(input_dim, encoding_dim, attention_dim).to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    train_losses = []
    val_losses = []
    for epoch in range(epochs):
        model.train()
        train_loss = 0
        for batch in train_loader:
            inputs = batch[0].to(device)
            optimizer.zero_grad()
            reconstruction = model(inputs)
            loss = criterion(reconstruction, inputs)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        train_loss /= len(train_loader)
        train_losses.append(train_loss)

        model.eval()
        val_loss = 0
        with torch.no_grad():
            for batch in val_loader:
                inputs = batch[0].to(device)
                reconstruction = model(inputs)
                loss = criterion(reconstruction, inputs)
                val_loss += loss.item()
        val_loss /= len(val_loader)
        val_losses.append(val_loss)

        if (epoch + 1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{epochs}], Train Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}')
            #wandb.log({"epoch": epoch + 1, "train_loss": train_loss, "val_loss": val_loss})

    #wandb.finish()
    encoder = model.encoder
    return encoder, train_losses, val_losses, model, train_loader, val_loader
```
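Nothing in the diff shows this trainer being invoked. A minimal usage sketch, assuming the input is a dense float matrix already scaled to [0, 1] (the decoder ends in Sigmoid, so MSE against unscaled data would not converge); the 12-column shape mirrors the feature_cols list in the marconi100 loader:

```python
import numpy as np
from raps.Autoencoder import train_improved_autoencoder_plain_data

# Synthetic stand-in for the preprocessed job-feature matrix
features = np.random.rand(5000, 12).astype("float32")

encoder, train_losses, val_losses, model, _, _ = train_improved_autoencoder_plain_data(
    features, input_dim=12, encoding_dim=64)
print(f"final val loss: {val_losses[-1]:.4f}")
```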
raps/dataloaders/marconi100.py (+86 −10)

```diff
@@ -23,12 +23,15 @@
 """
 import uuid
 import pandas as pd
+import random
 from tqdm import tqdm

 from ..config import load_config_variables
 from ..job import job_dict
 from ..utils import power_to_utilization, next_arrival
+from ..preprocessing import preprocessor
+from ..rnn_testing import rnn_testing
+from ..ranking import TabularTimeSeriesRankingModel, ranking_jobs_on_multi_feature_train, scoring_function

 load_config_variables([
     'CPUS_PER_NODE',
     'GPUS_PER_NODE',
@@ -52,13 +55,73 @@ def load_data(jobs_path, **kwargs):
     ----------
     jobs_path : str
         The path to the jobs parquet file.

     Returns
     -------
     list
         The list of parsed jobs.
     """
     jobs_df = pd.read_parquet(jobs_path, engine='pyarrow')
+    jobs_df = jobs_df[0:500]  # limit to the first 500 jobs while prototyping
+
+    p = preprocessor(jobs_df)
+    feature_cols = ['cores_per_task', 'num_cores_req', 'num_cores_alloc', 'num_nodes_req',
+                    'num_nodes_alloc', 'num_tasks', 'priority', 'num_gpus_req',
+                    'num_gpus_alloc', 'mem_req', 'mem_alloc', 'time_limit']
+    target_col = 'cpu_power_consumption'
+    N = 100  # number of time steps in the input sequence
+    M = 10   # number of time steps to predict
+    df_test, scaler, X_test_numerical = p.preprocess_data_disjoint_testing(feature_cols, target_col, N, M)
+
+    # Create disjoint sequences for the time-series data
+    X_time_series_test, y_test_sequences = p.create_disjoint_sequences_testing(df_test, target_col, N, M)
+
+    # Predict per-job CPU power consumption with the RNN
+    model = rnn_testing()
+    _, _, cpu_power_consumption_predictions = model(X_test_numerical, X_time_series_test)
+
+    # Placeholder node counts: one random value in [1, 10] per test row
+    random.seed(42)
+    sample_df_data = [random.randint(1, 10) for _ in range(len(df_test["num_nodes_req"].values))]
+    sample_df = pd.DataFrame(columns=["num_nodes"], data=sample_df_data)
+    sample_df["power_consumptions"] = cpu_power_consumption_predictions
+    sample_df["priority"] = jobs_df["priority"]
+    sample_df["time_limit"] = jobs_df["time_limit"]
+
+    # TODO: the learned ranking path (TabularTimeSeriesRankingModel trained via
+    # ranking_jobs_on_multi_feature_train) is stubbed out for now; jobs are
+    # scored with the simpler scoring_function instead.
+    scores = scoring_function(sample_df,
+                              feature_columns=['num_nodes', 'power_consumptions'],
+                              time_series_column='power_consumptions',
+                              time_series_stat='mean')
+
+    jobs_df["ml_priority"] = scores
+    jobs_df = jobs_df.sort_values(by='submit_time')
+    jobs_df["num_nodes_req"] = sample_df["num_nodes"]
+    jobs_df["num_nodes_alloc"] = sample_df["num_nodes"]
+
     return load_data_from_df(jobs_df, **kwargs)
```
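scoring_function lives in raps/ranking.py, which is not part of this diff, so its exact behavior is not visible here. A hedged, simplified sketch of the kind of min-max-normalized weighted score such a function might compute (the signature, weighting, and normalization are assumptions, not the repo's actual code):

```python
import numpy as np
import pandas as pd

def scoring_function_sketch(df: pd.DataFrame, feature_columns, weights=None):
    """Assumed behavior: min-max normalize each feature, then combine into one score per job."""
    weights = weights or {c: 1.0 / len(feature_columns) for c in feature_columns}
    score = np.zeros(len(df))
    for col in feature_columns:
        vals = df[col].astype(float).to_numpy()
        rng = vals.max() - vals.min()
        normed = (vals - vals.min()) / rng if rng > 0 else np.zeros_like(vals)
        score += weights[col] * normed
    return score
```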
```diff
@@ -141,30 +204,43 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
         gpu_trace = gpu_util * GPUS_PER_NODE

         priority = int(jobs_df.loc[jidx, 'priority'])
         # wall_time = jobs_df.loc[i, 'run_time']
         wall_time = gpu_trace.size * TRACE_QUANTA  # seconds
         end_state = jobs_df.loc[jidx, 'job_state']
         time_start = jobs_df.loc[jidx + 1, 'start_time']
         diff = time_start - time_zero

+        # New features for ML-based decision making
+        num_cores_req = jobs_df.loc[jidx, 'num_cores_req']
+        num_cores_alloc = jobs_df.loc[jidx, 'num_cores_alloc']
+        num_nodes_req = jobs_df.loc[jidx, 'num_nodes_req']
+        num_nodes_alloc = jobs_df.loc[jidx, 'num_nodes_alloc']
+        num_tasks = jobs_df.loc[jidx, 'num_tasks']
+        num_gpus_req = jobs_df.loc[jidx, 'num_gpus_req']
+        num_gpus_alloc = jobs_df.loc[jidx, 'num_gpus_alloc']
+        mem_req = jobs_df.loc[jidx, 'mem_req']
+        mem_alloc = jobs_df.loc[jidx, 'mem_alloc']
+        threads_per_core = jobs_df.loc[jidx, 'threads_per_core']
+        time_limit = jobs_df.loc[jidx, 'time_limit']
+        cpu_power_consumption = jobs_df.loc[jidx, 'cpu_power_consumption']
+        ml_priority = jobs_df.loc[jidx, 'ml_priority']
+
         if jid == '*':
             time_offset = max(diff.total_seconds(), 0)
         else:
             # When extracting out a single job, run one iteration past the end of the job
             time_offset = UI_UPDATE_FREQ

         if fastforward:
             time_offset -= fastforward

         if reschedule:  # Let the scheduler reschedule the jobs
             scheduled_nodes = None
             time_offset = next_arrival()
         else:  # Prescribed replay
             scheduled_nodes = (jobs_df.loc[jidx, 'nodes']).tolist()

         if gpu_trace.size > 0 and time_offset >= 0:
-            job_info = job_dict(nodes_required, name, cpu_trace, gpu_trace, wall_time,
-                                end_state, scheduled_nodes, time_offset, job_id, priority)
+            job_info = job_dict(nodes_required, name, cpu_trace, gpu_trace, wall_time,
+                                end_state, scheduled_nodes, time_offset, job_id,
+                                num_cores_req, num_cores_alloc, num_nodes_req, num_nodes_alloc,
+                                num_tasks, num_gpus_req, num_gpus_alloc, mem_req, mem_alloc,
+                                threads_per_core, time_limit, cpu_power_consumption,
+                                ml_priority, priority)
             jobs.append(job_info)

     return jobs
```
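In the reschedule branch, submit times come from next_arrival() in raps/utils.py, which this diff imports but does not show. A hedged sketch of what such an arrival generator typically looks like, assuming a Poisson arrival process (the rate constant is invented for illustration):

```python
import random

# Hypothetical sketch: exponential inter-arrival gaps, i.e. a Poisson process.
# JOB_ARRIVAL_RATE is an assumed value, e.g. one job every ~5 minutes.
JOB_ARRIVAL_RATE = 1.0 / 300

def next_arrival_sketch() -> float:
    """Return a random inter-arrival gap in seconds."""
    return random.expovariate(JOB_ARRIVAL_RATE)
```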
raps/job.py (+38 −2)

```diff
 from enum import Enum

+"""
 def job_dict(nodes_required, name, cpu_trace, gpu_trace, wall_time, \
              end_state, scheduled_nodes, time_offset, job_id, priority=0):
-    """ Return job info dictionary """
+    Return job info dictionary
     return {
         'nodes_required': nodes_required,
         'name': name,
@@ -16,6 +16,42 @@
         'priority': priority
     }
+"""
+
+# New job_dict with extra features for ML-based decision making
+def job_dict(nodes_required, name, cpu_trace, gpu_trace, wall_time,
+             end_state, scheduled_nodes, time_offset, job_id,
+             num_cores_req, num_cores_alloc, num_nodes_req, num_nodes_alloc,
+             num_tasks, num_gpus_req, num_gpus_alloc, mem_req, mem_alloc,
+             threads_per_core, time_limit, cpu_power_consumption, ml_priority,
+             priority=0):
+    """ Return job info dictionary """
+    return {
+        'nodes_required': nodes_required,
+        'name': name,
+        'cpu_trace': cpu_trace,
+        'gpu_trace': gpu_trace,
+        'wall_time': wall_time,
+        'end_state': end_state,
+        'requested_nodes': scheduled_nodes,
+        'submit_time': time_offset,
+        'id': job_id,
+        'priority': priority,
+        # The features below are newly added for ML-based decision making
+        'num_cores_req': num_cores_req,
+        'num_cores_alloc': num_cores_alloc,
+        'num_nodes_req': num_nodes_req,
+        'num_nodes_alloc': num_nodes_alloc,
+        'num_tasks': num_tasks,
+        'num_gpus_req': num_gpus_req,
+        'num_gpus_alloc': num_gpus_alloc,
+        'mem_req': mem_req,
+        'mem_alloc': mem_alloc,
+        'threads_per_core': threads_per_core,
+        'time_limit': time_limit,
+        'ml_priority': ml_priority,
+        'cpu_power_consumption': cpu_power_consumption
+    }

 class JobState(Enum):
     """Enumeration for job states."""
```
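With 23 positional parameters, call sites of the new job_dict are easy to get wrong. A sketch of calling it with keyword arguments instead, which the signature already permits; the values here are made up for illustration, not taken from any dataset:

```python
from raps.job import job_dict

# Illustrative values only; traces normally come from the telemetry dataframe.
job_info = job_dict(
    nodes_required=4, name="job-0001",
    cpu_trace=[0.5] * 10, gpu_trace=[0.8] * 10,
    wall_time=10 * 20, end_state="COMPLETED",
    scheduled_nodes=None, time_offset=0, job_id="0001",
    num_cores_req=128, num_cores_alloc=128,
    num_nodes_req=4, num_nodes_alloc=4,
    num_tasks=4, num_gpus_req=16, num_gpus_alloc=16,
    mem_req=256, mem_alloc=256, threads_per_core=1,
    time_limit=3600, cpu_power_consumption=0.0,
    ml_priority=0.0, priority=0,
)
```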