Loading raps/policy.py +4 −3 Original line number Diff line number Diff line from enum import Enum from .utils import ValueComparableEnum class PolicyType(Enum): class PolicyType(ValueComparableEnum): """Supported scheduling policies.""" REPLAY = 'replay' # Default is specified in each scheduler! FCFS = 'fcfs' Loading @@ -9,7 +9,8 @@ class PolicyType(Enum): SJF = 'sjf' LJF = 'ljf' class BackfillType(Enum): class BackfillType(ValueComparableEnum): """Supported backfilling policies.""" NONE = None FIRSTFIT = 'firstfit' Loading raps/schedulers/experimental.py +41 −11 Original line number Diff line number Diff line Loading @@ -6,12 +6,14 @@ from ..policy import BackfillType # Extending PolicyType: from ..policy import PolicyType as BasePolicyType from ..utils import ValueComparableEnum class ExtendedPolicyType(Enum): class ExtendedPolicyType(ValueComparableEnum): ACCT_FUGAKU_PTS = 'acct_fugaku_pts' ACCT_AVG_P = 'acct_avg_power' ACCT_AVG_PW4LJ = 'acct_avg_power_w4lj' ACCT_LOW_AVG_P = 'acct_low_avg_power' ACCT_AVG_PW4LJ = 'acct_avg_power_w4jl' ACCT_EDP = 'acct_edp' ACCT_ED2P = 'acct_ed2p' ACCT_PDP = 'acct_pdp' Loading @@ -22,7 +24,7 @@ combined_members = { **{name: member.value for name, member in BasePolicyType.__members__.items()}, **{name: member.value for name, member in ExtendedPolicyType.__members__.items()} } PolicyType = Enum('PolicyType', combined_members) PolicyType = Enum('PolicyType', combined_members, type=ValueComparableEnum) # The scheduler can now use both the BasePolicies and the Extended Policies Loading @@ -42,12 +44,16 @@ class Scheduler: def sort_jobs(self, queue, accounts=None): """Sort jobs based on the selected scheduling policy.""" if self.policy == PolicyType.ACCT_FUGAKU_PTS: if self.policy == PolicyType.REPLAY: # REPLAY NEEDS TO BE THERE return sorted(queue, key=lambda job: job.start_time) elif self.policy == PolicyType.ACCT_FUGAKU_PTS: return self.sort_fugaku_redeeming(queue, accounts) elif self.policy == PolicyType.ACCT_AVG_PW4LJ: return self.sort_avg_Pw4LJ(queue, accounts) elif self.policy == PolicyType.ACCT_AVG_P: return self.sort_avg_P(queue, accounts) elif self.policy == PolicyType.ACCT_LOW_AVG_P: return self.sort_low_avg_P(queue, accounts) elif self.policy == PolicyType.ACCT_EDP: return self.sort_AEDP(queue, accounts) elif self.policy == PolicyType.ACCT_ED2P: Loading Loading @@ -81,12 +87,16 @@ class Scheduler: self.backfill(queue, running, current_time) # After backfill dedice continue processing the queue or wait, continuing may result in fairness issues. if False: # self.policy in [PolicyType.REPLAY]: # print(f"Nodes available {nodes_available} - Req:{len(job.requested_nodes)} N-avail:{len(self.resource_manager.available_nodes)}") print(f"if {self.policy} in [PolicyType.REPLAY]") # REPLAY NEEDS TO BE THERE print(f"== {self.policy in [PolicyType.REPLAY]}") # REPLAY NEEDS TO BE THERE print(f"==value {self.policy.value in [PolicyType.REPLAY.value]}") # REPLAY NEEDS TO BE THERE print(f"type {type(self.policy)} in [{type(PolicyType.REPLAY)}]") # REPLAY NEEDS TO BE THERE print(f"id {id(self.policy)} in [{id(PolicyType.REPLAY)}]") # REPLAY NEEDS TO BE THERE if self.policy in [PolicyType.REPLAY]: # REPLAY NEEDS TO BE THERE continue # Regardless if the job at the front of the queue doenst fit, try placing all of them. elif self.policy in [PolicyType.ACCT_FUGAKU_PTS, PolicyType.ACCT_AVG_PW4LJ, PolicyType.ACCT_AVG_P, PolicyType.ACCT_EDP, PolicyType.ACCT_ED2P, PolicyType.ACCT_PDP, PolicyType.ACCT_AVG_PW4LJ, PolicyType.ACCT_LOW_AVG_P, PolicyType.ACCT_AVG_P, PolicyType.ACCT_EDP, PolicyType.ACCT_ED2P, PolicyType.ACCT_PDP ]: break # The job at the front of the queue doesnt fit stop processing the queue. else: Loading Loading @@ -270,6 +280,26 @@ class Scheduler: queue = list(queue) return queue def sort_low_avg_P(self, queue, accounts=None): if queue == []: return queue priority_tuple_list = [] for job in queue: power = accounts.account_dict[job.account].avg_power if power is None: power = 0 priority = power priority_tuple_list.append((priority,job)) # Sort everythin according to power_acct_priority Disregarding size priority_tuple_list = sorted(priority_tuple_list, key=lambda x:x[0], reverse=False) queue = [] if priority_tuple_list != []: _, queue = zip(*priority_tuple_list) queue = list(queue) return queue def sort_AEDP(self, queue, accounts=None): if queue == []: return queue Loading @@ -285,7 +315,7 @@ class Scheduler: priority = energy * time priority_tuple_list.append((priority,job)) # Sort everythin according to power_acct_priority Disregarding size priority_tuple_list = sorted(priority_tuple_list, key=lambda x:x[0], reverse=True) priority_tuple_list = sorted(priority_tuple_list, key=lambda x:x[0], reverse=False) queue = [] if priority_tuple_list != []: _, queue = zip(*priority_tuple_list) Loading @@ -307,7 +337,7 @@ class Scheduler: priority = energy * time * time priority_tuple_list.append((priority,job)) # Sort everythin according to power_acct_priority Disregarding size priority_tuple_list = sorted(priority_tuple_list, key=lambda x:x[0], reverse=True) priority_tuple_list = sorted(priority_tuple_list, key=lambda x:x[0], reverse=False) queue = [] if priority_tuple_list != []: _, queue = zip(*priority_tuple_list) Loading @@ -329,7 +359,7 @@ class Scheduler: priority = power * time priority_tuple_list.append((priority,job)) # Sort everythin according to power_acct_priority Disregarding size priority_tuple_list = sorted(priority_tuple_list, key=lambda x:x[0], reverse=True) priority_tuple_list = sorted(priority_tuple_list, key=lambda x:x[0], reverse=False) queue = [] if priority_tuple_list != []: _, queue = zip(*priority_tuple_list) Loading raps/utils.py +11 −0 Original line number Diff line number Diff line Loading @@ -7,6 +7,7 @@ generating random numbers, summarizing and expanding ranges, determining job sta """ from datetime import timedelta from enum import Enum import hashlib import math Loading Loading @@ -382,3 +383,13 @@ def get_utilization(trace, time_quanta_index): return float(trace) else: raise TypeError(f"Invalid type for utilization: {type(trace)}.") class ValueComparableEnum(Enum): def __eq__(self, other): if isinstance(other, Enum): return self.value == other.value return self.value == other def __hash__(self): # required if you override __eq__ return hash(self.value) Loading
raps/policy.py +4 −3 Original line number Diff line number Diff line from enum import Enum from .utils import ValueComparableEnum class PolicyType(Enum): class PolicyType(ValueComparableEnum): """Supported scheduling policies.""" REPLAY = 'replay' # Default is specified in each scheduler! FCFS = 'fcfs' Loading @@ -9,7 +9,8 @@ class PolicyType(Enum): SJF = 'sjf' LJF = 'ljf' class BackfillType(Enum): class BackfillType(ValueComparableEnum): """Supported backfilling policies.""" NONE = None FIRSTFIT = 'firstfit' Loading
raps/schedulers/experimental.py +41 −11 Original line number Diff line number Diff line Loading @@ -6,12 +6,14 @@ from ..policy import BackfillType # Extending PolicyType: from ..policy import PolicyType as BasePolicyType from ..utils import ValueComparableEnum class ExtendedPolicyType(Enum): class ExtendedPolicyType(ValueComparableEnum): ACCT_FUGAKU_PTS = 'acct_fugaku_pts' ACCT_AVG_P = 'acct_avg_power' ACCT_AVG_PW4LJ = 'acct_avg_power_w4lj' ACCT_LOW_AVG_P = 'acct_low_avg_power' ACCT_AVG_PW4LJ = 'acct_avg_power_w4jl' ACCT_EDP = 'acct_edp' ACCT_ED2P = 'acct_ed2p' ACCT_PDP = 'acct_pdp' Loading @@ -22,7 +24,7 @@ combined_members = { **{name: member.value for name, member in BasePolicyType.__members__.items()}, **{name: member.value for name, member in ExtendedPolicyType.__members__.items()} } PolicyType = Enum('PolicyType', combined_members) PolicyType = Enum('PolicyType', combined_members, type=ValueComparableEnum) # The scheduler can now use both the BasePolicies and the Extended Policies Loading @@ -42,12 +44,16 @@ class Scheduler: def sort_jobs(self, queue, accounts=None): """Sort jobs based on the selected scheduling policy.""" if self.policy == PolicyType.ACCT_FUGAKU_PTS: if self.policy == PolicyType.REPLAY: # REPLAY NEEDS TO BE THERE return sorted(queue, key=lambda job: job.start_time) elif self.policy == PolicyType.ACCT_FUGAKU_PTS: return self.sort_fugaku_redeeming(queue, accounts) elif self.policy == PolicyType.ACCT_AVG_PW4LJ: return self.sort_avg_Pw4LJ(queue, accounts) elif self.policy == PolicyType.ACCT_AVG_P: return self.sort_avg_P(queue, accounts) elif self.policy == PolicyType.ACCT_LOW_AVG_P: return self.sort_low_avg_P(queue, accounts) elif self.policy == PolicyType.ACCT_EDP: return self.sort_AEDP(queue, accounts) elif self.policy == PolicyType.ACCT_ED2P: Loading Loading @@ -81,12 +87,16 @@ class Scheduler: self.backfill(queue, running, current_time) # After backfill dedice continue processing the queue or wait, continuing may result in fairness issues. if False: # self.policy in [PolicyType.REPLAY]: # print(f"Nodes available {nodes_available} - Req:{len(job.requested_nodes)} N-avail:{len(self.resource_manager.available_nodes)}") print(f"if {self.policy} in [PolicyType.REPLAY]") # REPLAY NEEDS TO BE THERE print(f"== {self.policy in [PolicyType.REPLAY]}") # REPLAY NEEDS TO BE THERE print(f"==value {self.policy.value in [PolicyType.REPLAY.value]}") # REPLAY NEEDS TO BE THERE print(f"type {type(self.policy)} in [{type(PolicyType.REPLAY)}]") # REPLAY NEEDS TO BE THERE print(f"id {id(self.policy)} in [{id(PolicyType.REPLAY)}]") # REPLAY NEEDS TO BE THERE if self.policy in [PolicyType.REPLAY]: # REPLAY NEEDS TO BE THERE continue # Regardless if the job at the front of the queue doenst fit, try placing all of them. elif self.policy in [PolicyType.ACCT_FUGAKU_PTS, PolicyType.ACCT_AVG_PW4LJ, PolicyType.ACCT_AVG_P, PolicyType.ACCT_EDP, PolicyType.ACCT_ED2P, PolicyType.ACCT_PDP, PolicyType.ACCT_AVG_PW4LJ, PolicyType.ACCT_LOW_AVG_P, PolicyType.ACCT_AVG_P, PolicyType.ACCT_EDP, PolicyType.ACCT_ED2P, PolicyType.ACCT_PDP ]: break # The job at the front of the queue doesnt fit stop processing the queue. else: Loading Loading @@ -270,6 +280,26 @@ class Scheduler: queue = list(queue) return queue def sort_low_avg_P(self, queue, accounts=None): if queue == []: return queue priority_tuple_list = [] for job in queue: power = accounts.account_dict[job.account].avg_power if power is None: power = 0 priority = power priority_tuple_list.append((priority,job)) # Sort everythin according to power_acct_priority Disregarding size priority_tuple_list = sorted(priority_tuple_list, key=lambda x:x[0], reverse=False) queue = [] if priority_tuple_list != []: _, queue = zip(*priority_tuple_list) queue = list(queue) return queue def sort_AEDP(self, queue, accounts=None): if queue == []: return queue Loading @@ -285,7 +315,7 @@ class Scheduler: priority = energy * time priority_tuple_list.append((priority,job)) # Sort everythin according to power_acct_priority Disregarding size priority_tuple_list = sorted(priority_tuple_list, key=lambda x:x[0], reverse=True) priority_tuple_list = sorted(priority_tuple_list, key=lambda x:x[0], reverse=False) queue = [] if priority_tuple_list != []: _, queue = zip(*priority_tuple_list) Loading @@ -307,7 +337,7 @@ class Scheduler: priority = energy * time * time priority_tuple_list.append((priority,job)) # Sort everythin according to power_acct_priority Disregarding size priority_tuple_list = sorted(priority_tuple_list, key=lambda x:x[0], reverse=True) priority_tuple_list = sorted(priority_tuple_list, key=lambda x:x[0], reverse=False) queue = [] if priority_tuple_list != []: _, queue = zip(*priority_tuple_list) Loading @@ -329,7 +359,7 @@ class Scheduler: priority = power * time priority_tuple_list.append((priority,job)) # Sort everythin according to power_acct_priority Disregarding size priority_tuple_list = sorted(priority_tuple_list, key=lambda x:x[0], reverse=True) priority_tuple_list = sorted(priority_tuple_list, key=lambda x:x[0], reverse=False) queue = [] if priority_tuple_list != []: _, queue = zip(*priority_tuple_list) Loading
raps/utils.py +11 −0 Original line number Diff line number Diff line Loading @@ -7,6 +7,7 @@ generating random numbers, summarizing and expanding ranges, determining job sta """ from datetime import timedelta from enum import Enum import hashlib import math Loading Loading @@ -382,3 +383,13 @@ def get_utilization(trace, time_quanta_index): return float(trace) else: raise TypeError(f"Invalid type for utilization: {type(trace)}.") class ValueComparableEnum(Enum): def __eq__(self, other): if isinstance(other, Enum): return self.value == other.value return self.value == other def __hash__(self): # required if you override __eq__ return hash(self.value)