Commit 0c17a3b7 authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Add initial aging_boost when using --reschedule with Frontier telemetry (and cleanup/document main.py a bit)
parent 7e4a960f
Loading
Loading
Loading
Loading
+10 −8
Original line number Diff line number Diff line
@@ -61,7 +61,8 @@ args_dict['config'] = config
flops_manager = FLOPSManager(**args_dict)

sc = Scheduler(
    power_manager = power_manager, flops_manager = flops_manager,
    power_manager=power_manager, 
    flops_manager=flops_manager,
    cooling_model=cooling_model,
    **args_dict,
)
@@ -69,7 +70,8 @@ layout_manager = LayoutManager(args.layout, scheduler = sc, debug = args.debug,

if args.replay:

    if args.fastforward: args.fastforward = convert_to_seconds(args.fastforward)
    if args.fastforward: 
        args.fastforward = convert_to_seconds(args.fastforward)

    td = Telemetry(**args_dict)

@@ -82,8 +84,8 @@ if args.replay:
        extracted_date = "Date not found"
        DIR_NAME = create_casename()

    # Read either npz file or telemetry parquet files
    if args.replay[0].endswith(".npz"):
    # Read telemetry data (either npz file or via custom data loader)
    if args.replay[0].endswith(".npz"): # replay .npz file
        print(f"Loading {args.replay[0]}...")
        jobs = td.load_snapshot(args.replay[0])

@@ -98,7 +100,7 @@ if args.replay:
                job['requested_nodes'] = None
                job['submit_time'] = next_arrival(1 / config['JOB_ARRIVAL_TIME'])

    else:
    else: # custom data loader
        print(*args.replay)
        jobs = td.load_data(args.replay)
        td.save_snapshot(jobs, filename=DIR_NAME)
@@ -113,7 +115,7 @@ if args.replay:
    print(f'Simulating {len(jobs)} jobs for {timesteps} seconds')
    time.sleep(1)

else:
else: # synthetic jobs
    wl = Workload(config)
    jobs = getattr(wl, args.workload)(num_jobs=args.numjobs)

+15 −1
Original line number Diff line number Diff line
@@ -17,6 +17,18 @@ from ..job import job_dict
from ..utils import power_to_utilization, next_arrival, encrypt


def aging_boost(nnodes):
    """Return the initial priority boost, in seconds, for a job using *nnodes* nodes.

    Frontier aging policy as per documentation here:
    https://docs.olcf.ornl.gov/systems/frontier_user_guide.html#job-priority-by-node-count
    """
    day = 24 * 3600  # seconds per day
    # (minimum node count, boost in days) — checked largest tier first
    tiers = ((5645, 8), (1882, 4))
    for min_nodes, boost_days in tiers:
        if nnodes >= min_nodes:
            return boost_days * day
    return 0  # small jobs get no aging boost


def load_data(files, **kwargs):
    """
    Reads job and job profile data from parquet files and parses them.
@@ -133,15 +145,17 @@ def load_data_from_df(jobs_df: pd.DataFrame, jobprofile_df: pd.DataFrame, **kwar
        if reschedule: # Let the scheduler reschedule the jobs
            scheduled_nodes = None
            time_offset = next_arrival(1/config['JOB_ARRIVAL_TIME'])
            priority = aging_boost(nodes_required)
        else: # Prescribed replay
            scheduled_nodes = []
            priority = 0 # not used for replay
            for xname in xnames:
                indices = xname_to_index(xname, config)
                scheduled_nodes.append(indices)

        if gpu_trace.size > 0 and (jid == job_id or jid == '*') and time_offset > 0:
            job_info = job_dict(nodes_required, name, cpu_trace, gpu_trace, [], [], wall_time, 
                                end_state, scheduled_nodes, time_offset, job_id)
                                end_state, scheduled_nodes, time_offset, job_id, priority)
            jobs.append(job_info)

    return jobs