From 208326a99265e4f9a70a9c49ad603b2aa26c0b62 Mon Sep 17 00:00:00 2001 From: Matthias Maiterth Date: Thu, 28 Aug 2025 21:28:11 -0400 Subject: [PATCH 1/4] live-test 1 --- raps/dataloaders/frontier.py | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py index 24326a4..2decba9 100644 --- a/raps/dataloaders/frontier.py +++ b/raps/dataloaders/frontier.py @@ -346,24 +346,22 @@ def load_live_data(**kwargs): if True: # Test with job_regressor project import pyslurm # noqa - from job_regressor.config_utils import load_config - from job_regressor.inference_runner import InferenceRunner data = pyslurm.job().get() - cfg_path = "" - cfg = load_config(cfg_path) - runner = InferenceRunner(cfg, quiet=True) - pred = runner.predict_records(data) else: # Local Tests import ast - filename = "something/something/pyslurm.dump" + filename = "/Users/mnq/s1.dump" with open(filename, 'r') as f: s = f.read() data = ast.literal_eval(s) - pred = (None, None, None) - # - for (jidx, jdata), (pcpu, pgpu, prt) in zip(data.items(), pred): + from job_regressor.config_utils import load_config + cfg_path = "/Users/mnq/Repositories/exadigit/job-regressor/configs/local-test.yaml" + cfg = load_config(cfg_path) + from job_regressor.inference_runner import InferenceRunner + runner = InferenceRunner(cfg, quiet=True) + predictions = runner.predict_records(data) + for (jidx, jdata), (_, pred) in zip(data.items(), predictions.iterrows()): if jdata['job_state'] == "COMPLETED" \ or jdata['job_state'] == "CANCELLED": continue @@ -490,7 +488,7 @@ def load_live_data(**kwargs): assert current_run_time == 0, "Check if any other value occurs and should be handled! " \ f"current_run_time:{current_run_time}" \ f"\njdata:\n{jdata}" - expected_run_time = prt # predicted runtime + expected_run_time = int(pred["end_time"]) # predicted runtime # 'state_reason': String # e.g. 
'JobHeldUser', # 'std_err': String, # 'std_in': String, @@ -517,8 +515,8 @@ def load_live_data(**kwargs): # 'work_dir': String # 'cpus_allocated': dict, # 'cpus_alloc_layout': dict - cpu_trace = pcpu # CPU prediction To be determined by a model! - gpu_trace = pgpu + cpu_trace = float(pred["avg_cpu_power"]) # CPU prediction To be determined by a model! + gpu_trace = float(pred["avg_gpu_power"]) trace_time = None trace_start_time = None trace_end_time = None -- GitLab From 387cf559d874819df4e61bb825d4d4374d4b36d7 Mon Sep 17 00:00:00 2001 From: Matthias Maiterth Date: Fri, 29 Aug 2025 08:27:36 -0400 Subject: [PATCH 2/4] Adjusted predictions to be % utilization --- raps/dataloaders/frontier.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py index 2decba9..0fa042d 100644 --- a/raps/dataloaders/frontier.py +++ b/raps/dataloaders/frontier.py @@ -330,6 +330,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar def load_live_data(**kwargs): """ Load Slurm Live data using pyslurm """ + config = kwargs.get('config') jobs = list() telemetry_start = int(time.time()) # This is now! get unix time telemetry_start = 1755721300 @@ -343,7 +344,7 @@ def load_live_data(**kwargs): total_partitions = 0 partition_dict = dict() - if True: + if False: # Test with job_regressor project import pyslurm # noqa data = pyslurm.job().get() @@ -488,7 +489,6 @@ def load_live_data(**kwargs): assert current_run_time == 0, "Check if any other value occurs and should be handled! " \ f"current_run_time:{current_run_time}" \ f"\njdata:\n{jdata}" - expected_run_time = int(pred["end_time"]) # predicted runtime # 'state_reason': String # e.g. �
'JobHeldUser', # 'std_err': String, # 'std_in': String, @@ -515,8 +515,10 @@ def load_live_data(**kwargs): # 'work_dir': String # 'cpus_allocated': dict, # 'cpus_alloc_layout': dict - cpu_trace = float(pred["avg_cpu_power"]) # CPU prediction To be determined by a model! - gpu_trace = float(pred["avg_gpu_power"]) + # Prediction To be determined by a model! + expected_run_time = int(pred["end_time"]) # predicted runtime + cpu_trace = (float(pred["avg_cpu_power"]) / config['POWER_CPU_MAX']) * config['CPUS_PER_NODE'] + gpu_trace = (float(pred["avg_gpu_power"]) / config['POWER_GPU_MAX']) * config['GPUS_PER_NODE'] trace_time = None trace_start_time = None trace_end_time = None -- GitLab From 4567a0171fdf24b86d96ed6b6b98206c25c0d326 Mon Sep 17 00:00:00 2001 From: Matthias Maiterth Date: Fri, 29 Aug 2025 10:40:57 -0400 Subject: [PATCH 3/4] Clamping live power prediction at the job side. --- raps/dataloaders/frontier.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py index 0fa042d..3cdea26 100644 --- a/raps/dataloaders/frontier.py +++ b/raps/dataloaders/frontier.py @@ -344,7 +344,7 @@ def load_live_data(**kwargs): total_partitions = 0 partition_dict = dict() - if False: + if True: # Test with job_regressor project import pyslurm # noqa data = pyslurm.job().get() @@ -357,7 +357,7 @@ def load_live_data(**kwargs): data = ast.literal_eval(s) from job_regressor.config_utils import load_config - cfg_path = "/Users/mnq/Repositories/exadigit/job-regressor/configs/local-test.yaml" + cfg_path = "/lustre/orion/stf218/scratch/maiterth/Repositories/exadigit/job-regressor/configs/local-test.yaml" cfg = load_config(cfg_path) from job_regressor.inference_runner import InferenceRunner runner = InferenceRunner(cfg, quiet=True) @@ -456,7 +456,7 @@ def load_live_data(**kwargs): scheduled_nodes_str_list = jdata['req_nodes'] # Explicitly requested nodes # Missmatch between slurm and raps scheduled_nodes = 
[] for n in scheduled_nodes_str_list: - scheduled_nodes = int(n[8:]) + scheduled_nodes.append(int(n[8:])) # Do we need to reintroduce a list of explicitly required nodes? This is currently handled by setting the # scheduled_nodes before the scheduler modifies this list # 'req_switch': int, @@ -517,8 +517,10 @@ def load_live_data(**kwargs): # 'cpus_alloc_layout': dict # Prediction To be determined by a model! expected_run_time = int(pred["end_time"]) # predicted runtime - cpu_trace = (float(pred["avg_cpu_power"]) / config['POWER_CPU_MAX']) * config['CPUS_PER_NODE'] - gpu_trace = (float(pred["avg_gpu_power"]) / config['POWER_GPU_MAX']) * config['GPUS_PER_NODE'] + cpu_power = min(max(float(pred["avg_cpu_power"]),config['POWER_CPU_IDLE']),config['POWER_CPU_MAX']) + cpu_trace = (cpu_power / config['POWER_CPU_MAX']) * config['CPUS_PER_NODE'] + gpu_power = min(max(float(pred["avg_gpu_power"]),config['POWER_GPU_IDLE']),config['POWER_GPU_MAX']) + gpu_trace = (gpu_power / config['POWER_GPU_MAX']) * config['GPUS_PER_NODE'] trace_time = None trace_start_time = None trace_end_time = None -- GitLab From 76f997bec19fb0c24f3fd61913a4b65c45ccf40b Mon Sep 17 00:00:00 2001 From: Matthias Maiterth Date: Mon, 13 Oct 2025 11:59:07 -0400 Subject: [PATCH 4/4] Added ui update to show --hidden-- on live forecast when using --encrypt option Added another job-state to be skipped as the job will never run. --- raps/dataloaders/frontier.py | 5 +++-- raps/ui.py | 9 ++++++++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py index 3cdea26..d066228 100644 --- a/raps/dataloaders/frontier.py +++ b/raps/dataloaders/frontier.py @@ -333,7 +333,7 @@ def load_live_data(**kwargs): config = kwargs.get('config') jobs = list() telemetry_start = int(time.time()) # This is now! get unix time - telemetry_start = 1755721300 + #telemetry_start = 1755721300 if hasattr(kwargs, 'time'): time_to_sim = kwargs.get('time') # Should be specified . 
assert isinstance(time_to_sim, int) @@ -364,7 +364,8 @@ def load_live_data(**kwargs): predictions = runner.predict_records(data) for (jidx, jdata), (_, pred) in zip(data.items(), predictions.iterrows()): if jdata['job_state'] == "COMPLETED" \ - or jdata['job_state'] == "CANCELLED": + or jdata['job_state'] == "CANCELLED" \ + or jdata['job_state'] == "REQUEUE_HOLD" : continue if jdata['job_state'] == "TIMEOUT" \ or jdata['job_state'] == "FAILED": diff --git a/raps/ui.py b/raps/ui.py index d9c3bbe..6668d27 100644 --- a/raps/ui.py +++ b/raps/ui.py @@ -31,9 +31,11 @@ class LayoutManager: if args_dict is not None: self.noui = args_dict.get("noui") self.simulate_network = args_dict.get("simulate_network") + self.encrypt = args_dict.get("encrypt") else: self.noui = False self.simulate_network = False + self.encrypt = False self.engine = engine self.config = config self.topology = self.engine.config.get("TOPOLOGY", "none") @@ -194,11 +196,16 @@ class LayoutManager: else: running_time_str = convert_seconds_to_hhmm(job.running_time) + if self.encrypt: + job_name_str="hidden" + else: + job_name_str=str(job.name) + row = [ str(job.id).zfill(5), convert_seconds_to_hhmm(job.time_limit // self.engine.downscale), # str(job.wall_time), - str(job.name), + job_name_str, str(job.account), job.current_state.value, str(job.nodes_required), -- GitLab