From 591076f976ec137b354c6f2fec439c3c5b85a265 Mon Sep 17 00:00:00 2001 From: Matthias Maiterth Date: Thu, 6 Feb 2025 14:51:51 -0500 Subject: [PATCH 01/12] Refactored engine timestep loop into 3 steps. Updated the Job initilaization and added a priority based sorting function for the default scheduler. 1. add eligible jobs to queue 2. sort the queue 3. schedule. Sorting and scheduling both sort, this needs to happen in one place. (Discuss with Wes) The job initialization overwrote values set by the job_dict. This is resolved now. The default scheduler now takes account information into account. --- main.py | 3 +-- raps/account.py | 2 +- raps/engine.py | 29 ++++++++++++++++++----------- raps/job.py | 11 +++++++---- raps/schedulers/default.py | 30 ++++++++++++++++++++---------- 5 files changed, 47 insertions(+), 28 deletions(-) diff --git a/main.py b/main.py index 58e7147..9dccab6 100644 --- a/main.py +++ b/main.py @@ -31,7 +31,6 @@ from raps.telemetry import Telemetry from raps.workload import Workload from raps.weather import Weather from raps.utils import create_casename, convert_to_seconds, write_dict_to_file, next_arrival -from raps.utils import toJSON config = ConfigManager(system_name=args.system).get_config() @@ -45,7 +44,7 @@ if args.cooling: args.layout = "layout2" if args_dict['start']: - cooling_model.weather = Weather(args_dict['start'], config = config) + cooling_model.weather = Weather(args_dict['start'], config=config) else: cooling_model = None diff --git a/raps/account.py b/raps/account.py index 033bcb9..3f1bb69 100644 --- a/raps/account.py +++ b/raps/account.py @@ -118,7 +118,7 @@ class Accounts: #update specific account associated with job if isinstance(jobstats, JobStatistics): if jobstats.account not in self.account_dict: - self.account_dict[jobstats.account] = Account(self._account_id,jobstats.account,0) + self.account_dict[jobstats.account] = Account(self._account_id,jobstats.account,0) # new account from job's.account_id with priority 0 
self._account_id += 1 account = self.account_dict[jobstats.account] account.update_statistics(jobstats,self.average_user) diff --git a/raps/engine.py b/raps/engine.py index 9972854..fc779d4 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -37,7 +37,7 @@ class Engine: self.resource_manager = ResourceManager( total_nodes=self.config['TOTAL_NODES'], down_nodes=self.config['DOWN_NODES'] - ) + ) # Initialize running and queue, etc. self.running = [] @@ -55,7 +55,7 @@ class Engine: self.output = kwargs.get('output') self.replay = kwargs.get('replay') self.sys_util_history = [] - + # Get scheduler type from command-line args or default scheduler_type = kwargs.get('scheduler', 'default') self.scheduler = load_scheduler(scheduler_type)( @@ -65,17 +65,25 @@ class Engine: ) print(f"Using scheduler: {scheduler_type}") - + # Unused! def add_job(self, job): self.queue.append(job) - self.queue = self.scheduler.sort_jobs(self.queue) + self.queue = self.scheduler.sort_jobs(self.queue) # No need to sort here! 
+ + def eligible_jobs(self,jobs_to_submit): + eligible_jobs_list = [] + while jobs_to_submit and jobs_to_submit[0]['submit_time'] <= self.current_time: + job_info = jobs_to_submit.pop(0) + job = Job(job_info, self.current_time) + eligible_jobs_list.append(job) + return eligible_jobs_list def tick(self): """Simulate a timestep.""" completed_jobs = [job for job in self.running if job.end_time is not None and job.end_time <= self.current_time] completed_job_stats = [] - + # Simulate node failure newly_downed_nodes = self.node_failure(self.config['MTBF']) @@ -112,7 +120,7 @@ class Engine: scheduled_nodes.append(job.scheduled_nodes) cpu_utils.append(cpu_util) gpu_utils.append(gpu_util) - + if len(scheduled_nodes) > 0: self.flops_manager.update_flop_state(scheduled_nodes, cpu_utils, gpu_utils) jobs_power = self.power_manager.update_power_state(scheduled_nodes, cpu_utils, gpu_utils, net_utils) @@ -219,12 +227,11 @@ class Engine: jobs_to_submit = sorted(jobs, key=lambda j: j['submit_time']) for timestep in range(timesteps): - # Submit jobs whose submit_time is <= current_time - while jobs_to_submit and jobs_to_submit[0]['submit_time'] <= self.current_time: - job_info = jobs_to_submit.pop(0) - job = Job(job_info, self.current_time) - self.add_job(job) + # identify eligible jobs and add them to the queue. + self.queue += self.eligible_jobs(jobs_to_submit) + #sort the queue according to the policy + self.queue = self.scheduler.sort_jobs(self.queue, self.accounts) # Schedule jobs that are now in the queue. 
self.scheduler.schedule(self.queue, self.running, self.current_time) diff --git a/raps/job.py b/raps/job.py index 6ce9f8d..a06eb84 100644 --- a/raps/job.py +++ b/raps/job.py @@ -42,8 +42,6 @@ class Job: _id_counter = 0 def __init__(self, job_dict, current_time, state=JobState.PENDING, account=None): - for key, value in job_dict.items(): setattr(self, key, value) - if not self.id: self.id = Job._get_next_id() # initializations self.start_time = None self.end_time = None @@ -53,6 +51,12 @@ class Job: self.power_history = [] self._state = state self.account = account + # if a job dict was given, override the values from the job_dict: + for key, value in job_dict.items(): + setattr(self, key, value) + # in any case: provide a job_id! + if not self.id: + self.id = Job._get_next_id() def __repr__(self): """Return a string representation of the job.""" @@ -100,7 +104,6 @@ class Job: return JobStatistics(self) - class JobStatistics: """ Reduced class for handling statistics after the job has finished. @@ -113,7 +116,7 @@ class JobStatistics: self.num_nodes = len(job.scheduled_nodes) self.run_time = job.running_time if len(job.power_history) == 0: - self.avg_node_power = 0 + self.avg_node_power = 0 self.max_node_power = 0 else: self.avg_node_power = sum(job.power_history) / len(job.power_history) / self.num_nodes diff --git a/raps/schedulers/default.py b/raps/schedulers/default.py index 7ab664b..903c055 100644 --- a/raps/schedulers/default.py +++ b/raps/schedulers/default.py @@ -1,7 +1,8 @@ from enum import Enum -from ..job import Job, JobState +#from ..job import Job, JobState # Unused from ..utils import summarize_ranges +from ..workload import MAX_PRIORITY class PolicyType(Enum): """Supported scheduling policies.""" @@ -13,7 +14,6 @@ class PolicyType(Enum): class Scheduler: """ Default job scheduler with various scheduling policies. 
""" - def __init__(self, config, policy, resource_manager=None): self.config = config @@ -23,18 +23,29 @@ class Scheduler: self.resource_manager = resource_manager self.debug = False - - def sort_jobs(self, queue): + def sort_jobs(self, queue, accounts=None): """Sort jobs based on the selected scheduling policy.""" if self.policy == PolicyType.FCFS or self.policy == PolicyType.BACKFILL: return sorted(queue, key=lambda job: job.submit_time) elif self.policy == PolicyType.SJF: return sorted(queue, key=lambda job: job.wall_time) elif self.policy == PolicyType.PRIORITY: - return sorted(queue, key=lambda job: job.priority, reverse=True) + return self.sort_by_job_and_account_priority(queue, accounts) else: raise ValueError(f"Unknown policy type: {self.policy}") + def sort_by_job_and_account_priority(self, queue, accounts=None): + priority_tuple_list = [] + for job in queue: + # create a tuple of the job and the priority + priority = job.priority + if job.account in accounts: + priority += accounts[job.account].priority + priority = max(priority, MAX_PRIORITY) + priority_tuple_list.append((priority,job)) + priority_tuple_list = sorted(priority_tuple_list, key=lambda x:x[1], reverse=True) + _, queue = zip(*priority_tuple_list) + return queue def schedule(self, queue, running, current_time, debug=False): # Sort the queue in place. @@ -63,20 +74,19 @@ class Scheduler: else: if self.policy == PolicyType.BACKFILL: # Try to find a backfill candidate from the entire queue. 
- backfill_job = self.find_backfill_job(queue, len(available_nodes), current_time) + backfill_job = self.find_backfill_job(queue, len(self.resource_manager.available_nodes), current_time) if backfill_job: - self.assign_nodes_to_job(backfill_job, available_nodes, current_time) + self.assign_nodes_to_job(backfill_job, self.resource_manager.available_nodes, current_time) running.append(backfill_job) queue.remove(backfill_job) if debug: scheduled_nodes = summarize_ranges(backfill_job.scheduled_nodes) print(f"t={current_time}: Backfilling job {backfill_job.id} with wall time {backfill_job.wall_time} on nodes {scheduled_nodes}") - def find_backfill_job(self, queue, num_free_nodes, current_time): """Finds a backfill job based on available nodes and estimated completion times. - - Based on pseudocode from Leonenkov and Zhumatiy, 'Introducing new backfill-based + + Based on pseudocode from Leonenkov and Zhumatiy, 'Introducing new backfill-based scheduler for slurm resource manager.' Procedia computer science 66 (2015): 661-669. 
""" -- GitLab From 8e7538709154a2bfcaa1a980179fa647bb320b41 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Thu, 6 Feb 2025 14:38:44 -0500 Subject: [PATCH 02/12] Move node_failure() to resmgr.py --- raps/engine.py | 25 +------------------------ raps/resmgr.py | 34 ++++++++++++++++++++++++++++++++++ tests/smoke.py | 1 - 3 files changed, 35 insertions(+), 25 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index fc779d4..f21f36b 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -85,7 +85,7 @@ class Engine: completed_job_stats = [] # Simulate node failure - newly_downed_nodes = self.node_failure(self.config['MTBF']) + newly_downed_nodes = self.resource_manager.node_failure(self.config['MTBF']) # Update active/free nodes self.num_free_nodes = len(self.resource_manager.available_nodes) @@ -282,26 +282,3 @@ class Engine: } return stats - - - def node_failure(self, mtbf): - """Simulate node failure using Weibull distribution.""" - from scipy.stats import weibull_min - shape_parameter = 1.5 - scale_parameter = mtbf * 3600 # Convert to seconds - - down_nodes = expand_ranges(self.down_nodes) - all_nodes = np.setdiff1d(np.arange(self.config['TOTAL_NODES']), np.array(down_nodes, dtype=int)) - - random_values = weibull_min.rvs(shape_parameter, scale=scale_parameter, size=all_nodes.size) - failure_threshold = 0.1 - failed_nodes_mask = random_values < failure_threshold - newly_downed_nodes = all_nodes[failed_nodes_mask] - - for node_index in newly_downed_nodes: - if node_index in self.resource_manager.available_nodes: - self.resource_manager.available_nodes.remove(node_index) - self.down_nodes.append(str(node_index)) - self.power_manager.set_idle(node_index) - - return newly_downed_nodes.tolist() diff --git a/raps/resmgr.py b/raps/resmgr.py index d4fee8a..508cc6f 100644 --- a/raps/resmgr.py +++ b/raps/resmgr.py @@ -1,4 +1,8 @@ +import numpy as np from .job import JobState +from .utils import expand_ranges +from scipy.stats import weibull_min + class ResourceManager: 
def __init__(self, total_nodes, down_nodes): @@ -48,3 +52,33 @@ class ResourceManager: utilization = (num_active_nodes / total_operational) * 100 if total_operational else 0 self.sys_util_history.append((current_time, utilization)) return utilization + + def node_failure(self, mtbf): + """Simulate node failure using Weibull distribution.""" + shape_parameter = 1.5 + scale_parameter = mtbf * 3600 # Convert to seconds + + # Create a NumPy array of node indices, excluding down nodes + #print(self.down_nodes) + #down_nodes = expand_ranges(self.down_nodes) + #all_nodes = np.setdiff1d(np.arange(self.config['TOTAL_NODES']), np.array(self.down_nodes, dtype=int)) + all_nodes = np.array(sorted(set(range(self.total_nodes)) - set(self.down_nodes))) + + # Sample the Weibull distribution for all nodes at once + random_values = weibull_min.rvs(shape_parameter, scale=scale_parameter, size=all_nodes.size) + failure_threshold = 0.1 + failed_nodes = [node for node, r in zip(all_nodes, random_values) if r < failure_threshold] + + # Identify nodes that have failed + failure_threshold = 0.1 + failed_nodes_mask = random_values < failure_threshold + newly_downed_nodes = all_nodes[failed_nodes_mask] + + # Update available and down nodes + for node_index in newly_downed_nodes: + if node_index in self.available_nodes: + self.available_nodes.remove(node_index) + self.down_nodes.append(str(node_index)) + self.power_manager.set_idle(node_index) + + return newly_downed_nodes.tolist() diff --git a/tests/smoke.py b/tests/smoke.py index 623908b..9174b3c 100644 --- a/tests/smoke.py +++ b/tests/smoke.py @@ -11,7 +11,6 @@ DEFAULT_TIME = "1h" # Define systems and their corresponding filenames SYSTEMS = { "frontier": "frontier/slurm/joblive/date=2024-01-18 frontier/jobprofile/date=2024-01-18", - "fugaku": "fugaku/21_04.parquet", "marconi100": "marconi100/job_table.parquet", "lassen": "lassen/Lassen-Supercomputer-Job-Dataset", "adastraMI250": "adastra/AdastaJobsMI250_15days.parquet" -- GitLab From 
9c147bf5f12fbb535f31af2679e172ce81af2bd8 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Thu, 6 Feb 2025 15:22:10 -0500 Subject: [PATCH 03/12] Fix bug in UI reporting of down_nodes --- raps/engine.py | 2 +- raps/resmgr.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index f21f36b..8e6b228 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -194,7 +194,7 @@ class Engine: completed=completed_jobs, running=self.running, queue=self.queue, - down_nodes=expand_ranges(self.down_nodes), + down_nodes=expand_ranges(self.down_nodes[1:]), power_df=power_df, p_flops=pflops, g_flops_w=gflop_per_watt, diff --git a/raps/resmgr.py b/raps/resmgr.py index 508cc6f..196e636 100644 --- a/raps/resmgr.py +++ b/raps/resmgr.py @@ -78,7 +78,7 @@ class ResourceManager: for node_index in newly_downed_nodes: if node_index in self.available_nodes: self.available_nodes.remove(node_index) - self.down_nodes.append(str(node_index)) + self.down_nodes.add(str(node_index)) self.power_manager.set_idle(node_index) return newly_downed_nodes.tolist() -- GitLab From a717d5c903b8e390ab5f98d12c9f8c803cd42e89 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Thu, 6 Feb 2025 15:46:59 -0500 Subject: [PATCH 04/12] Move get_utilization() from engine.py to utils.py and cleanup code. 
--- raps/engine.py | 25 +++++++------------------ raps/power.py | 4 +++- raps/resmgr.py | 7 ------- raps/telemetry.py | 2 -- raps/ui.py | 1 - raps/utils.py | 10 ++++++++++ raps/weather.py | 3 +-- raps/workload.py | 1 - 8 files changed, 21 insertions(+), 32 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index 8e6b228..10c2b48 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -1,12 +1,11 @@ from typing import Optional import dataclasses -import numpy as np import pandas as pd from .job import Job, JobState from .account import Accounts from .network import network_utilization -from .utils import summarize_ranges, expand_ranges, write_dict_to_file +from .utils import summarize_ranges, expand_ranges, get_utilization from .resmgr import ResourceManager from .schedulers import load_scheduler @@ -82,10 +81,11 @@ class Engine: def tick(self): """Simulate a timestep.""" completed_jobs = [job for job in self.running if job.end_time is not None and job.end_time <= self.current_time] - completed_job_stats = [] # Simulate node failure newly_downed_nodes = self.resource_manager.node_failure(self.config['MTBF']) + for node in newly_downed_nodes: + self.power_manager.set_idle(node) # Update active/free nodes self.num_free_nodes = len(self.resource_manager.available_nodes) @@ -105,13 +105,13 @@ class Engine: if job.state == JobState.RUNNING: job.running_time = self.current_time - job.start_time time_quanta_index = (self.current_time - job.start_time) // self.config['TRACE_QUANTA'] - cpu_util = self.get_utilization(job.cpu_trace, time_quanta_index) - gpu_util = self.get_utilization(job.gpu_trace, time_quanta_index) + cpu_util = get_utilization(job.cpu_trace, time_quanta_index) + gpu_util = get_utilization(job.gpu_trace, time_quanta_index) net_util = 0 if len(job.ntx_trace) and len(job.nrx_trace): - net_tx = self.get_utilization(job.ntx_trace, time_quanta_index) - net_rx = self.get_utilization(job.nrx_trace, time_quanta_index) + net_tx = get_utilization(job.ntx_trace, 
time_quanta_index) + net_rx = get_utilization(job.nrx_trace, time_quanta_index) net_util = network_utilization(net_tx, net_rx) net_utils.append(net_util) else: @@ -164,7 +164,6 @@ class Engine: self.power_manager.history.append((self.current_time, total_power_kw)) self.sys_power = total_power_kw self.power_manager.loss_history.append((self.current_time, total_loss_kw)) - output_df = self.power_manager.get_power_df(rack_power, rack_loss) pflops = self.flops_manager.get_system_performance() / 1E15 gflop_per_watt = pflops * 1E6 / (total_power_kw * 1000) else: @@ -209,16 +208,6 @@ class Engine: return tick_data - def get_utilization(self, trace, time_quanta_index): - """Retrieve utilization value for a given trace at a specific time quanta index.""" - if isinstance(trace, (list, np.ndarray)): - return trace[time_quanta_index] - elif isinstance(trace, (int, float)): - return float(trace) - else: - raise TypeError(f"Invalid type for utilization: {type(trace)}.") - - def run_simulation(self, jobs, timesteps, autoshutdown=False): """Generator that yields after each simulation tick.""" self.timesteps = timesteps diff --git a/raps/power.py b/raps/power.py index 832a1ab..e61010f 100644 --- a/raps/power.py +++ b/raps/power.py @@ -334,7 +334,9 @@ class PowerManager: num_rectifiers = num_rectifiers_array[i, j, k] power_per_rectifier = chassis_power[i, j, k] / num_rectifiers rectifier_power[i, j, k, :num_rectifiers] = power_per_rectifier - power_with_losses[i, j, k, :num_rectifiers] = rectifier_loss(power_per_rectifier) + power_with_losses[i, j, k, :num_rectifiers] = compute_loss(power_per_rectifier, \ + self.config['RECTIFIER_LOSS_CONSTANT'], \ + self.config['RECTIFIER_EFFICIENCY']) rectifier_power = np.nan_to_num(rectifier_power) power_with_losses = np.nan_to_num(power_with_losses) diff --git a/raps/resmgr.py b/raps/resmgr.py index 196e636..8abce81 100644 --- a/raps/resmgr.py +++ b/raps/resmgr.py @@ -1,6 +1,5 @@ import numpy as np from .job import JobState -from .utils import 
expand_ranges from scipy.stats import weibull_min @@ -59,15 +58,10 @@ class ResourceManager: scale_parameter = mtbf * 3600 # Convert to seconds # Create a NumPy array of node indices, excluding down nodes - #print(self.down_nodes) - #down_nodes = expand_ranges(self.down_nodes) - #all_nodes = np.setdiff1d(np.arange(self.config['TOTAL_NODES']), np.array(self.down_nodes, dtype=int)) all_nodes = np.array(sorted(set(range(self.total_nodes)) - set(self.down_nodes))) # Sample the Weibull distribution for all nodes at once random_values = weibull_min.rvs(shape_parameter, scale=scale_parameter, size=all_nodes.size) - failure_threshold = 0.1 - failed_nodes = [node for node, r in zip(all_nodes, random_values) if r < failure_threshold] # Identify nodes that have failed failure_threshold = 0.1 @@ -79,6 +73,5 @@ class ResourceManager: if node_index in self.available_nodes: self.available_nodes.remove(node_index) self.down_nodes.add(str(node_index)) - self.power_manager.set_idle(node_index) return newly_downed_nodes.tolist() diff --git a/raps/telemetry.py b/raps/telemetry.py index 9aa72bc..833ca39 100644 --- a/raps/telemetry.py +++ b/raps/telemetry.py @@ -23,8 +23,6 @@ if __name__ == "__main__": import importlib import numpy as np -import re -from datetime import datetime from tqdm import tqdm from .config import ConfigManager diff --git a/raps/ui.py b/raps/ui.py index fc83e6b..8bee172 100644 --- a/raps/ui.py +++ b/raps/ui.py @@ -1,4 +1,3 @@ -import numpy as np import pandas as pd from rich.align import Align from rich.console import Console diff --git a/raps/utils.py b/raps/utils.py index 6c9d4d7..5ead3d1 100644 --- a/raps/utils.py +++ b/raps/utils.py @@ -360,3 +360,13 @@ def toJSON(obj): default=lambda o:o.__dict__, sort_keys=True, indent=4) + + +def get_utilization(trace, time_quanta_index): + """Retrieve utilization value for a given trace at a specific time quanta index.""" + if isinstance(trace, (list, np.ndarray)): + return trace[time_quanta_index] + elif isinstance(trace, 
(int, float)): + return float(trace) + else: + raise TypeError(f"Invalid type for utilization: {type(trace)}.") diff --git a/raps/weather.py b/raps/weather.py index 3917279..b31f88e 100644 --- a/raps/weather.py +++ b/raps/weather.py @@ -1,7 +1,6 @@ import requests import urllib3 -import json -from datetime import datetime, timedelta +from datetime import datetime # Disable SSL warnings when verify=False is used (temporary debugging purpose) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) diff --git a/raps/workload.py b/raps/workload.py index 14d91cc..b368246 100644 --- a/raps/workload.py +++ b/raps/workload.py @@ -25,7 +25,6 @@ JOB_END_PROBS : list """ -import math import random import numpy as np -- GitLab From 5e97869154cdf0184f2754cf3bd3c5be5d2f8b27 Mon Sep 17 00:00:00 2001 From: Matthias Maiterth Date: Thu, 6 Feb 2025 20:09:49 -0500 Subject: [PATCH 05/12] Initial refactor to derive accounts from jobs. The intended way to obtain accounts is 1: Load account information from an account file (intended for a continued run tracking account statistics) 2: Derive from job information (If job information is stored in a npz file also the accounts should be stored in the npz, such that both can simply be loaded and no conversion/derivation is necessary) This is not yet fully tested. 
--- main.py | 17 +++++--- raps/account.py | 106 +++++++++++++++++++++++++++++----------------- raps/engine.py | 4 +- raps/telemetry.py | 11 ++--- raps/ui.py | 4 +- raps/workload.py | 2 +- 6 files changed, 88 insertions(+), 56 deletions(-) diff --git a/main.py b/main.py index 9dccab6..3c5ccd3 100644 --- a/main.py +++ b/main.py @@ -29,6 +29,7 @@ from raps.engine import Engine from raps.job import Job from raps.telemetry import Telemetry from raps.workload import Workload +from raps.account import Accounts from raps.weather import Weather from raps.utils import create_casename, convert_to_seconds, write_dict_to_file, next_arrival @@ -86,14 +87,14 @@ if args.replay: DIR_NAME = create_casename() # Read telemetry data (either npz file or via custom data loader) - if args.replay[0].endswith(".npz"): # replay .npz file + if args.replay[0].endswith(".npz"): # Replay .npz file print(f"Loading {args.replay[0]}...") - jobs = td.load_snapshot(args.replay[0]) + jobs, accounts = td.load_snapshot(args.replay[0]) if args.scale: for job in tqdm(jobs, desc=f"Scaling jobs to {args.scale} nodes"): job['nodes_required'] = random.randint(1, args.scale) - job['requested_nodes'] = None # Setting to None triggers scheduler to assign nodes + job['requested_nodes'] = None # Setting to None triggers scheduler to assign nodes if args.reschedule == 'poisson': print("available nodes:", config['AVAILABLE_NODES']) @@ -103,11 +104,13 @@ if args.replay: elif args.reschedule == 'submit-time': raise NotImplementedError - else: # custom data loader print(*args.replay) jobs = td.load_data(args.replay) - td.save_snapshot(jobs, filename=DIR_NAME) + accounts = Accounts(jobs) + sc.accounts = accounts + accounts_dict = accounts.to_dict() + td.save_snapshot(jobs, accounts, filename=DIR_NAME) # Set number of timesteps based on the last job running which we assume # is the maximum value of submit_time + wall_time of all the jobs @@ -119,9 +122,11 @@ if args.replay: print(f'Simulating {len(jobs)} jobs for 
{timesteps} seconds') time.sleep(1) -else: # synthetic jobs +else: # Synthetic jobs wl = Workload(config) jobs = getattr(wl, args.workload)(num_jobs=args.numjobs) + accounts = Accounts(jobs) + sc.accounts = accounts if args.verbose: for job_vector in jobs: diff --git a/raps/account.py b/raps/account.py index 3f1bb69..59e5b47 100644 --- a/raps/account.py +++ b/raps/account.py @@ -18,18 +18,29 @@ class Account: RUNNING, COMPLETED, CANCELLED, FAILED, or TIMEOUT. """ - def __init__(self, id, name, priority): + def __init__(self, id, name, + priority=0, + total_jobs_enqueued=0, + total_jobs_completed=0, + time_allocated=0, + energy_allocated=0, + avg_power=0, + fugaku_points=0 + ): self.id = id self.name = name self.priority = priority - self.total_jobs = 0 - self.time_allocated = 0 - self.energy_allocated = 0 - self.avg_power = 0 - self.fugaku_points = 0 + self.total_jobs_enqueued = total_jobs_enqueued + self.total_jobs_completed = total_jobs_completed + self.time_allocated = time_allocated + self.energy_allocated = energy_allocated + self.avg_power = avg_power + if self.avg_power == 0 and self.energy_allocated != 0: + self.avg_power = self.time_allocated / self.energy_allocated + self.fugaku_points = fugaku_points def update_statistics(self, jobstats, average_user): - self.total_jobs += 1 + self.total_jobs_completed += 1 self.time_allocated += jobstats.run_time self.energy_allocated += jobstats.energy if self.time_allocated == 0: @@ -44,7 +55,8 @@ class Account: def __repr__(self): return (f"Account(id={self.id}, name={self.name}), " f"priority: {self.priority}, " - f"total_jobs: {self.total_jobs}, " + f"total_jobs_enqueued: {self.total_jobs_enqueued}, " + f"total_jobs_completed: {self.total_jobs_completed}, " f"time_allocated: {self.time_allocated}, " f"energy_allocated: {self.energy_allocated}, " f"avg_power: {self.avg_power}, " @@ -56,16 +68,17 @@ class Account: "id": self.id, "name": self.name, "priority": self.priority, - "total_jobs":self.total_jobs, - 
"time_allocated":self.time_allocated, - "energy_allocated":self.energy_allocated, - "avg_power":self.avg_power, - "fugaku_points":self.fugaku_points + "total_jobs_enqueued": self.total_jobs_enqueued, + "total_jobs_completed": self.total_jobs_completed, + "time_allocated": self.time_allocated, + "energy_allocated": self.energy_allocated, + "avg_power": self.avg_power, + "fugaku_points": self.fugaku_points } @classmethod def init_from_dict(acct, account_dict): # id ,name, priority, total_jobs, time_allocated, energy_allocated, avg_power, fugaku_points): - acct = Account(account_dict["id"],account_dict["name"],account_dict["priority"]) + acct = Account(account_dict["id"], account_dict["name"], priority=account_dict["priority"]) acct.id = account_dict["id"] acct.name = account_dict["name"] acct.priority = account_dict["priority"] @@ -81,49 +94,64 @@ class Accounts: def update_average_user(self): total_accounts = len(self.account_dict) - self.average_user.total_jobs = self.all_users.total_jobs / total_accounts + self.average_user.total_jobs_enqueued = self.all_users.total_jobs_enqueued / total_accounts + self.average_user.total_jobs_completed = self.all_users.total_jobs_completed / total_accounts self.average_user.time_allocated = self.all_users.time_allocated / total_accounts self.average_user.energy_allocated = self.all_users.energy_allocated / total_accounts self.average_user.avg_power = self.all_users.avg_power / total_accounts self.fugaku_points = self.all_users.fugaku_points / total_accounts # this should be 0 return self - def __init__(self): + def __init__(self, jobs=None): self._account_id = 0 self.account_dict = dict() - self.all_users = Account(-2,"All_Users",0) - self.average_user = Account(-1,"Avg_User",0) - - def initialize_accounts_from_json(self,filename): + self.all_users = Account(-2, "All_Users") + self.average_user = Account(-1, "Avg_User") + if jobs: + if not isinstance(jobs,list): + raise TypeError + for job_dict in jobs: + if not 
isinstance(job_dict,dict): + raise TypeError + if job_dict["account"] not in self.account_dict: + self.account_dict[job_dict["account"]] = Account(self._account_id, job_dict["account"], total_jobs_enqueued=1) + self._account_id += 1 + else: + self.account_dict[job_dict["account"]].total_jobs_enqueued += 1 + + def initialize_accounts_from_dict(self, dictionary): + if '_account_id' in dictionary: + self._account_id = dictionary['_account_id'] + if 'account_dict' in dictionary: + dics_from_dictionary = dictionary['account_dict'] + self.account_dict = {} + for account_name, account_dict in dics_from_dictionary.items(): + self.account_dict[account_name] = Account.init_from_dict(account_dict) + + if 'all_users' in dictionary: + self.all_users = Account.init_from_dict(dictionary['all_users']) + if 'average_user' in dictionary: + self.average_user = Account.init_from_dict(dictionary['average_user']) + + def initialize_accounts_from_json(self, filename): try: with open(filename, 'r', encoding='utf-8') as file: json_object = json.load(file) - if '_account_id' in json_object: - self._account_id = json_object['_account_id'] - if 'account_dict' in json_object: - json_dict = json_object['account_dict'] - self.account_dict = {} - for account_name,account_dict in json_dict.items(): - self.account_dict[account_name] = Account.init_from_dict(account_dict) - #self.account_dict = json_object['account_dict'] - - if 'all_users' in json_object: - self.all_users = Account.init_from_dict(json_object['all_users']) - if 'average_user' in json_object: - self.average_user = Account.init_from_dict(json_object['average_user']) + self.initialize_accounts_from_dict(json_object) except ValueError: raise ValueError(f"{file} could not be read using json.load()") - def update_account_statistics(self,jobstats): - #update specific account associated with job + def update_account_statistics(self, jobstats): + # Update specific account associated with job if isinstance(jobstats, JobStatistics): if 
jobstats.account not in self.account_dict: - self.account_dict[jobstats.account] = Account(self._account_id,jobstats.account,0) # new account from job's.account_id with priority 0 - self._account_id += 1 + raise ValueError(f"Account {jobstats.account} not registered in Accounts object {self}") + # self.account_dict[jobstats.account] = Account(self._account_id, jobstats.account) + # self._account_id += 1 account = self.account_dict[jobstats.account] - account.update_statistics(jobstats,self.average_user) + account.update_statistics(jobstats, self.average_user) self.account_dict[jobstats.account] = account - #update the average_user account and the summary account + # Update the summary account (all_users) and the average_user account self.all_users.update_statistics(jobstats,self.average_user) self.update_average_user() diff --git a/raps/engine.py b/raps/engine.py index 10c2b48..9524c25 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -41,9 +41,7 @@ class Engine: # Initialize running and queue, etc. self.running = [] self.queue = [] - self.accounts = Accounts() - if 'accounts_json' in kwargs and kwargs['accounts_json']: - self.accounts.initialize_accounts_from_json(kwargs.get('accounts_json')) + self.accounts = None self.jobs_completed = 0 self.current_time = 0 self.cooling_model = cooling_model diff --git a/raps/telemetry.py b/raps/telemetry.py index 833ca39..b46d2c1 100644 --- a/raps/telemetry.py +++ b/raps/telemetry.py @@ -27,6 +27,7 @@ from tqdm import tqdm from .config import ConfigManager from .job import Job +from .account import Accounts from .plotting import plot_submit_times, plot_nodes_histogram from .utils import next_arrival @@ -44,15 +45,15 @@ class Telemetry: print("WARNING: Failed to load dataloader") - def save_snapshot(self, jobs: list, filename: str): + def save_snapshot(self, jobs: list, accounts: dict, filename: str): """Saves a snapshot of the jobs to a compressed file. 
""" - np.savez_compressed(filename, jobs=jobs) + np.savez_compressed(filename, jobs=jobs, accounts=accounts) - def load_snapshot(self, snapshot: str) -> list: + def load_snapshot(self, snapshot: str) -> (list, dict): """Reads a snapshot from a compressed file and returns the jobs.""" - jobs = np.load(snapshot, allow_pickle=True, mmap_mode='r') - return jobs['jobs'].tolist() + jobs, accounts_dict = np.load(snapshot, allow_pickle=True, mmap_mode='r') + return jobs['jobs'].tolist(), Accounts.initialize_accounts_from_dict(accounts_dict) def load_data(self, files): diff --git a/raps/ui.py b/raps/ui.py index 8bee172..e269663 100644 --- a/raps/ui.py +++ b/raps/ui.py @@ -379,7 +379,7 @@ class LayoutManager: if self.engine.cooling_model: self.update_powertemp_array( data.power_df, data.fmu_outputs, data.p_flops, data.g_flops_w, data.system_util, - uncertainties = uncertainties, + uncertainties=uncertainties, ) self.update_pressflow_array(data.fmu_outputs) @@ -390,7 +390,7 @@ class LayoutManager: ) self.update_power_array( data.power_df, data.p_flops, data.g_flops_w, - data.system_util, uncertainties = uncertainties, + data.system_util, uncertainties=uncertainties, ) def render(self): diff --git a/raps/workload.py b/raps/workload.py index b368246..c5dc898 100644 --- a/raps/workload.py +++ b/raps/workload.py @@ -111,7 +111,7 @@ class Workload: job_info = job_dict( config['AVAILABLE_NODES'], # Nodes required f"Max Test {partition}", # Name with partition label - ACCT_NAMES[0], # User account + ACCT_NAMES[0], # User account cpu_trace, # CPU trace gpu_trace, # GPU trace net_tx, # Network transmit trace -- GitLab From 5f0d7a649fa3da55bfd7c82f6394061456f480c5 Mon Sep 17 00:00:00 2001 From: Matthias Maiterth Date: Fri, 7 Feb 2025 16:22:52 -0500 Subject: [PATCH 06/12] Update to refactor of accounts 1. Added accounts into main and accounts generation from jobs. 2. Updated Accounts fugaku points computation 3. 
Introduced Account and Accounts merging functionality for loading (TODO: merge previous changes and test -o flag and subsequent --accounts-json flag) 4. Revereted account specifc sorting, but added sorted=False flag to scheduler --- main.py | 11 ++++++--- raps/account.py | 47 +++++++++++++++++++++++++++++++++++++- raps/engine.py | 2 +- raps/schedulers/default.py | 20 ++++------------ 4 files changed, 59 insertions(+), 21 deletions(-) diff --git a/main.py b/main.py index 3c5ccd3..5c244a3 100644 --- a/main.py +++ b/main.py @@ -125,8 +125,12 @@ if args.replay: else: # Synthetic jobs wl = Workload(config) jobs = getattr(wl, args.workload)(num_jobs=args.numjobs) - accounts = Accounts(jobs) - sc.accounts = accounts + job_accounts = Accounts(jobs) + if args.accounts_json: + loaded_accounts = Accounts.initialize_accounts_from_json(args.accounts_json) + accounts = loaded_accounts.merge(loaded_accounts,job_accounts) + else: + accounts = job_accounts if args.verbose: for job_vector in jobs: @@ -137,13 +141,14 @@ else: # Synthetic jobs if args.time: timesteps = convert_to_seconds(args.time) else: - timesteps = 88200 # 24 hours + timesteps = 88200 # 24 hours DIR_NAME = create_casename() OPATH = OUTPUT_PATH / DIR_NAME print("Output directory is: ", OPATH) sc.opath = OPATH +sc.accounts = accounts if args.plot or args.output: try: diff --git a/raps/account.py b/raps/account.py index 59e5b47..4955aea 100644 --- a/raps/account.py +++ b/raps/account.py @@ -39,6 +39,11 @@ class Account: self.avg_power = self.time_allocated / self.energy_allocated self.fugaku_points = fugaku_points + def update_fugaku_points(self, average_energy, average_power): + if average_power == 0: + raise ValueError(f"{average_power} is zero") + self.fugaku_points = (average_energy - self.energy_allocated) / average_power + def update_statistics(self, jobstats, average_user): self.total_jobs_completed += 1 self.time_allocated += jobstats.run_time @@ -50,7 +55,7 @@ class Account: if average_user.avg_power == 0: # 
If this is the first job use own power average_user.avg_power = self.avg_power if average_user.avg_power != 0: # If no energy was computed no points can be computed. - self.fugaku_points = (average_user.energy_allocated - self.energy_allocated) / average_user.avg_power + self.update_fugaku_points(average_user.energy_allocated, average_user.avg_power) def __repr__(self): return (f"Account(id={self.id}, name={self.name}), " @@ -90,6 +95,30 @@ class Account: return acct +def merge_account_of_same_id(account1:Account, account2:Account, new_id) -> Account: + merged_account = Account() + if account1.name != account2.name: + raise KeyError(f"{account1.name} != {account2.name}. Input arguments missmatch.") + merged_account.name = account1.name + merged_account.id = new_id # This has to be relative to the Accounts Object and cannot be derived from the individual Account objects + if account1.priority is 0: + merged_account.priority = account2.priority + elif account2.priority is 0: + merged_account.priority = account1.priority + else: + raise ValueError("Priority Cannot be derived!") + + merged_account.total_jobs_enqueued = account1.total_jobs_enqueued + account2.total_jobs_enqueued + merged_account.total_jobs_completed = account1.total_jobs_completed + account2.total_jobs_completed + merged_account.time_allocated = account1.time_allocated + account2.time_allocated + merged_account.energy_allocated = account1.energy_allocated + account2.energy_allocated + if merged_account.energy_allocated != 0: + merged_account.avg_power = merged_account.time_allocated / merged_account.energy_allocated + else: + merged_account.avg_power = 0 + merged_account.fugaku_points = None # Needs to be invalidated, as averages are not known! 
+ + class Accounts: def update_average_user(self): @@ -165,3 +194,19 @@ class Accounts: ret_dict['all_users'] = self.all_users.to_dict() ret_dict['average_user'] = self.average_user.to_dict() return ret_dict + + +def merge_accounts(accounts1: Accounts,accounts2: Accounts) -> Accounts: + merged_accounts = Accounts() + merged_accounts._account_id = len(accounts1.account_dict) + len(accounts2.account_dict) + merged_accounts.account_dict = accounts1.account_dict + for ac2_k, ac2_v in accounts2.account_dict.items(): + if ac2_k in accounts1.account_dict: + merged_accounts.account_dict[ac2_k] = merge_account_of_same_id(accounts1.account_dict[ac2_k], accounts2.account_dict[ac2_k]) + # update all uers -> then update average user -> then fugagku points for all users (order is important!) + merged_accounts.all_users = merge_account_of_same_id(accounts1.all_users,accounts2.all_users) + merged_accounts.all_users.update_fugaku_points(merged_accounts.average_user.energy_allocated, merged_accounts.average_user.avg_power) + merged_accounts.update_average_user() + for ac_k, ac_v in merged_accounts.account_dict.items(): + merged_accounts[ac_k].update_fugaku_points(merged_accounts.average_user.energy_allocated, merged_accounts.average_user.avg_power) + return merged_accounts diff --git a/raps/engine.py b/raps/engine.py index 9524c25..ac75904 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -220,7 +220,7 @@ class Engine: #sort the queue according to the policy self.queue = self.scheduler.sort_jobs(self.queue, self.accounts) # Schedule jobs that are now in the queue. - self.scheduler.schedule(self.queue, self.running, self.current_time) + self.scheduler.schedule(self.queue, self.running, self.current_time, sorted = True) # Stop the simulation if no more jobs are running or in the queue. 
if autoshutdown and not self.queue and not self.running and not self.replay: diff --git a/raps/schedulers/default.py b/raps/schedulers/default.py index 903c055..9cef4c0 100644 --- a/raps/schedulers/default.py +++ b/raps/schedulers/default.py @@ -30,26 +30,14 @@ class Scheduler: elif self.policy == PolicyType.SJF: return sorted(queue, key=lambda job: job.wall_time) elif self.policy == PolicyType.PRIORITY: - return self.sort_by_job_and_account_priority(queue, accounts) + return sorted(queue, key=lambda job: job.priority, reverse=True) else: raise ValueError(f"Unknown policy type: {self.policy}") - def sort_by_job_and_account_priority(self, queue, accounts=None): - priority_tuple_list = [] - for job in queue: - # create a tuple of the job and the priority - priority = job.priority - if job.account in accounts: - priority += accounts[job.account].priority - priority = max(priority, MAX_PRIORITY) - priority_tuple_list.append((priority,job)) - priority_tuple_list = sorted(priority_tuple_list, key=lambda x:x[1], reverse=True) - _, queue = zip(*priority_tuple_list) - return queue - - def schedule(self, queue, running, current_time, debug=False): + def schedule(self, queue, running, current_time, sorted=False, debug=False): # Sort the queue in place. 
- queue[:] = self.sort_jobs(queue) + if not sorted: + queue[:] = self.sort_jobs(queue) # Iterate over a copy of the queue since we might remove items for job in queue[:]: -- GitLab From 728fc33cf09eb06bf15e86ed8b6e201af15caa3a Mon Sep 17 00:00:00 2001 From: Matthias Maiterth Date: Fri, 7 Feb 2025 16:35:41 -0500 Subject: [PATCH 07/12] Move node_failure() to resmgr.py --- raps/engine.py | 1 - raps/resmgr.py | 9 ++++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index ac75904..c223b8a 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -3,7 +3,6 @@ import dataclasses import pandas as pd from .job import Job, JobState -from .account import Accounts from .network import network_utilization from .utils import summarize_ranges, expand_ranges, get_utilization from .resmgr import ResourceManager diff --git a/raps/resmgr.py b/raps/resmgr.py index 8abce81..508cc6f 100644 --- a/raps/resmgr.py +++ b/raps/resmgr.py @@ -1,5 +1,6 @@ import numpy as np from .job import JobState +from .utils import expand_ranges from scipy.stats import weibull_min @@ -58,10 +59,15 @@ class ResourceManager: scale_parameter = mtbf * 3600 # Convert to seconds # Create a NumPy array of node indices, excluding down nodes + #print(self.down_nodes) + #down_nodes = expand_ranges(self.down_nodes) + #all_nodes = np.setdiff1d(np.arange(self.config['TOTAL_NODES']), np.array(self.down_nodes, dtype=int)) all_nodes = np.array(sorted(set(range(self.total_nodes)) - set(self.down_nodes))) # Sample the Weibull distribution for all nodes at once random_values = weibull_min.rvs(shape_parameter, scale=scale_parameter, size=all_nodes.size) + failure_threshold = 0.1 + failed_nodes = [node for node, r in zip(all_nodes, random_values) if r < failure_threshold] # Identify nodes that have failed failure_threshold = 0.1 @@ -72,6 +78,7 @@ class ResourceManager: for node_index in newly_downed_nodes: if node_index in self.available_nodes: self.available_nodes.remove(node_index) 
- self.down_nodes.add(str(node_index)) + self.down_nodes.append(str(node_index)) + self.power_manager.set_idle(node_index) return newly_downed_nodes.tolist() -- GitLab From 69fee0f395add0c900ec9ab9dfcf3c91588a41b4 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Thu, 6 Feb 2025 15:22:10 -0500 Subject: [PATCH 08/12] Fix bug in UI reporting of down_nodes --- raps/resmgr.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raps/resmgr.py b/raps/resmgr.py index 508cc6f..196e636 100644 --- a/raps/resmgr.py +++ b/raps/resmgr.py @@ -78,7 +78,7 @@ class ResourceManager: for node_index in newly_downed_nodes: if node_index in self.available_nodes: self.available_nodes.remove(node_index) - self.down_nodes.append(str(node_index)) + self.down_nodes.add(str(node_index)) self.power_manager.set_idle(node_index) return newly_downed_nodes.tolist() -- GitLab From 531ec14972a6dc8cbfa97e331fc790041f748a26 Mon Sep 17 00:00:00 2001 From: Wes Brewer Date: Thu, 6 Feb 2025 15:46:59 -0500 Subject: [PATCH 09/12] Move get_utilization() from engine.py to utils.py and cleanup code. 
--- raps/resmgr.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/raps/resmgr.py b/raps/resmgr.py index 196e636..8abce81 100644 --- a/raps/resmgr.py +++ b/raps/resmgr.py @@ -1,6 +1,5 @@ import numpy as np from .job import JobState -from .utils import expand_ranges from scipy.stats import weibull_min @@ -59,15 +58,10 @@ class ResourceManager: scale_parameter = mtbf * 3600 # Convert to seconds # Create a NumPy array of node indices, excluding down nodes - #print(self.down_nodes) - #down_nodes = expand_ranges(self.down_nodes) - #all_nodes = np.setdiff1d(np.arange(self.config['TOTAL_NODES']), np.array(self.down_nodes, dtype=int)) all_nodes = np.array(sorted(set(range(self.total_nodes)) - set(self.down_nodes))) # Sample the Weibull distribution for all nodes at once random_values = weibull_min.rvs(shape_parameter, scale=scale_parameter, size=all_nodes.size) - failure_threshold = 0.1 - failed_nodes = [node for node, r in zip(all_nodes, random_values) if r < failure_threshold] # Identify nodes that have failed failure_threshold = 0.1 @@ -79,6 +73,5 @@ class ResourceManager: if node_index in self.available_nodes: self.available_nodes.remove(node_index) self.down_nodes.add(str(node_index)) - self.power_manager.set_idle(node_index) return newly_downed_nodes.tolist() -- GitLab From a28d22da9a8589dca7f989341229496bec9ca525 Mon Sep 17 00:00:00 2001 From: Matthias Maiterth Date: Fri, 7 Feb 2025 22:18:40 -0500 Subject: [PATCH 10/12] Updated Accounts and Account class - In Accounts, removed ID 'id' as the account name is used as an ID - removed verbose wording 'total' from total_jobs_completed etc. - Added merge classes to Account and Accounts and refactored previous version as classmethods. - Added add_account Method to Accounts - Refactored from_dict and from_json_filename Engine: - Re-added job history to work with the current version of job_dicts for -o Therefore -o works again. 
Tested with -o --accounts-json of previous output smoketest and normal runs without options. --- main.py | 9 +- raps/account.py | 217 ++++++++++++++++++++++++++++++------------------ raps/engine.py | 5 ++ raps/job.py | 3 + 4 files changed, 149 insertions(+), 85 deletions(-) diff --git a/main.py b/main.py index 5c244a3..7e8919d 100644 --- a/main.py +++ b/main.py @@ -127,8 +127,8 @@ else: # Synthetic jobs jobs = getattr(wl, args.workload)(num_jobs=args.numjobs) job_accounts = Accounts(jobs) if args.accounts_json: - loaded_accounts = Accounts.initialize_accounts_from_json(args.accounts_json) - accounts = loaded_accounts.merge(loaded_accounts,job_accounts) + loaded_accounts = Accounts.from_json_filename(args.accounts_json) + accounts = Accounts.merge(loaded_accounts,job_accounts) else: accounts = job_accounts @@ -240,8 +240,8 @@ if args.output: df.to_parquet(OPATH / 'util.parquet', engine='pyarrow') # Schedule history - schedule_history = pd.DataFrame(sc.get_history()) - schedule_history.to_csv(OPATH / "schedule_history.csv", index=False) + job_history = pd.DataFrame(sc.get_job_history_dict()) + job_history.to_csv(OPATH / "job_history.csv", index=False) try: with open(OPATH / 'stats.out', 'w') as f: @@ -255,3 +255,4 @@ if args.output: f.write(json_string) except TypeError: raise TypeError(f"{sc.accounts} could not be parsed by json.dump") + print("Output directory is: ", OPATH) # If output is enabled, the user wants this information as last output diff --git a/raps/account.py b/raps/account.py index 4955aea..2ded688 100644 --- a/raps/account.py +++ b/raps/account.py @@ -18,20 +18,19 @@ class Account: RUNNING, COMPLETED, CANCELLED, FAILED, or TIMEOUT. 
""" - def __init__(self, id, name, + def __init__(self, name, priority=0, - total_jobs_enqueued=0, - total_jobs_completed=0, + jobs_enqueued=0, + jobs_completed=0, time_allocated=0, energy_allocated=0, avg_power=0, fugaku_points=0 ): - self.id = id self.name = name self.priority = priority - self.total_jobs_enqueued = total_jobs_enqueued - self.total_jobs_completed = total_jobs_completed + self.jobs_enqueued = jobs_enqueued + self.jobs_completed = jobs_completed self.time_allocated = time_allocated self.energy_allocated = energy_allocated self.avg_power = avg_power @@ -45,7 +44,7 @@ class Account: self.fugaku_points = (average_energy - self.energy_allocated) / average_power def update_statistics(self, jobstats, average_user): - self.total_jobs_completed += 1 + self.jobs_completed += 1 self.time_allocated += jobstats.run_time self.energy_allocated += jobstats.energy if self.time_allocated == 0: @@ -58,10 +57,10 @@ class Account: self.update_fugaku_points(average_user.energy_allocated, average_user.avg_power) def __repr__(self): - return (f"Account(id={self.id}, name={self.name}), " + return (f"Account(name={self.name}), " f"priority: {self.priority}, " - f"total_jobs_enqueued: {self.total_jobs_enqueued}, " - f"total_jobs_completed: {self.total_jobs_completed}, " + f"jobs_enqueued: {self.jobs_enqueued}, " + f"jobs_completed: {self.jobs_completed}, " f"time_allocated: {self.time_allocated}, " f"energy_allocated: {self.energy_allocated}, " f"avg_power: {self.avg_power}, " @@ -70,11 +69,10 @@ class Account: def to_dict(self): return { - "id": self.id, "name": self.name, "priority": self.priority, - "total_jobs_enqueued": self.total_jobs_enqueued, - "total_jobs_completed": self.total_jobs_completed, + "jobs_enqueued": self.jobs_enqueued, + "jobs_completed": self.jobs_completed, "time_allocated": self.time_allocated, "energy_allocated": self.energy_allocated, "avg_power": self.avg_power, @@ -82,57 +80,69 @@ class Account: } @classmethod - def init_from_dict(acct, 
account_dict): # id ,name, priority, total_jobs, time_allocated, energy_allocated, avg_power, fugaku_points): - acct = Account(account_dict["id"], account_dict["name"], priority=account_dict["priority"]) - acct.id = account_dict["id"] + def from_dict(acct, account_dict): # name, priority, jobs_enqueue, jobs_completed, time_allocated, energy_allocated, avg_power, fugaku_points): + acct = Account(account_dict["name"], priority=account_dict["priority"]) acct.name = account_dict["name"] acct.priority = account_dict["priority"] - acct.total_jobs = account_dict["total_jobs"] + acct.jobs_enqueued = account_dict["jobs_enqueued"] + acct.jobs_completed = account_dict["jobs_completed"] acct.time_allocated = account_dict["time_allocated"] acct.energy_allocated = account_dict["energy_allocated"] acct.avg_power = account_dict["avg_power"] acct.fugaku_points = account_dict["fugaku_points"] return acct + @classmethod + def merge(cls,account1:'Account', account2:'Account') -> 'Account': + """ + Destructive merge + + Priorities are only set if one is zero or both are equal. + """ + if account1.name != account2.name: + raise KeyError(f"{account1.name} != {account2.name}. 
Input arguments missmatch.") + + merged_account = cls(account1.name) + + if account1.priority == account2.priority: + merged_account.priority = account1.priority + elif account1.priority == 0: + merged_account.priority = account2.priority + elif account2.priority == 0: + merged_account.priority = account1.priority + else: + raise ValueError("Priority Cannot be derived!") + + merged_account.jobs_enqueued = account1.jobs_enqueued + account2.jobs_enqueued + merged_account.jobs_completed = account1.jobs_completed + account2.jobs_completed + merged_account.time_allocated = account1.time_allocated + account2.time_allocated + merged_account.energy_allocated = account1.energy_allocated + account2.energy_allocated + if merged_account.energy_allocated != 0: + merged_account.avg_power = merged_account.time_allocated / merged_account.energy_allocated + else: + merged_account.avg_power = 0 + merged_account.fugaku_points = None # Needs to be invalidated, as averages are not known! -def merge_account_of_same_id(account1:Account, account2:Account, new_id) -> Account: - merged_account = Account() - if account1.name != account2.name: - raise KeyError(f"{account1.name} != {account2.name}. 
Input arguments missmatch.") - merged_account.name = account1.name - merged_account.id = new_id # This has to be relative to the Accounts Object and cannot be derived from the individual Account objects - if account1.priority is 0: - merged_account.priority = account2.priority - elif account2.priority is 0: - merged_account.priority = account1.priority - else: - raise ValueError("Priority Cannot be derived!") - - merged_account.total_jobs_enqueued = account1.total_jobs_enqueued + account2.total_jobs_enqueued - merged_account.total_jobs_completed = account1.total_jobs_completed + account2.total_jobs_completed - merged_account.time_allocated = account1.time_allocated + account2.time_allocated - merged_account.energy_allocated = account1.energy_allocated + account2.energy_allocated - if merged_account.energy_allocated != 0: - merged_account.avg_power = merged_account.time_allocated / merged_account.energy_allocated - else: - merged_account.avg_power = 0 - merged_account.fugaku_points = None # Needs to be invalidated, as averages are not known! 
+ account1 = None + account2 = None + + return merged_account class Accounts: def update_average_user(self): total_accounts = len(self.account_dict) - self.average_user.total_jobs_enqueued = self.all_users.total_jobs_enqueued / total_accounts - self.average_user.total_jobs_completed = self.all_users.total_jobs_completed / total_accounts + self.average_user.jobs_enqueued = self.all_users.jobs_enqueued / total_accounts + self.average_user.jobs_completed = self.all_users.jobs_completed / total_accounts self.average_user.time_allocated = self.all_users.time_allocated / total_accounts self.average_user.energy_allocated = self.all_users.energy_allocated / total_accounts self.average_user.avg_power = self.all_users.avg_power / total_accounts - self.fugaku_points = self.all_users.fugaku_points / total_accounts # this should be 0 + if self.average_user.jobs_completed != 0.0: + self.average_user.update_fugaku_points(self.average_user.energy_allocated,self.average_user.avg_power) return self def __init__(self, jobs=None): - self._account_id = 0 self.account_dict = dict() self.all_users = Account(-2, "All_Users") self.average_user = Account(-1, "Avg_User") @@ -143,30 +153,58 @@ class Accounts: if not isinstance(job_dict,dict): raise TypeError if job_dict["account"] not in self.account_dict: - self.account_dict[job_dict["account"]] = Account(self._account_id, job_dict["account"], total_jobs_enqueued=1) - self._account_id += 1 - else: - self.account_dict[job_dict["account"]].total_jobs_enqueued += 1 - - def initialize_accounts_from_dict(self, dictionary): - if '_account_id' in dictionary: - self._account_id = dictionary['_account_id'] - if 'account_dict' in dictionary: - dics_from_dictionary = dictionary['account_dict'] - self.account_dict = {} - for account_name, account_dict in dics_from_dictionary.items(): - self.account_dict[account_name] = Account.init_from_dict(account_dict) - - if 'all_users' in dictionary: - self.all_users = 
Account.init_from_dict(dictionary['all_users']) - if 'average_user' in dictionary: - self.average_user = Account.init_from_dict(dictionary['average_user']) - - def initialize_accounts_from_json(self, filename): + self.account_dict[job_dict["account"]] = Account(job_dict["account"], jobs_enqueued=0) + self.account_dict[job_dict["account"]].jobs_enqueued += 1 + self.all_users.jobs_enqueued += 1 + self.update_average_user() + pass + + def updates_all_users_by_account(self,account:Account): + self.all_users.jobs_enqueued += account.jobs_enqueued + self.all_users.jobs_completed += account.jobs_completed + self.all_users.time_allocated += account.time_allocated + self.all_users.energy_allocated += account.energy_allocated + self.all_users.avg_power = self.energy_allocated / self.time_allocated + self.update_average_user() # Only necessary if averag_user was not updated before calling update all users. + # Therefore As this is needed for fugaku points this should always be called. + self.all_users.update_fugaku_points(self.average_user.energy_allocated,self.average_user.avg_power) + + + + def add_account(self, account:Account): + self.account_dict[account.name] = account + self.add_user_stats_to_all_users(account) + # update_average_user() is already called + + + @classmethod + def from_dict(cls, dictionary): + accounts = cls() + + if 'account_dict' not in dictionary: + raise KeyError("'account_dict' not in dictionary. Failed to restore.") + dicts_from_dictionary = dictionary['account_dict'] + accounts.account_dict = {} + if not isinstance(dicts_from_dictionary, dict): + raise KeyError("'account_dict' is not a dictionary. Failed to restore.") + for account_name, account_dict in dicts_from_dictionary.items(): + accounts.account_dict[account_name] = Account.from_dict(account_dict) + + if 'all_users' not in dictionary: + raise KeyError("'all_users' not in dictionary. 
Failed to restore.") + accounts.all_users = Account.from_dict(dictionary['all_users']) + + if 'average_user' not in dictionary: + raise KeyError("'average_user' not in dictionary. Failed to restore.") + accounts.average_user = Account.from_dict(dictionary['average_user']) + return accounts + + @classmethod + def from_json_filename(cls, filename): try: with open(filename, 'r', encoding='utf-8') as file: json_object = json.load(file) - self.initialize_accounts_from_dict(json_object) + return cls.from_dict(json_object) except ValueError: raise ValueError(f"{file} could not be read using json.load()") @@ -175,8 +213,7 @@ class Accounts: if isinstance(jobstats, JobStatistics): if jobstats.account not in self.account_dict: raise ValueError(f"Account {jobstats.account} not registered in Accounts object {self}") - # self.account_dict[jobstats.account] = Account(self._account_id, jobstats.account) - # self._account_id += 1 + # self.account_dict[jobstats.account] = Account(jobstats.account) account = self.account_dict[jobstats.account] account.update_statistics(jobstats, self.average_user) self.account_dict[jobstats.account] = account @@ -189,24 +226,42 @@ class Accounts: for account_name,account in self.account_dict.items(): acct_dict[account_name] = account.to_dict() ret_dict = {} - ret_dict['_account_id'] = self._account_id ret_dict['account_dict'] = acct_dict ret_dict['all_users'] = self.all_users.to_dict() ret_dict['average_user'] = self.average_user.to_dict() return ret_dict + @classmethod + def merge(cls, accounts1:'Accounts', accounts2:'Accounts') -> 'Accounts': + """ + Destructive merge of accounts + """ + merged_accounts = Accounts() + merged_accounts.account_dict = accounts1.account_dict + + for ac2_k, ac2_v in accounts2.account_dict.items(): + if ac2_k in accounts1.account_dict: + merged_accounts.account_dict[ac2_k] = Account.merge(accounts1.account_dict[ac2_k], accounts2.account_dict[ac2_k]) + else: + merged_accounts.account_dict[ac2_k] = ac2_v + for ac1_k, 
ac1_v in accounts1.account_dict.items(): + if ac1_k not in accounts2.account_dict: + merged_accounts.account_dict[ac1_k] = ac1_v + else: + # Already added above + pass + + # update all uers -> then update average user -> then fugagku points for all users (order is important!) + merged_accounts.all_users = Account.merge(accounts1.all_users,accounts2.all_users) + merged_accounts.update_average_user() + # update to average user is needed before fugaku points can be caluculated. + if merged_accounts.all_users.jobs_completed != 0: + merged_accounts.all_users.update_fugaku_points(merged_accounts.average_user.energy_allocated, merged_accounts.average_user.avg_power) + + for ac_k, ac_v in merged_accounts.account_dict.items(): + if merged_accounts.account_dict[ac_k].jobs_completed != 0: + merged_accounts.account_dict[ac_k].update_fugaku_points(merged_accounts.average_user.energy_allocated, merged_accounts.average_user.avg_power) -def merge_accounts(accounts1: Accounts,accounts2: Accounts) -> Accounts: - merged_accounts = Accounts() - merged_accounts._account_id = len(accounts1.account_dict) + len(accounts2.account_dict) - merged_accounts.account_dict = accounts1.account_dict - for ac2_k, ac2_v in accounts2.account_dict.items(): - if ac2_k in accounts1.account_dict: - merged_accounts.account_dict[ac2_k] = merge_account_of_same_id(accounts1.account_dict[ac2_k], accounts2.account_dict[ac2_k]) - # update all uers -> then update average user -> then fugagku points for all users (order is important!) 
- merged_accounts.all_users = merge_account_of_same_id(accounts1.all_users,accounts2.all_users) - merged_accounts.all_users.update_fugaku_points(merged_accounts.average_user.energy_allocated, merged_accounts.average_user.avg_power) - merged_accounts.update_average_user() - for ac_k, ac_v in merged_accounts.account_dict.items(): - merged_accounts[ac_k].update_fugaku_points(merged_accounts.average_user.energy_allocated, merged_accounts.average_user.avg_power) - return merged_accounts + accounts1 = None + accounts2 = None + return merged_accounts diff --git a/raps/engine.py b/raps/engine.py index c223b8a..057b2d3 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -41,6 +41,7 @@ class Engine: self.running = [] self.queue = [] self.accounts = None + self.job_history_dict = [] self.jobs_completed = 0 self.current_time = 0 self.cooling_model = cooling_model @@ -135,6 +136,7 @@ class Engine: self.jobs_completed += 1 job_stats = job.statistics() self.accounts.update_account_statistics(job_stats) + self.job_history_dict.append(job_stats.__dict__) # Free the nodes via the resource manager. self.resource_manager.free_nodes_from_job(job) @@ -268,3 +270,6 @@ class Engine: } return stats + + def get_job_history_dict(self): + return self.job_history_dict diff --git a/raps/job.py b/raps/job.py index a06eb84..1add710 100644 --- a/raps/job.py +++ b/raps/job.py @@ -115,6 +115,9 @@ class JobStatistics: self.account = job.account self.num_nodes = len(job.scheduled_nodes) self.run_time = job.running_time + self.start_time = job.start_time + self.end_time = job.end_time + self.state = job._state if len(job.power_history) == 0: self.avg_node_power = 0 self.max_node_power = 0 -- GitLab From 012571f39f5df5f55645417dc3edb48d2f5ff514 Mon Sep 17 00:00:00 2001 From: Matthias Maiterth Date: Mon, 10 Feb 2025 17:14:40 -0500 Subject: [PATCH 11/12] Added sorting function for the fugaku points. 
--- raps/engine.py | 18 ++++++----------- raps/schedulers/default.py | 40 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 44 insertions(+), 14 deletions(-) diff --git a/raps/engine.py b/raps/engine.py index 057b2d3..3034d27 100644 --- a/raps/engine.py +++ b/raps/engine.py @@ -29,6 +29,7 @@ class TickData: class Engine: """Job scheduling simulation engine.""" + def __init__(self, *, power_manager, flops_manager, cooling_model=None, config, **kwargs): self.config = config self.down_nodes = summarize_ranges(self.config['DOWN_NODES']) @@ -62,11 +63,6 @@ class Engine: ) print(f"Using scheduler: {scheduler_type}") - # Unused! - def add_job(self, job): - self.queue.append(job) - self.queue = self.scheduler.sort_jobs(self.queue) # No need to sort here! - def eligible_jobs(self,jobs_to_submit): eligible_jobs_list = [] while jobs_to_submit and jobs_to_submit[0]['submit_time'] <= self.current_time: @@ -75,7 +71,6 @@ class Engine: eligible_jobs_list.append(job) return eligible_jobs_list - def tick(self): """Simulate a timestep.""" completed_jobs = [job for job in self.running if job.end_time is not None and job.end_time <= self.current_time] @@ -141,7 +136,7 @@ class Engine: self.resource_manager.free_nodes_from_job(job) # Ask scheduler to schedule any jobs waiting in queue - self.scheduler.schedule(self.queue, self.running, self.current_time) + self.scheduler.schedule(self.queue, self.running, self.current_time, self.accounts) # Update the power array UI component rack_power, rect_losses = self.power_manager.compute_rack_power() @@ -176,7 +171,7 @@ class Engine: # FMU inputs are N powers and the wetbulb temp fmu_inputs = self.cooling_model.generate_fmu_inputs(runtime_values, - uncertainties=self.power_manager.uncertainties) + uncertainties=self.power_manager.uncertainties) cooling_inputs, cooling_outputs = ( self.cooling_model.step(self.current_time, fmu_inputs, self.config['POWER_UPDATE_FREQ']) ) @@ -206,7 +201,6 @@ class Engine: self.current_time += 1 return 
tick_data - def run_simulation(self, jobs, timesteps, autoshutdown=False): """Generator that yields after each simulation tick.""" self.timesteps = timesteps @@ -216,12 +210,12 @@ class Engine: for timestep in range(timesteps): - # identify eligible jobs and add them to the queue. + # Identify eligible jobs and add them to the queue. self.queue += self.eligible_jobs(jobs_to_submit) - #sort the queue according to the policy + # Sort the queue according to the policy self.queue = self.scheduler.sort_jobs(self.queue, self.accounts) # Schedule jobs that are now in the queue. - self.scheduler.schedule(self.queue, self.running, self.current_time, sorted = True) + self.scheduler.schedule(self.queue, self.running, self.current_time, sorted=True) # Stop the simulation if no more jobs are running or in the queue. if autoshutdown and not self.queue and not self.running and not self.replay: diff --git a/raps/schedulers/default.py b/raps/schedulers/default.py index 9cef4c0..3600611 100644 --- a/raps/schedulers/default.py +++ b/raps/schedulers/default.py @@ -9,6 +9,7 @@ class PolicyType(Enum): FCFS = 'fcfs' BACKFILL = 'backfill' PRIORITY = 'priority' + FUGAKU_PTS = 'fugaku_pts' SJF = 'sjf' @@ -31,13 +32,15 @@ class Scheduler: return sorted(queue, key=lambda job: job.wall_time) elif self.policy == PolicyType.PRIORITY: return sorted(queue, key=lambda job: job.priority, reverse=True) + elif self.policy == PolicyType.FUGAKU_PTS: + return self.sort_fugaku_redeeming(queue, accounts) else: raise ValueError(f"Unknown policy type: {self.policy}") - def schedule(self, queue, running, current_time, sorted=False, debug=False): + def schedule(self, queue, running, current_time, accounts=None, sorted=False, debug=False): # Sort the queue in place. 
if not sorted: - queue[:] = self.sort_jobs(queue) + queue[:] = self.sort_jobs(queue, accounts) # Iterate over a copy of the queue since we might remove items for job in queue[:]: @@ -71,6 +74,7 @@ class Scheduler: scheduled_nodes = summarize_ranges(backfill_job.scheduled_nodes) print(f"t={current_time}: Backfilling job {backfill_job.id} with wall time {backfill_job.wall_time} on nodes {scheduled_nodes}") + def find_backfill_job(self, queue, num_free_nodes, current_time): """Finds a backfill job based on available nodes and estimated completion times. @@ -110,3 +114,35 @@ class Scheduler: return job return None + + def sort_fugaku_redeeming(self, queue, accounts=None): + if queue == []: + return queue + # Priority queues not yet implemented: + # Strategy: Sort by Fugaku Points Representing the Priority Queue + # Everything with negative Fugaku Points get sorted according to normal priority + priority_triple_list = [] + for job in queue: + fugaku_priority = accounts.account_dict[job.account].fugaku_points + # create a tuple of the job and the priority + priority = job.priority + priority_triple_list.append((fugaku_priority,priority,job)) + # Sort everythin according to fugaku_points + priority_triple_list = sorted(priority_triple_list, key=lambda x:x[0], reverse=True) + # Find the first element with negative fugaku_points + for cutoff, triple in enumerate(priority_triple_list): + fugaku_priority, _, _ = triple + if fugaku_priority < 0: + break + first_part = priority_triple_list[:cutoff] + # Sort everything afterwards according to job priority + second_part = sorted(priority_triple_list[cutoff:], key=lambda x:x[1], reverse=True) + queue_a = [] + queue_b = [] + if first_part != []: + _, _, queue_a = zip(*first_part) + queue_a = list(queue_a) + if second_part != []: + _, _, queue_b = zip(*second_part) + queue_b = list(queue_b) + return queue_a + queue_b -- GitLab From 1dfed1d0035d898e748fb5bdc5aedd480ed7d4ba Mon Sep 17 00:00:00 2001 From: Matthias Maiterth Date: Mon, 
10 Feb 2025 17:45:11 -0500 Subject: [PATCH 12/12] Formatting to follow Wes's See PEP-8 comments https://peps.python.org/pep-0008/#comments ;) --- raps/account.py | 6 +++--- raps/job.py | 6 +++--- raps/schedulers/default.py | 4 ++-- raps/telemetry.py | 19 ++++++------------- raps/ui.py | 2 +- 5 files changed, 15 insertions(+), 22 deletions(-) diff --git a/raps/account.py b/raps/account.py index 2ded688..5ff80b9 100644 --- a/raps/account.py +++ b/raps/account.py @@ -80,7 +80,7 @@ class Account: } @classmethod - def from_dict(acct, account_dict): # name, priority, jobs_enqueue, jobs_completed, time_allocated, energy_allocated, avg_power, fugaku_points): + def from_dict(acct, account_dict): acct = Account(account_dict["name"], priority=account_dict["priority"]) acct.name = account_dict["name"] acct.priority = account_dict["priority"] @@ -251,10 +251,10 @@ class Accounts: # Already added above pass - # update all uers -> then update average user -> then fugagku points for all users (order is important!) + # Update all users -> then update average user -> then fugaku points for all users (order is important!) merged_accounts.all_users = Account.merge(accounts1.all_users,accounts2.all_users) merged_accounts.update_average_user() - # update to average user is needed before fugaku points can be caluculated. + # Update to average user is needed before fugaku points can be calculated. 
if merged_accounts.all_users.jobs_completed != 0: merged_accounts.all_users.update_fugaku_points(merged_accounts.average_user.energy_allocated, merged_accounts.average_user.avg_power) diff --git a/raps/job.py b/raps/job.py index 1add710..c0b0e9b 100644 --- a/raps/job.py +++ b/raps/job.py @@ -42,7 +42,7 @@ class Job: _id_counter = 0 def __init__(self, job_dict, current_time, state=JobState.PENDING, account=None): - # initializations + # Initializations: self.start_time = None self.end_time = None self.running_time = 0 @@ -51,10 +51,10 @@ class Job: self.power_history = [] self._state = state self.account = account - # if a job dict was given, override the values from the job_dict: + # If a job dict was given, override the values from the job_dict: for key, value in job_dict.items(): setattr(self, key, value) - # in any case: provide a job_id! + # In any case: provide a job_id! if not self.id: self.id = Job._get_next_id() diff --git a/raps/schedulers/default.py b/raps/schedulers/default.py index 3600611..c3291ac 100644 --- a/raps/schedulers/default.py +++ b/raps/schedulers/default.py @@ -1,9 +1,9 @@ from enum import Enum -#from ..job import Job, JobState # Unused from ..utils import summarize_ranges from ..workload import MAX_PRIORITY + class PolicyType(Enum): """Supported scheduling policies.""" FCFS = 'fcfs' @@ -124,7 +124,7 @@ class Scheduler: priority_triple_list = [] for job in queue: fugaku_priority = accounts.account_dict[job.account].fugaku_points - # create a tuple of the job and the priority + # Create a tuple of the job and the priority priority = job.priority priority_triple_list.append((fugaku_priority,priority,job)) # Sort everythin according to fugaku_points diff --git a/raps/telemetry.py b/raps/telemetry.py index b46d2c1..2616bc7 100644 --- a/raps/telemetry.py +++ b/raps/telemetry.py @@ -40,45 +40,38 @@ class Telemetry: self.system = kwargs.get('system') self.config = kwargs.get('config') try: - self.dataloader = 
importlib.import_module(f".dataloaders.{self.system}", package = __package__) + self.dataloader = importlib.import_module(f".dataloaders.{self.system}", package=__package__) except: print("WARNING: Failed to load dataloader") - def save_snapshot(self, jobs: list, accounts: dict, filename: str): """Saves a snapshot of the jobs to a compressed file. """ np.savez_compressed(filename, jobs=jobs, accounts=accounts) - def load_snapshot(self, snapshot: str) -> (list, dict): """Reads a snapshot from a compressed file and returns the jobs.""" jobs, accounts_dict = np.load(snapshot, allow_pickle=True, mmap_mode='r') return jobs['jobs'].tolist(), Accounts.initialize_accounts_from_dict(accounts_dict) - def load_data(self, files): """Load telemetry data using custom data loaders.""" return self.dataloader.load_data(files, **self.kwargs) - def load_data_from_df(self, *args, **kwargs): """Load telemetry data using custom data loaders.""" return self.dataloader.load_data_from_df(*args, **kwargs) - def node_index_to_name(self, index: int): """ Convert node index into a name""" - return self.dataloader.node_index_to_name(index, config = self.config) - + return self.dataloader.node_index_to_name(index, config=self.config) def cdu_index_to_name(self, index: int): """ Convert cdu index into a name""" - return self.dataloader.cdu_index_to_name(index, config = self.config) - + return self.dataloader.cdu_index_to_name(index, config=self.config) def cdu_pos(self, index: int) -> tuple[int, int]: """ Return (row, col) tuple for a cdu index """ - return self.dataloader.cdu_pos(index, config = self.config) + return self.dataloader.cdu_pos(index, config=self.config) if __name__ == "__main__": @@ -88,7 +81,6 @@ if __name__ == "__main__": args_dict['config'] = config td = Telemetry(**args_dict) - if args.replay[0].endswith(".npz"): print(f"Loading {args.replay[0]}...") jobs = td.load_snapshot(args.replay[0]) @@ -115,7 +107,8 @@ if __name__ == "__main__": dt = job.submit_time - last 
dt_list.append(dt) last = job.submit_time - if args.verbose: print(job) + if args.verbose: + print(job) print(f'Simulation will run for {timesteps} seconds') print(f'Average job arrival time is: {np.mean(dt_list):.2f}s') diff --git a/raps/ui.py b/raps/ui.py index e269663..d71caf5 100644 --- a/raps/ui.py +++ b/raps/ui.py @@ -66,7 +66,7 @@ class LayoutManager: formatted_row = [func(cell) for func, cell in zip(format_funcs, row)] table.add_row(*formatted_row) - def calculate_totals(self, df): # 'Sum' and 'Loss' columns + def calculate_totals(self, df): # 'Sum' and 'Loss' columns total_power_kw = df[self.power_column].sum() + (self.config['NUM_CDUS'] * self.config['POWER_CDU'] / 1000.0) total_power_mw = total_power_kw / 1000.0 total_loss_kw = df[self.loss_column].sum() -- GitLab