Loading main.py +8 −9 Original line number Diff line number Diff line Loading @@ -19,6 +19,8 @@ if sys.version_info < (required_major, required_minor): parser = argparse.ArgumentParser(description='Resource Allocator & Power Simulator (RAPS)') parser.add_argument('-c', '--cooling', action='store_true', help='Include FMU cooling model') parser.add_argument('--start', type=str, help='ISO8061 string for start of simulation') parser.add_argument('--end', type=str, help='ISO8061 string for end of simulation') parser.add_argument('-d', '--debug', action='store_true', help='Enable debug mode and disable rich layout') parser.add_argument('-e', '--encrypt', action='store_true', help='Encrypt any sensitive data in telemetry') parser.add_argument('-n', '--numjobs', type=int, default=1000, help='Number of jobs to schedule') Loading Loading @@ -82,6 +84,9 @@ if args.cooling: cooling_model = ThermoFluidsModel(FMU_PATH) cooling_model.initialize() args.layout = "layout2" if args_dict['start']: cooling_model.weather = Weather(args_dict['start']) else: cooling_model = None Loading Loading @@ -115,18 +120,12 @@ if args.replay: # Read either npz file or telemetry parquet files if args.replay[0].endswith(".npz"): jobs, start = td.load_snapshot(args.replay[0]) sc.start = start jobs = td.load_snapshot(args.replay[0]) else: print(*args.replay) jobs, start = td.load_data(args.replay) if start is not None: sc.start = start['start_date'] jobs = td.load_data(args.replay) td.save_snapshot(jobs, filename=DIR_NAME) if args.cooling: cooling_model.weather = Weather(sc) # Set number of timesteps based on the last job running which we assume # is the maximum value of submit_time + wall_time of all the jobs if args.time: Loading raps/cooling.py +29 −30 Original line number Diff line number Diff line Loading @@ -33,6 +33,12 @@ from datetime import datetime, timedelta load_config_variables(['FMU_OUTPUT_KEYS','NUM_CDUS', 'COOLING_EFFICIENCY','WET_BULB_TEMP', 'RACKS_PER_CDU', 'ZIP_CODE', 'COUNTRY_CODE'], globals()) temperature_key = "simulator_1_centralEnergyPlant_1_coolingTowerLoop_1_sources_Towb" base_path = 'simulator[1].centralEnergyPlant[1]' hot_water_loop_htwp = f'{base_path}.hotWaterLoop[1].summary.W_flow_HTWP_kW' cooling_tower_loop_ctwp = f'{base_path}.coolingTowerLoop[1].summary.W_flow_CTWP_kW' cooling_tower_loop_ct = f'{base_path}.coolingTowerLoop[1].summary.W_flow_CT_kW' # Define the Merge function outside of the class def merge_dicts(dict1, dict2): """ Loading Loading @@ -172,27 +178,20 @@ class ThermoFluidsModel: key = f"simulator_1_datacenter_1_computeBlock_{i+1}_cabinet_1_sources_Q_flow_total" runtime_values[key] = cdu_power[i] * COOLING_EFFICIENCY / RACKS_PER_CDU # Use temp from config by default if not sc.replay: runtime_values["simulator_1_centralEnergyPlant_1_coolingTowerLoop_1_sources_Towb"] = WET_BULB_TEMP # Default temperature is from the config temperature = WET_BULB_TEMP # Otherwise get temperature based on the day we're replaying else: if sc.start is not None: if self.weather.has_coords: # If replay mode is on and weather data is available if sc.replay and self.weather and self.weather.start is not None and self.weather.has_coords: # Convert total seconds to timedelta object delta = timedelta(seconds=sc.current_time) target_datetime = sc.start + delta # YYYY-MM-DD HH:MM:SS format target_datetime = self.weather.start + delta # Get temperature temperature = self.weather.get_temperature(target_datetime) # Get temperature from weather data temperature = self.weather.get_temperature(target_datetime) or WET_BULB_TEMP if temperature is not None: runtime_values["simulator_1_centralEnergyPlant_1_coolingTowerLoop_1_sources_Towb"] = temperature else: runtime_values["simulator_1_centralEnergyPlant_1_coolingTowerLoop_1_sources_Towb"] = WET_BULB_TEMP else: runtime_values["simulator_1_centralEnergyPlant_1_coolingTowerLoop_1_sources_Towb"] = WET_BULB_TEMP # Set the temperature value runtime_values[temperature_key] = temperature return runtime_values Loading Loading @@ -231,9 +230,9 @@ class ThermoFluidsModel: def calculate_pue(self, cooling_input, datacenter_output, cep_output): # Convert values from kW to Watts W_HTWPs = np.array(cep_output['simulator[1].centralEnergyPlant[1].hotWaterLoop[1].summary.W_flow_HTWP_kW']) * 1e3 W_CTWPs = np.array(cep_output['simulator[1].centralEnergyPlant[1].coolingTowerLoop[1].summary.W_flow_CTWP_kW']) * 1e3 W_CTs = np.array(cep_output['simulator[1].centralEnergyPlant[1].coolingTowerLoop[1].summary.W_flow_CT_kW']) * 1e3 W_HTWPs = np.array(cep_output[hot_water_loop_htwp]) * 1e3 W_CTWPs = np.array(cep_output[cooling_tower_loop_ctwp]) * 1e3 W_CTs = np.array(cep_output[cooling_tower_loop_ct]) * 1e3 # Initialize W_CDUPs as zero array of the same shape as datacenter output W_CDUPs = np.zeros_like(W_HTWPs) Loading raps/dataloaders/frontier.py +1 −11 Original line number Diff line number Diff line Loading @@ -37,15 +37,6 @@ def load_data(files, **kwargs): jobprofile_path = files[1] jobprofile_df = pd.read_parquet(jobprofile_path, engine='pyarrow') # Add meta data for start date match = re.search(r'\d{4}-\d{2}-\d{2}', files[0]) if match: date_str = match.group() # Extract the date string # Convert to datetime object start_date = datetime.strptime(date_str, "%Y-%m-%d") jobs_df.attrs['metadata'] = {'start_date': start_date} return load_data_from_df(jobs_df, jobprofile_df, **kwargs) Loading @@ -70,7 +61,6 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar jobs_df = jobs_df.drop_duplicates(subset='job_id', keep='last').reset_index() jobs_df = jobs_df.sort_values(by='time_start') jobs_df = jobs_df.reset_index(drop=True) start = jobs_df.attrs['metadata'] # Convert timestamp column to datetime format jobprofile_df['timestamp'] = pd.to_datetime(jobprofile_df['timestamp']) Loading Loading @@ -160,7 +150,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar 0 # priority (not supported for Frontier at the moment) ]) return jobs, start return jobs def xname_to_index(xname: str): Loading raps/dataloaders/marconi100.py +1 −2 Original line number Diff line number Diff line Loading @@ -50,7 +50,6 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): reschedule = kwargs.get('reschedule') validate = kwargs.get('validate') jid = kwargs.get('jid', '*') start = None # Sort jobs dataframe based on values in time_start column, adjust indices after sorting jobs_df = jobs_df.sort_values(by='start_time') Loading Loading @@ -146,4 +145,4 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): priority ]) return jobs, start return jobs raps/scheduler.py +1 −5 Original line number Diff line number Diff line Loading @@ -215,7 +215,7 @@ class TickData: class Scheduler: """Job scheduler and simulation manager.""" def __init__(self, total_nodes, down_nodes, power_manager, flops_manager, \ layout_manager, cooling_model=None, start=None, end=None, **kwargs): layout_manager, cooling_model=None, **kwargs): """Initialize the scheduler. Args: Loading Loading @@ -262,8 +262,6 @@ class Scheduler: self.replay = kwargs.get('replay') self.policy = kwargs.get('schedule') self.sys_util_history = [] self.start = start self.end = end def add_job(self, job): # add job to queue Loading Loading @@ -434,8 +432,6 @@ class Scheduler: if self.current_time % FMU_UPDATE_FREQ == 0: # Power for NUM_CDUS (25 for Frontier) cdu_power = rack_power.T[-1] * 1000 #print(self.start) #breakpoint() runtime_values = self.cooling_model.generate_runtime_values(cdu_power, self) # FMU inputs are N powers and the wetbulb temp Loading Loading
main.py +8 −9 Original line number Diff line number Diff line Loading @@ -19,6 +19,8 @@ if sys.version_info < (required_major, required_minor): parser = argparse.ArgumentParser(description='Resource Allocator & Power Simulator (RAPS)') parser.add_argument('-c', '--cooling', action='store_true', help='Include FMU cooling model') parser.add_argument('--start', type=str, help='ISO8061 string for start of simulation') parser.add_argument('--end', type=str, help='ISO8061 string for end of simulation') parser.add_argument('-d', '--debug', action='store_true', help='Enable debug mode and disable rich layout') parser.add_argument('-e', '--encrypt', action='store_true', help='Encrypt any sensitive data in telemetry') parser.add_argument('-n', '--numjobs', type=int, default=1000, help='Number of jobs to schedule') Loading Loading @@ -82,6 +84,9 @@ if args.cooling: cooling_model = ThermoFluidsModel(FMU_PATH) cooling_model.initialize() args.layout = "layout2" if args_dict['start']: cooling_model.weather = Weather(args_dict['start']) else: cooling_model = None Loading Loading @@ -115,18 +120,12 @@ if args.replay: # Read either npz file or telemetry parquet files if args.replay[0].endswith(".npz"): jobs, start = td.load_snapshot(args.replay[0]) sc.start = start jobs = td.load_snapshot(args.replay[0]) else: print(*args.replay) jobs, start = td.load_data(args.replay) if start is not None: sc.start = start['start_date'] jobs = td.load_data(args.replay) td.save_snapshot(jobs, filename=DIR_NAME) if args.cooling: cooling_model.weather = Weather(sc) # Set number of timesteps based on the last job running which we assume # is the maximum value of submit_time + wall_time of all the jobs if args.time: Loading
raps/cooling.py +29 −30 Original line number Diff line number Diff line Loading @@ -33,6 +33,12 @@ from datetime import datetime, timedelta load_config_variables(['FMU_OUTPUT_KEYS','NUM_CDUS', 'COOLING_EFFICIENCY','WET_BULB_TEMP', 'RACKS_PER_CDU', 'ZIP_CODE', 'COUNTRY_CODE'], globals()) temperature_key = "simulator_1_centralEnergyPlant_1_coolingTowerLoop_1_sources_Towb" base_path = 'simulator[1].centralEnergyPlant[1]' hot_water_loop_htwp = f'{base_path}.hotWaterLoop[1].summary.W_flow_HTWP_kW' cooling_tower_loop_ctwp = f'{base_path}.coolingTowerLoop[1].summary.W_flow_CTWP_kW' cooling_tower_loop_ct = f'{base_path}.coolingTowerLoop[1].summary.W_flow_CT_kW' # Define the Merge function outside of the class def merge_dicts(dict1, dict2): """ Loading Loading @@ -172,27 +178,20 @@ class ThermoFluidsModel: key = f"simulator_1_datacenter_1_computeBlock_{i+1}_cabinet_1_sources_Q_flow_total" runtime_values[key] = cdu_power[i] * COOLING_EFFICIENCY / RACKS_PER_CDU # Use temp from config by default if not sc.replay: runtime_values["simulator_1_centralEnergyPlant_1_coolingTowerLoop_1_sources_Towb"] = WET_BULB_TEMP # Default temperature is from the config temperature = WET_BULB_TEMP # Otherwise get temperature based on the day we're replaying else: if sc.start is not None: if self.weather.has_coords: # If replay mode is on and weather data is available if sc.replay and self.weather and self.weather.start is not None and self.weather.has_coords: # Convert total seconds to timedelta object delta = timedelta(seconds=sc.current_time) target_datetime = sc.start + delta # YYYY-MM-DD HH:MM:SS format target_datetime = self.weather.start + delta # Get temperature temperature = self.weather.get_temperature(target_datetime) # Get temperature from weather data temperature = self.weather.get_temperature(target_datetime) or WET_BULB_TEMP if temperature is not None: runtime_values["simulator_1_centralEnergyPlant_1_coolingTowerLoop_1_sources_Towb"] = temperature else: runtime_values["simulator_1_centralEnergyPlant_1_coolingTowerLoop_1_sources_Towb"] = WET_BULB_TEMP else: runtime_values["simulator_1_centralEnergyPlant_1_coolingTowerLoop_1_sources_Towb"] = WET_BULB_TEMP # Set the temperature value runtime_values[temperature_key] = temperature return runtime_values Loading Loading @@ -231,9 +230,9 @@ class ThermoFluidsModel: def calculate_pue(self, cooling_input, datacenter_output, cep_output): # Convert values from kW to Watts W_HTWPs = np.array(cep_output['simulator[1].centralEnergyPlant[1].hotWaterLoop[1].summary.W_flow_HTWP_kW']) * 1e3 W_CTWPs = np.array(cep_output['simulator[1].centralEnergyPlant[1].coolingTowerLoop[1].summary.W_flow_CTWP_kW']) * 1e3 W_CTs = np.array(cep_output['simulator[1].centralEnergyPlant[1].coolingTowerLoop[1].summary.W_flow_CT_kW']) * 1e3 W_HTWPs = np.array(cep_output[hot_water_loop_htwp]) * 1e3 W_CTWPs = np.array(cep_output[cooling_tower_loop_ctwp]) * 1e3 W_CTs = np.array(cep_output[cooling_tower_loop_ct]) * 1e3 # Initialize W_CDUPs as zero array of the same shape as datacenter output W_CDUPs = np.zeros_like(W_HTWPs) Loading
raps/dataloaders/frontier.py +1 −11 Original line number Diff line number Diff line Loading @@ -37,15 +37,6 @@ def load_data(files, **kwargs): jobprofile_path = files[1] jobprofile_df = pd.read_parquet(jobprofile_path, engine='pyarrow') # Add meta data for start date match = re.search(r'\d{4}-\d{2}-\d{2}', files[0]) if match: date_str = match.group() # Extract the date string # Convert to datetime object start_date = datetime.strptime(date_str, "%Y-%m-%d") jobs_df.attrs['metadata'] = {'start_date': start_date} return load_data_from_df(jobs_df, jobprofile_df, **kwargs) Loading @@ -70,7 +61,6 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar jobs_df = jobs_df.drop_duplicates(subset='job_id', keep='last').reset_index() jobs_df = jobs_df.sort_values(by='time_start') jobs_df = jobs_df.reset_index(drop=True) start = jobs_df.attrs['metadata'] # Convert timestamp column to datetime format jobprofile_df['timestamp'] = pd.to_datetime(jobprofile_df['timestamp']) Loading Loading @@ -160,7 +150,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar 0 # priority (not supported for Frontier at the moment) ]) return jobs, start return jobs def xname_to_index(xname: str): Loading
raps/dataloaders/marconi100.py +1 −2 Original line number Diff line number Diff line Loading @@ -50,7 +50,6 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): reschedule = kwargs.get('reschedule') validate = kwargs.get('validate') jid = kwargs.get('jid', '*') start = None # Sort jobs dataframe based on values in time_start column, adjust indices after sorting jobs_df = jobs_df.sort_values(by='start_time') Loading Loading @@ -146,4 +145,4 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs): priority ]) return jobs, start return jobs
raps/scheduler.py +1 −5 Original line number Diff line number Diff line Loading @@ -215,7 +215,7 @@ class TickData: class Scheduler: """Job scheduler and simulation manager.""" def __init__(self, total_nodes, down_nodes, power_manager, flops_manager, \ layout_manager, cooling_model=None, start=None, end=None, **kwargs): layout_manager, cooling_model=None, **kwargs): """Initialize the scheduler. Args: Loading Loading @@ -262,8 +262,6 @@ class Scheduler: self.replay = kwargs.get('replay') self.policy = kwargs.get('schedule') self.sys_util_history = [] self.start = start self.end = end def add_job(self, job): # add job to queue Loading Loading @@ -434,8 +432,6 @@ class Scheduler: if self.current_time % FMU_UPDATE_FREQ == 0: # Power for NUM_CDUS (25 for Frontier) cdu_power = rack_power.T[-1] * 1000 #print(self.start) #breakpoint() runtime_values = self.cooling_model.generate_runtime_values(cdu_power, self) # FMU inputs are N powers and the wetbulb temp Loading