diff --git a/raps/dataloaders/frontier.py b/raps/dataloaders/frontier.py index 8491617e8886afbfda35930ddcd49cf0e24a0e9a..d0662286e0327c06810d788d7869aa97bf2bf839 100644 --- a/raps/dataloaders/frontier.py +++ b/raps/dataloaders/frontier.py @@ -330,9 +330,10 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar def load_live_data(**kwargs): """ Load Slurm Live data using pyslurm """ + config = kwargs.get('config') jobs = list() telemetry_start = int(time.time()) # This is now! get unix time - telemetry_start = 1755721300 + #telemetry_start = 1755721300 if hasattr(kwargs, 'time'): time_to_sim = kwargs.get('time') # Should be specified . assert isinstance(time_to_sim, int) @@ -343,18 +344,28 @@ def load_live_data(**kwargs): total_partitions = 0 partition_dict = dict() - import pyslurm # noqa - # Local Tests - # filename = "something/something/pyslurm.dump" - # with open(filename, 'r') as f: - # s = f.read() - # data = ast.literal_eval(s) - # - data = pyslurm.job().get() - - for jidx, jdata in data.items(): + if True: + # Test with job_regressor project + import pyslurm # noqa + data = pyslurm.job().get() + else: + # Local Tests + import ast + filename = "/Users/mnq/s1.dump" + with open(filename, 'r') as f: + s = f.read() + data = ast.literal_eval(s) + + from job_regressor.config_utils import load_config + cfg_path = "/lustre/orion/stf218/scratch/maiterth/Repositories/exadigit/job-regressor/configs/local-test.yaml" + cfg = load_config(cfg_path) + from job_regressor.inference_runner import InferenceRunner + runner = InferenceRunner(cfg, quiet=True) + predictions = runner.predict_records(data) + for (jidx, jdata), (_, pred) in zip(data.items(), predictions.iterrows()): if jdata['job_state'] == "COMPLETED" \ - or jdata['job_state'] == "CANCELLED": + or jdata['job_state'] == "CANCELLED" \ + or jdata['job_state'] == "REQUEUE_HOLD" : continue if jdata['job_state'] == "TIMEOUT" \ or jdata['job_state'] == "FAILED": @@ -446,7 +457,7 @@ def load_live_data(**kwargs): scheduled_nodes_str_list = jdata['req_nodes'] # Explicitly requested nodes # Missmatch between slurm and raps scheduled_nodes = [] for n in scheduled_nodes_str_list: - scheduled_nodes = int(n[8:]) + scheduled_nodes.append(int(n[8:])) # Do we need to reintroduce a list of explicitly required nodes? This is currently handled by setting the # scheduled_nodes before the scheduler modifies this list # 'req_switch': int, @@ -479,7 +490,6 @@ def load_live_data(**kwargs): assert current_run_time == 0, "Check if any other value occurs and should be handled! " \ f"current_run_time:{current_run_time}" \ f"\njdata:\n{jdata}" - expected_run_time = None # 'state_reason': String # e.g. 'JobHeldUser', # 'std_err': String, # 'std_in': String, @@ -506,8 +516,12 @@ def load_live_data(**kwargs): # 'work_dir': String # 'cpus_allocated': dict, # 'cpus_alloc_layout': dict - cpu_trace = None # To be determined by a model! - gpu_trace = None + # Prediction To be determined by a model! + expected_run_time = int(pred["end_time"]) # predicted runtime + cpu_power = min(max(float(pred["avg_cpu_power"]),config['POWER_CPU_IDLE']),config['POWER_CPU_MAX']) + cpu_trace = (cpu_power / config['POWER_CPU_MAX']) * config['CPUS_PER_NODE'] + gpu_power = min(max(float(pred["avg_gpu_power"]),config['POWER_GPU_IDLE']),config['POWER_GPU_MAX']) + gpu_trace = (gpu_power / config['POWER_GPU_MAX']) * config['GPUS_PER_NODE'] trace_time = None trace_start_time = None trace_end_time = None diff --git a/raps/ui.py b/raps/ui.py index d9c3bbe199a60c67391054ac6200cee8c6dcda21..6668d27c825c6d0bf7701966d6cef9b24a09bb90 100644 --- a/raps/ui.py +++ b/raps/ui.py @@ -31,9 +31,11 @@ class LayoutManager: if args_dict is not None: self.noui = args_dict.get("noui") self.simulate_network = args_dict.get("simulate_network") + self.encrypt = args_dict.get("encrypt") else: self.noui = False self.simulate_network = False + self.encrypt = False self.engine = engine self.config = config self.topology = self.engine.config.get("TOPOLOGY", "none") @@ -194,11 +196,16 @@ class LayoutManager: else: running_time_str = convert_seconds_to_hhmm(job.running_time) + if self.encrypt: + job_name_str="hidden" + else: + job_name_str=str(job.name) + row = [ str(job.id).zfill(5), convert_seconds_to_hhmm(job.time_limit // self.engine.downscale), # str(job.wall_time), - str(job.name), + job_name_str, str(job.account), job.current_state.value, str(job.nodes_required),