Commit 34b341ba authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Add fast-forward feature. Note: may not work as expected yet.

parent 7dccd9b1
Loading
Loading
Loading
Loading
+5 −1
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@ parser.add_argument('-d', '--debug', action='store_true', help='Enable debug mod
parser.add_argument('-e', '--encrypt', action='store_true', help='Encrypt any sensitive data in telemetry')
parser.add_argument('-n', '--numjobs', type=int, default=1000, help='Number of jobs to schedule')
parser.add_argument('-t', '--time', type=str, default=None, help='Length of time to simulate, e.g., 123, 123s, 27m, 3h, 7d')
parser.add_argument('-ff', '--fastforward', type=str, default=None, help='Fast-forward by time amount (uses same units as -t)')
parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output')
parser.add_argument('--seed', action='store_true', help='Set random number seed for deterministic simulation')
parser.add_argument('-f', '--replay', nargs='+', type=str, help='Either: path/to/joblive path/to/jobprofile' + \
@@ -107,6 +108,9 @@ sc = Scheduler(TOTAL_NODES, DOWN_NODES, power_manager, flops_manager, layout_man
               cooling_model, **args_dict)

if args.replay:

    if args.fastforward: args.fastforward = convert_to_seconds(args.fastforward)

    td = Telemetry(**args_dict)

    # Try to extract date from given name to use as case directory
@@ -133,7 +137,7 @@ if args.replay:
    else:
        timesteps = int(max(job['wall_time'] + job['submit_time'] for job in jobs)) + 1

    print(f'Running simulation for {timesteps} seconds')
    print(f'Simulating {len(jobs)} jobs for {timesteps} seconds')
    time.sleep(1)

else:
+17 −15
Original line number Diff line number Diff line
@@ -28,11 +28,14 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
    Loads data from pandas DataFrames and returns the extracted job info.
    """
    reschedule = kwargs.get('reschedule')
    fastforward = kwargs.get('fastforward')
    if fastforward: print(f"fast-forwarding {fastforward} seconds")

    allocation_df['begin_time'] = pd.to_datetime(allocation_df['begin_time'], format='mixed', errors='coerce')
    allocation_df['end_time'] = pd.to_datetime(allocation_df['end_time'], format='mixed', errors='coerce')

    earliest_begin_time = pd.to_datetime(allocation_df['begin_time']).min()
    print(earliest_begin_time)
    job_list = []

    for _, row in tqdm(allocation_df.iterrows(), total=len(allocation_df), desc="Processing Jobs"):
@@ -45,12 +48,8 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
        # Compute GPU power
        gpu_energy = node_data['gpu_energy'].sum() # Joules
        # divide by nodes_required to get average gpu_usage per node
        #gpu_usage = node_data['gpu_usage'].sum() / 1E6 / nodes_required # seconds
        #gpu_power = gpu_energy / gpu_usage if gpu_usage > 0 else 0
        gpu_power = gpu_energy / wall_time
        if gpu_power == 0:
            continue
        else:
        gpu_power_array = np.array([gpu_power] * samples)

        gpu_min_power = nodes_required * POWER_GPU_IDLE
@@ -60,7 +59,6 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):

        # Compute CPU power (assuming total energy minus gpu_energy is cpu_energy)
        total_energy = node_data['energy'].sum() # Joules
        # divide by nodes_required to get average cpu_usage per node
        #cpu_usage = node_data['cpu_usage'].sum() / 1E9 / nodes_required # seconds
        cpu_energy = total_energy - gpu_energy 

@@ -80,6 +78,9 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
        else:
            scheduled_nodes = get_scheduled_nodes(row['allocation_id'], node_df)
            time_offset = compute_time_offset(row['begin_time'], earliest_begin_time)
            if fastforward: time_offset -= fastforward

        if time_offset >= 0:

            job_info = job_dict(nodes_required, \
                                row['primary_job_id'], \
@@ -89,6 +90,7 @@ def load_data_from_df(allocation_df, node_df, step_df, **kwargs):
                                time_offset, \
                                row['primary_job_id'], \
                                row.get('priority', 0))

            job_list.append(job_info)

    return job_list