Commit 766eef73 authored by Maiterth, Matthias's avatar Maiterth, Matthias
Browse files

Updated for F-Data and disabled node-failure sim (random number generator...

Updated for F-Data and disabled node-failure sim (random number generator takes time and functionality is not implemented properly)
parent d2de6862
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -95,6 +95,7 @@ def load_data_from_df(df, **kwargs):
        scheduled_nodes = None  # Only nodes_required is in the trace

        job_id = row['jid'] if 'jid' in df.columns else 'unknown'

        priority = row['pri'] if 'pri' in df.columns else 0

        submit_timestamp = pd.to_datetime(row['adt']) if 'adt' in df.columns else -1  # Else job was submitted in the past
@@ -143,7 +144,7 @@ def load_data_from_df(df, **kwargs):
            nrx_trace=[],
            end_state=end_state,
            scheduled_nodes=scheduled_nodes,
            job_id=job_id,
            id=job_id,
            priority=priority,
            submit_time=submit_time,
            time_limit=time_limit,
+21 −11
Original line number Diff line number Diff line
from typing import Optional, List
import dataclasses
import pandas as pd

import sys
import numpy as np

from .job import Job, JobState
from .policy import PolicyType
@@ -185,19 +184,30 @@ class Engine:
                # job ended before.
                # For every other error condition trace_start_ and
                # _end_time are used!

                #print(type(job.cpu_trace))
                if isinstance(job.cpu_trace,list) or isinstance(job.cpu_trace,np.ndarray):
                    if time_quanta_index < len(job.cpu_trace):
                        cpu_util = get_utilization(job.cpu_trace, time_quanta_index)
                    else:
                        cpu_util = get_utilization(job.cpu_trace, len(job.cpu_trace) - 1)
                elif isinstance(job.cpu_trace,float) or isinstance(job.cpu_trace,int):
                    cpu_util = job.cpu_trace
                else:
                    raise NotImplementedError()

                if isinstance(job.gpu_trace,list) or isinstance(job.gpu_trace,np.ndarray):
                    if time_quanta_index < len(job.gpu_trace):
                        gpu_util = get_utilization(job.gpu_trace, time_quanta_index)
                    else:
                        gpu_util = get_utilization(job.gpu_trace, len(job.gpu_trace) - 1)
                elif isinstance(job.gpu_trace,float) or isinstance(job.gpu_trace,int):
                    gpu_util = job.gpu_trace
                else:
                    raise NotImplementedError()

                net_util = 0

                if isinstance(job.ntx_trace,List) and len(job.ntx_trace) and isinstance(job.nrx_trace,List) and len(job.nrx_trace):
                if (isinstance(job.ntx_trace,list) or isinstance(job.ntx_trace,np.ndarray)) and len(job.ntx_trace) and (isinstance(job.nrx_trace,list) or isinstance(job.nrx_trace,np.ndarray)) and len(job.nrx_trace):
                    net_tx = get_utilization(job.ntx_trace, time_quanta_index)
                    net_rx = get_utilization(job.nrx_trace, time_quanta_index)
                    net_util = network_utilization(net_tx, net_rx)
+35 −14
Original line number Diff line number Diff line
from enum import Enum
import numpy as np

"""
Note: want to simplify this in the future to use a minimal required set of job attributes,
@@ -159,22 +160,42 @@ class JobStatistics:
        self.start_time = job.start_time
        self.end_time = job.end_time
        self.state = job._state
        if isinstance(job.cpu_trace,list) or isinstance(job.cpu_trace,np.ndarray):
            if len(job.cpu_trace) == 0:
                self.avg_cpu_usage = 0
            else:
                self.avg_cpu_usage = sum(job.cpu_trace) / len(job.cpu_trace)
        elif isinstance(job.cpu_trace,int) or isinstance(job.cpu_trace,float):
            self.avg_cpu_usage = job.cpu_trace
        else:
            raise NotImplementedError()

        if isinstance(job.gpu_trace,list) or isinstance(job.gpu_trace,np.ndarray):
            if len(job.gpu_trace) == 0:
                self.avg_gpu_usage = 0
            else:
                self.avg_gpu_usage = sum(job.gpu_trace) / len(job.gpu_trace)
        elif isinstance(job.gpu_trace,int) or isinstance(job.gpu_trace,float):
            self.avg_gpu_usage = job.gpu_trace
        else:
            raise NotImplementedError()

        if isinstance(job.ntx_trace,list) or isinstance(job.ntx_trace,np.ndarray):
            if len(job.ntx_trace) == 0:
                self.avg_ntx_usage = 0
            else:
                self.avg_ntx_usage = sum(job.ntx_trace) / len(job.ntx_trace)
        elif isinstance(job.ntx_trace,int) or isinstance(job.ntx_trace,float):
            self.avg_ntx_usage = job.ntx_trace

        if isinstance(job.nrx_trace,list) or isinstance(job.nrx_trace,np.ndarray):
            if len(job.nrx_trace) == 0:
                self.avg_nrx_usage = 0
            else:
                self.avg_nrx_usage = sum(job.nrx_trace) / len(job.nrx_trace)
        elif isinstance(job.nrx_trace,int) or isinstance(job.nrx_trace,float):
            self.avg_nrx_usage = job.nrx_trace

        if len(job.power_history) == 0:
            self.avg_node_power = 0
            self.max_node_power = 0
+1 −0
Original line number Diff line number Diff line
@@ -54,6 +54,7 @@ class ResourceManager:
        return utilization

    def node_failure(self, mtbf):
        return []
        """Simulate node failure using Weibull distribution."""
        shape_parameter = 1.5
        scale_parameter = mtbf * 3600  # Convert to seconds