Commit d98ab85c authored by Brewer, Wes
Browse files

Merge branch 'test-scheduling' into 'main'

Test scheduling

See merge request !52
parents bde550d8 576704a3
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
{
    "SEED": 42,
    "JOB_ARRIVAL_TIME": 60,
    "JOB_ARRIVAL_TIME": 20,
    "MTBF": 11,
    "MAX_TIME": 88200,
    "TRACE_QUANTA": 20,
+3 −1
Original line number Diff line number Diff line
@@ -24,7 +24,7 @@ parser.add_argument('-e', '--encrypt', action='store_true', help='Encrypt any se
parser.add_argument('-n', '--numjobs', type=int, default=1000, help='Number of jobs to schedule')
parser.add_argument('-t', '--time', type=str, default=None, help='Length of time to simulate, e.g., 123, 123s, 27m, 3h, 7d')
parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output')
parser.add_argument('-s', '--seed', action='store_true', help='Set random number seed for deterministic simulation')
parser.add_argument('--seed', action='store_true', help='Set random number seed for deterministic simulation')
parser.add_argument('-f', '--replay', nargs='+', type=str, help='Either: path/to/joblive path/to/jobprofile' + \
                                                                ' -or- filename.npz (overrides --workload option)')
parser.add_argument('--reschedule', action='store_true', help='Reschedule the telemetry workload')
@@ -39,6 +39,8 @@ parser.add_argument('-p', '--plot', nargs='+', choices=['power', 'loss', 'pue',
choices = ['png', 'svg', 'jpg', 'pdf', 'eps']
parser.add_argument('--imtype', type=str, choices=choices, default=choices[0], help='Plot image type')
parser.add_argument('--system', type=str, default='frontier', help='System config to use')
choices = ['fcfs', 'sjf', 'prq']
parser.add_argument('-s', '--schedule', type=str, choices=choices, default=choices[0], help='Type of schedule to use')
choices = ['random', 'benchmark', 'peak', 'idle']
parser.add_argument('-w', '--workload', type=str, choices=choices, default=choices[0], help='Type of synthetic workload')
choices = ['layout1', 'layout2']
+2 −1
Original line number Diff line number Diff line
@@ -145,7 +145,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
                end_state,
                scheduled_nodes,
                time_offset,
                job_id
                job_id,
                0 # priority (not supported for Frontier at the moment)
            ])

    return jobs
+17 −11
Original line number Diff line number Diff line
@@ -110,6 +110,9 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
            gpu_util = power_to_utilization(gpu_power_array, gpu_min_power, gpu_max_power)
            gpu_trace = gpu_util * GPUS_PER_NODE
            
        priority = int(jobs_df.loc[i, 'priority'])
            
        # wall_time = jobs_df.loc[i, 'run_time']
        wall_time = gpu_trace.size * TRACE_QUANTA # seconds
        end_state = jobs_df.loc[i, 'job_state']
        time_start = jobs_df.loc[i+1, 'start_time']
@@ -127,6 +130,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
        else: # Prescribed replay
            scheduled_nodes = (jobs_df.loc[i, 'nodes']).tolist()
            
        # if gpu_trace.size > 0 and (jid == job_id or jid == '*'):
        if (gpu_trace.size > 0):
            jobs.append([
                nodes_required,
                name,
@@ -136,7 +141,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
                end_state,
                scheduled_nodes,
                time_offset,
            job_id
                job_id, 
                priority
            ])

    return jobs
+19 −15
Original line number Diff line number Diff line
@@ -147,6 +147,7 @@ class Job:
            self.id = vector[8]
        else:
            self.id = Job._get_next_id()
        self.priority = vector[9]
        self.start_time = None
        self.end_time = None
        self.running_time = 0
@@ -180,11 +181,6 @@ class Job:
        else:
            raise ValueError(f"Invalid state: {value}")

    def __lt__(self, other):
        """Return True when this job's wall time is smaller than *other*'s."""
        # Shortest-job-first (SJF) ordering. A first-come, first-served (FCFS)
        # ordering would compare end_time instead of wall_time.
        mine, theirs = self.wall_time, other.wall_time
        return mine < theirs

    @classmethod
    def _get_next_id(cls):
        """Generate the next unique identifier for a job.
@@ -261,21 +257,32 @@ class Scheduler:
        self.debug = kwargs.get('debug')
        self.output = kwargs.get('output')
        self.replay = kwargs.get('replay')
        self.policy = kwargs.get('schedule')
        self.sys_util_history = []

    def add_job(self, job):
        """Append *job* to the pending queue, then re-sort the queue."""
        pending = self.queue
        pending.append(job)
        # Re-sort after every insertion so the queue order is refreshed.
        self._sort_queue()

    def _sort_queue(self):
        """Sort ``self.queue`` in place according to ``self.policy``.

        Supported policies:
            'fcfs' -- ascending ``submit_time``
            'sjf'  -- ascending ``wall_time``
            'prq'  -- descending ``priority``

        Raises:
            ValueError: if ``self.policy`` is not one of the policies above.
        """
        policy = self.policy

        # Reject unknown policies up front, before touching the queue.
        if policy not in ('fcfs', 'sjf', 'prq'):
            raise ValueError(f"The scheduling policy {self.policy} is not supported.")

        if policy == 'prq':
            # Negate the priority so the largest value sorts first.
            def ranking(queued):
                return -queued.priority
        elif policy == 'sjf':
            def ranking(queued):
                return queued.wall_time
        else:
            def ranking(queued):
                return queued.submit_time

        self.queue.sort(key=ranking)
    
    def schedule(self, jobs):
        """Schedule jobs."""
        for job_vector in jobs:
            job = Job(job_vector, self.current_time)
            heapq.heappush(self.queue, job)

        #if self.debug:
        #    print(f"\nt={self.current_time} queue={self.queue} heapq={heapq}")
            self.add_job(job)

        while self.queue:

            job = heapq.heappop(self.queue)

            job = self.queue.pop(0)
            synthetic_bool = len(self.available_nodes) >= job.nodes_required
            telemetry_bool = job.requested_nodes and job.requested_nodes[0] in self.available_nodes

@@ -305,7 +312,7 @@ class Scheduler:
                    print(f"t={self.current_time}: Scheduled job with wall time {job.wall_time} on nodes {scheduled_nodes}")

            else:
                heapq.heappush(self.queue, job)
                self.queue.append(job)
                break

    def tick(self):
@@ -342,9 +349,6 @@ class Scheduler:
                            self.available_nodes.append(node)
                    self.available_nodes.sort()

                    # Keep the job in the queue for visibility
                    heapq.heappush(self.queue, job)

                    # Remove job from the list of running jobs
                    self.running.remove(job)

Loading