Loading raps/dataloaders/frontier.py +30 −16 Original line number Diff line number Diff line Loading @@ -326,9 +326,10 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar def load_live_data(**kwargs): """ Load Slurm Live data using pyslurm """ config = kwargs.get('config') jobs = list() telemetry_start = int(time.time()) # This is now! get unix time telemetry_start = 1755721300 #telemetry_start = 1755721300 if hasattr(kwargs, 'time'): time_to_sim = kwargs.get('time') # Should be specified . assert isinstance(time_to_sim, int) Loading @@ -339,18 +340,28 @@ def load_live_data(**kwargs): total_partitions = 0 partition_dict = dict() if True: # Test with job_regressor project import pyslurm # noqa # Local Tests # filename = "something/something/pyslurm.dump" # with open(filename, 'r') as f: # s = f.read() # data = ast.literal_eval(s) # data = pyslurm.job().get() for jidx, jdata in data.items(): else: # Local Tests import ast filename = "/Users/mnq/s1.dump" with open(filename, 'r') as f: s = f.read() data = ast.literal_eval(s) from job_regressor.config_utils import load_config cfg_path = "/lustre/orion/stf218/scratch/maiterth/Repositories/exadigit/job-regressor/configs/local-test.yaml" cfg = load_config(cfg_path) from job_regressor.inference_runner import InferenceRunner runner = InferenceRunner(cfg, quiet=True) predictions = runner.predict_records(data) for (jidx, jdata), (_, pred) in zip(data.items(), predictions.iterrows()): if jdata['job_state'] == "COMPLETED" \ or jdata['job_state'] == "CANCELLED": or jdata['job_state'] == "CANCELLED" \ or jdata['job_state'] == "REQUEUE_HOLD" : continue if jdata['job_state'] == "TIMEOUT" \ or jdata['job_state'] == "FAILED": Loading Loading @@ -442,7 +453,7 @@ def load_live_data(**kwargs): scheduled_nodes_str_list = jdata['req_nodes'] # Explicitly requested nodes # Missmatch between slurm and raps scheduled_nodes = [] for n in scheduled_nodes_str_list: scheduled_nodes = int(n[8:]) scheduled_nodes.append(int(n[8:])) # Do we need to reintroduce a list of explicitly required nodes? This is currently handled by setting the # scheduled_nodes before the scheduler modifies this list # 'req_switch': int, Loading Loading @@ -475,7 +486,6 @@ def load_live_data(**kwargs): assert current_run_time == 0, "Check if any other value occurs and should be handled! " \ f"current_run_time:{current_run_time}" \ f"\njdata:\n{jdata}" expected_run_time = None # 'state_reason': String # e.g. 'JobHeldUser', # 'std_err': String, # 'std_in': String, Loading @@ -502,8 +512,12 @@ def load_live_data(**kwargs): # 'work_dir': String # 'cpus_allocated': dict, # 'cpus_alloc_layout': dict cpu_trace = None # To be determined by a model! gpu_trace = None # Prediction To be determined by a model! expected_run_time = int(pred["end_time"]) # predicted runtime cpu_power = min(max(float(pred["avg_cpu_power"]),config['POWER_CPU_IDLE']),config['POWER_CPU_MAX']) cpu_trace = (cpu_power / config['POWER_CPU_MAX']) * config['CPUS_PER_NODE'] gpu_power = min(max(float(pred["avg_gpu_power"]),config['POWER_GPU_IDLE']),config['POWER_GPU_MAX']) gpu_trace = (gpu_power / config['POWER_GPU_MAX']) * config['GPUS_PER_NODE'] trace_time = None trace_start_time = None trace_end_time = None Loading raps/ui.py +8 −1 Original line number Diff line number Diff line Loading @@ -33,9 +33,11 @@ class LayoutManager: if args_dict is not None: self.noui = args_dict.get("noui") self.simulate_network = args_dict.get("simulate_network") self.encrypt = args_dict.get("encrypt") else: self.noui = False self.simulate_network = False self.encrypt = False self.engine = engine self.config = config self.topology = self.engine.config.get("TOPOLOGY", "none") Loading Loading @@ -196,11 +198,16 @@ class LayoutManager: else: running_time_str = convert_seconds_to_hhmm(job.current_run_time) if self.encrypt: job_name_str="hidden" else: job_name_str=str(job.name) row = [ str(job.id).zfill(5), convert_seconds_to_hhmm(job.time_limit // self.engine.downscale), # str(job.wall_time), str(job.name), job_name_str, str(job.account), job.current_state.value, str(job.nodes_required), Loading Loading
raps/dataloaders/frontier.py +30 −16 Original line number Diff line number Diff line Loading @@ -326,9 +326,10 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar def load_live_data(**kwargs): """ Load Slurm Live data using pyslurm """ config = kwargs.get('config') jobs = list() telemetry_start = int(time.time()) # This is now! get unix time telemetry_start = 1755721300 #telemetry_start = 1755721300 if hasattr(kwargs, 'time'): time_to_sim = kwargs.get('time') # Should be specified . assert isinstance(time_to_sim, int) Loading @@ -339,18 +340,28 @@ def load_live_data(**kwargs): total_partitions = 0 partition_dict = dict() if True: # Test with job_regressor project import pyslurm # noqa # Local Tests # filename = "something/something/pyslurm.dump" # with open(filename, 'r') as f: # s = f.read() # data = ast.literal_eval(s) # data = pyslurm.job().get() for jidx, jdata in data.items(): else: # Local Tests import ast filename = "/Users/mnq/s1.dump" with open(filename, 'r') as f: s = f.read() data = ast.literal_eval(s) from job_regressor.config_utils import load_config cfg_path = "/lustre/orion/stf218/scratch/maiterth/Repositories/exadigit/job-regressor/configs/local-test.yaml" cfg = load_config(cfg_path) from job_regressor.inference_runner import InferenceRunner runner = InferenceRunner(cfg, quiet=True) predictions = runner.predict_records(data) for (jidx, jdata), (_, pred) in zip(data.items(), predictions.iterrows()): if jdata['job_state'] == "COMPLETED" \ or jdata['job_state'] == "CANCELLED": or jdata['job_state'] == "CANCELLED" \ or jdata['job_state'] == "REQUEUE_HOLD" : continue if jdata['job_state'] == "TIMEOUT" \ or jdata['job_state'] == "FAILED": Loading Loading @@ -442,7 +453,7 @@ def load_live_data(**kwargs): scheduled_nodes_str_list = jdata['req_nodes'] # Explicitly requested nodes # Missmatch between slurm and raps scheduled_nodes = [] for n in scheduled_nodes_str_list: scheduled_nodes = int(n[8:]) scheduled_nodes.append(int(n[8:])) # Do we need to reintroduce a list of explicitly required nodes? This is currently handled by setting the # scheduled_nodes before the scheduler modifies this list # 'req_switch': int, Loading Loading @@ -475,7 +486,6 @@ def load_live_data(**kwargs): assert current_run_time == 0, "Check if any other value occurs and should be handled! " \ f"current_run_time:{current_run_time}" \ f"\njdata:\n{jdata}" expected_run_time = None # 'state_reason': String # e.g. 'JobHeldUser', # 'std_err': String, # 'std_in': String, Loading @@ -502,8 +512,12 @@ def load_live_data(**kwargs): # 'work_dir': String # 'cpus_allocated': dict, # 'cpus_alloc_layout': dict cpu_trace = None # To be determined by a model! gpu_trace = None # Prediction To be determined by a model! expected_run_time = int(pred["end_time"]) # predicted runtime cpu_power = min(max(float(pred["avg_cpu_power"]),config['POWER_CPU_IDLE']),config['POWER_CPU_MAX']) cpu_trace = (cpu_power / config['POWER_CPU_MAX']) * config['CPUS_PER_NODE'] gpu_power = min(max(float(pred["avg_gpu_power"]),config['POWER_GPU_IDLE']),config['POWER_GPU_MAX']) gpu_trace = (gpu_power / config['POWER_GPU_MAX']) * config['GPUS_PER_NODE'] trace_time = None trace_start_time = None trace_end_time = None Loading
raps/ui.py +8 −1 Original line number Diff line number Diff line Loading @@ -33,9 +33,11 @@ class LayoutManager: if args_dict is not None: self.noui = args_dict.get("noui") self.simulate_network = args_dict.get("simulate_network") self.encrypt = args_dict.get("encrypt") else: self.noui = False self.simulate_network = False self.encrypt = False self.engine = engine self.config = config self.topology = self.engine.config.get("TOPOLOGY", "none") Loading Loading @@ -196,11 +198,16 @@ class LayoutManager: else: running_time_str = convert_seconds_to_hhmm(job.current_run_time) if self.encrypt: job_name_str="hidden" else: job_name_str=str(job.name) row = [ str(job.id).zfill(5), convert_seconds_to_hhmm(job.time_limit // self.engine.downscale), # str(job.wall_time), str(job.name), job_name_str, str(job.account), job.current_state.value, str(job.nodes_required), Loading