Commit 1f7cb50d authored by Brewer, Wes
Browse files

Add support for fastforward to frontier and marconi100 dataloaders

parent 9223dc66
Loading
Loading
Loading
Loading
+8 −10
Original line number Diff line number Diff line
@@ -37,7 +37,7 @@ Note: Requires python3.9 or greater.
Download `job_table.parquet` from https://zenodo.org/records/10127767

    # Marconi100
    python main.py --system marconi100 -f ~/data/job_table.parquet 
    python main.py --system marconi100 -f ~/data/marconi100/job_table.parquet 

## Snapshot of extracted workload data

@@ -55,15 +55,6 @@ given instead of the parquet files for more quickly running subsequent simulatio

    python -m raps.telemetry -f $DPATH/slurm/joblive/$DATEDIR $DPATH/jobprofile/$DATEDIR

## OpenStreetMap Attribution

Map data used in this project is provided by [OpenStreetMap](https://www.openstreetmap.org/copyright) and is available under the Open Database License (ODbL). © OpenStreetMap contributors.

## Open-Meteo API Attribution

Weather data used in this project is provided by the [Open-Meteo API](https://open-meteo.com/en/docs). Open-Meteo offers free weather forecast data for various applications, and their API provides easy access to weather information without requiring user authentication.


## Build and run Docker container

    make docker_build && make docker_run
@@ -90,3 +81,10 @@ All new contributions must be made under both the MIT and Apache-2.0 licenses.
See LICENSE-MIT, LICENSE-APACHE, COPYRIGHT, NOTICE, and CONTRIBUTORS.txt for details.  

SPDX-License-Identifier: (Apache-2.0 OR MIT)  

## Attributions

Map data used in this project is provided by [OpenStreetMap](https://www.openstreetmap.org/copyright) and is available under the Open Database License (ODbL). © OpenStreetMap contributors.

Weather data used in this project is provided by the [Open-Meteo API](https://open-meteo.com/en/docs). Open-Meteo offers free weather forecast data for various applications, and their API provides easy access to weather information without requiring user authentication.
+6 −1
Original line number Diff line number Diff line
@@ -49,10 +49,13 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
        The list of parsed jobs.
    """
    encrypt_bool = kwargs.get('encrypt')
    fastforward = kwargs.get('fastforward')
    reschedule = kwargs.get('reschedule')
    validate = kwargs.get('validate')
    jid = kwargs.get('jid', '*')

    if fastforward: print(f"fast-forwarding {fastforward} seconds")

    min_time = kwargs.get('min_time', None)

    # Sort jobs dataframe based on values in time_start column, adjust indices after sorting
@@ -122,6 +125,8 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
        diff = time_start - time_zero
        time_offset = max(diff.total_seconds(), 0)

        if fastforward: time_offset -= fastforward

        xnames = jobs_df.loc[jidx, 'xnames']
        # Don't replay any job with an empty set of xnames
        if '' in xnames: continue
@@ -135,7 +140,7 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
                indices = xname_to_index(xname)
                scheduled_nodes.append(indices)

        if gpu_trace.size > 0 and (jid == job_id or jid == '*'):
        if gpu_trace.size > 0 and (jid == job_id or jid == '*') and time_offset > 0:
            job_info = job_dict(nodes_required, name, cpu_trace, gpu_trace, wall_time, 
                                end_state, scheduled_nodes, time_offset, job_id)
            jobs.append(job_info)
+6 −1
Original line number Diff line number Diff line
@@ -49,9 +49,12 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
    """
    min_time = kwargs.get('min_time', None)
    reschedule = kwargs.get('reschedule')
    fastforward = kwargs.get('fastforward')
    validate = kwargs.get('validate')
    jid = kwargs.get('jid', '*')

    if fastforward: print(f"fast-forwarding {fastforward} seconds")

    # Sort jobs dataframe based on values in start_time column, adjust indices after sorting
    jobs_df = jobs_df.sort_values(by='start_time')
    jobs_df = jobs_df.reset_index(drop=True)
@@ -125,13 +128,15 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
            # When extracting out a single job, run one iteration past the end of the job
            time_offset = UI_UPDATE_FREQ

        if fastforward: time_offset -= fastforward

        if reschedule: # Let the scheduler reschedule the jobs
            scheduled_nodes = None
            time_offset = next_arrival()
        else: # Prescribed replay
            scheduled_nodes = (jobs_df.loc[i, 'nodes']).tolist()
            
        if (gpu_trace.size > 0):
        if gpu_trace.size > 0 and time_offset >= 0:
            job_info = job_dict(nodes_required, name, cpu_trace, gpu_trace, wall_time,
                                end_state, scheduled_nodes, time_offset, job_id, priority)
            jobs.append(job_info)