Commit 9f4d5b5a authored by Hines, Jesse
Browse files

Merge branch 'refactor-dataloaders' into 'develop'

Dataloader dates

See merge request !111
parents 7c70f33f 04692718
Loading
Loading
Loading
Loading
+10 −11
Original line number Diff line number Diff line
@@ -24,7 +24,7 @@ import pandas as pd
from tqdm import tqdm

from ..job import job_dict, Job
from ..utils import next_arrival_byconfkwargs
from ..utils import WorkloadData


def load_data(jobs_path, **kwargs):
@@ -58,7 +58,6 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
    """
    count_jobs_notOK = 0
    config = kwargs.get('config')
    arrival = kwargs.get('arrival')
    validate = kwargs.get('validate')
    jid = kwargs.get('jid', '*')

@@ -146,10 +145,6 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):

        priority = int(jobs_df.loc[jidx, 'priority'])

        if arrival == 'poisson':  # Modify the arrival times of the jobs according to Poisson distribution
            scheduled_nodes = None
            submit_time = next_arrival_byconfkwargs(config, kwargs)
        else:  # Prescribed replay
        scheduled_nodes = (jobs_df.loc[jidx, 'nodes']).tolist()

        submit_timestamp = jobs_df.loc[jidx, 'submit_time']
@@ -205,7 +200,11 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
            count_jobs_notOK += 1

    print("jobs not added: ", count_jobs_notOK)
    return jobs, telemetry_start_time, telemetry_end_time
    return WorkloadData(
        jobs=jobs,
        telemetry_start=telemetry_start_time, telemetry_end=telemetry_end_time,
        start_date=telemetry_start_timestamp.tz_localize("UTC"),
    )


def xname_to_index(xname: str, config: dict):
+12 −3
Original line number Diff line number Diff line
@@ -42,8 +42,10 @@ import math
import re
import pandas as pd
from pathlib import Path
from datetime import datetime, timezone
from pprint import pprint
from raps.telemetry import Job, job_dict
from raps.utils import WorkloadData


def throughput_traces(total_tx, total_rx, intervals):
@@ -345,9 +347,16 @@ def load_data(local_dataset_path, **kwargs):
        j.trace_start_time -= t0
        j.trace_end_time -= t0

    # pprint(jobs)

    if debug:
        pprint(jobs)

    simulation_start = 0
    simulation_end = max((j.end_time for j in jobs), default=0)
    return jobs, simulation_start, simulation_end
    telemetry_start = 0
    telemetry_end = max((j.end_time for j in jobs), default=0)

    return WorkloadData(
        jobs=jobs,
        telemetry_start=telemetry_start, telemetry_end=telemetry_end,
        start_date=datetime.fromtimestamp(t0, timezone.utc),
    )
+20 −18
Original line number Diff line number Diff line
@@ -10,12 +10,13 @@
    python -m raps.telemetry -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR
"""
import time
from datetime import datetime, timezone
import numpy as np
import pandas as pd
from tqdm import tqdm

from ..job import job_dict, Job
from ..utils import power_to_utilization, next_arrival_byconfkwargs, encrypt
from ..utils import power_to_utilization, encrypt, WorkloadData


def aging_boost(nnodes):
@@ -136,7 +137,6 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
    """
    config = kwargs.get('config')
    encrypt_bool = kwargs.get('encrypt')
    arrival = kwargs.get('arrival')
    validate = kwargs.get('validate')
    jid = kwargs.get('jid', '*')
    debug = kwargs.get('debug')
@@ -266,14 +266,6 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
        if '' in xnames:
            continue

        if arrival == 'poisson':  # Modify the arrival times of the jobs according to Poisson distribution
            scheduled_nodes = None
            submit_time = next_arrival_byconfkwargs(config, kwargs)
            start_time = None  # ?
            end_time = None  # ?
            priority = aging_boost(nodes_required)

        else:  # Prescribed replay
        scheduled_nodes = []
        # priority = 0  # not used for replay
        priority = aging_boost(nodes_required)
@@ -325,7 +317,12 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar

            job = Job(job_info)
            jobs.append(job)
    return jobs, telemetry_start, telemetry_end
    return WorkloadData(
        jobs=jobs,
        telemetry_start=telemetry_start,
        telemetry_end=telemetry_end,
        start_date=telemetry_start_timestamp,
    )


def load_live_data(**kwargs):
@@ -537,7 +534,12 @@ def load_live_data(**kwargs):
        job = Job(job_info)
        jobs.append(job)

    return jobs, telemetry_start, telemetry_end
    return WorkloadData(
        jobs=jobs,
        telemetry_start=telemetry_start,
        telemetry_end=telemetry_end,
        start_date=datetime.fromtimestamp(telemetry_start, timezone.utc),
    )


def xname_to_index(xname: str, config: dict):
+6 −8
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@
import pandas as pd
from tqdm import tqdm
from ..job import job_dict, Job
from ..utils import WorkloadData


def load_data(path, **kwargs):
@@ -134,13 +135,6 @@ def load_data_from_df(df, **kwargs):
        trace_missing_values = False  # Sane Choice?
        trace_quanta = config['TRACE_QUANTA']

        # Should we still have this?
        # if arrival == 'poisson':  # Modify the arrival times of the jobs according to Poisson distribution
        #     time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME'])
        # else:
        #     time_offset = (submit_time - min_time).total_seconds()  # Compute time offset in seconds
        # Removed from job_dict: time_offset=time_offset,

        # Create job dictionary
        job_info = job_dict(
            nodes_required=nodes_required,
@@ -167,7 +161,11 @@ def load_data_from_df(df, **kwargs):
        job = Job(job_info)
        job_list.append(job)

    return job_list, telemetry_start, telemetry_end
    return WorkloadData(
        jobs=job_list,
        telemetry_start=telemetry_start, telemetry_end=telemetry_end,
        start_date=telemetry_start_timestamp,
    )


def node_index_to_name(index: int, config: dict):
+13 −7
Original line number Diff line number Diff line
import os
import re
from datetime import datetime
from tqdm import tqdm
from typing import List, Optional, Generator, Tuple, Any, Union
from typing import List, Optional, Generator, Any, Union

import numpy as np
import pandas as pd

from raps.job import job_dict
from raps.job import Job
from raps.job import job_dict, Job
from raps.utils import WorkloadData

"""
Official instructions are here:
@@ -200,7 +201,7 @@ class GoogleClusterV2DataLoader:
            yield pd.concat(dfs, ignore_index=True)


def load_data(data_path: Union[str, List[str]], **kwargs: Any) -> Tuple[List[Any], float, float]:
def load_data(data_path: Union[str, List[str]], **kwargs: Any):
    config = kwargs.get('config')
    # Unpack list
    if isinstance(data_path, list):
@@ -331,6 +332,11 @@ def load_data(data_path: Union[str, List[str]], **kwargs: Any) -> Tuple[List[Any
        jobs.append(Job(job_d))

    # Compute simulation span: start at t=0, end at the latest job finish
    simulation_start = 0
    simulation_end = int(max(usage_map_end.values()) - t0)
    return jobs, simulation_start, simulation_end
    telemetry_start = 0
    telemetry_end = int(max(usage_map_end.values()) - t0)
    return WorkloadData(
        jobs=jobs,
        telemetry_start=telemetry_start, telemetry_end=telemetry_end,
        # gcloud dataset timestamps are already relative, and it doesn't list an exact start date.
        start_date=datetime.fromisoformat("2011-05-02T00:00:00Z"),
    )
Loading