Commit 55d02ceb authored by Brewer, Wes
Browse files

Shift jobs relative to earliest submitted time

parent f5b9fc41
Loading
Loading
Loading
Loading
+92 −52
Original line number | Diff line number | Diff line
@@ -4,6 +4,7 @@ import csv
import datetime
import pandas as pd
import warnings
from tqdm import tqdm
from raps.job import job_dict, Job

DATE_FORMAT_STR = "%Y-%m-%d %H:%M:%S"
@@ -66,70 +67,110 @@ def load_data(files, **kwargs):
    with open(job_file) as f:
        job_log = json.load(f)

    jobs = []
    # --- First pass: find earliest submit time ---
    earliest_submit = None
    for raw in job_log:
        submitted = raw.get("submitted_time")
        if submitted is None or submitted == "None":
            continue

        # Philly uses either string dates or epoch ints
        if isinstance(submitted, (int, float)):
            t = int(submitted)
        else:
            t = parse_date(submitted).timestamp()

        if earliest_submit is None or t < earliest_submit:
            earliest_submit = t

    if earliest_submit is None:
        raise ValueError("No valid submitted_time found in Philly traces")

    # --- Second pass: build jobs ---
    jobs = []
    for raw in tqdm(job_log, desc="Building Job objects"):
        jobid = raw.get("jobid")
        user = raw.get("user")
        status = raw.get("status")
        submitted = parse_date(raw.get("submitted_time"))

        # Submitted time
        submitted = raw.get("submitted_time")
        if isinstance(submitted, (int, float)):
            submitted = datetime.datetime.fromtimestamp(int(submitted))
        else:
            submitted = parse_date(submitted)

        attempts = raw.get("attempts", [])
        start, end = None, None
        if attempts:
            start = parse_date(attempts[0].get("start_time"))
            end = parse_date(attempts[-1].get("end_time"))
            st = attempts[0].get("start_time")
            et = attempts[-1].get("end_time")

            if isinstance(st, (int, float)):
                start = datetime.datetime.fromtimestamp(int(st))
            elif st:
                start = parse_date(st)

            if isinstance(et, (int, float)):
                end = datetime.datetime.fromtimestamp(int(et))
            elif et:
                end = parse_date(et)

        wall_time = None
        if start and end:
            wall_time = (end - start).total_seconds()

        # Which machines did this job run on?
        machine_ids = []
        gpus = 0
        machine_ids, gpus = [], 0
        if attempts and "detail" in attempts[0]:
            for detail in attempts[0]["detail"]:
                mid = detail["ip"]
                machine_ids.append(mid)
                gpus += len(detail.get("gpus", []))

        # Collect utilization traces for each machine this job touched
        # Collect utilization traces
        job_cpu = cpu_util[cpu_util["machine_id"].isin(machine_ids)]
        job_gpu = gpu_util[gpu_util["machineId"].isin(machine_ids)]

        print("***", len(machine_ids), machine_ids)

        if machine_ids:
            # Shift times relative to earliest_submit
            submit_time = submitted.timestamp() - earliest_submit if submitted else None
            start_time = start.timestamp() - earliest_submit if start else None
            end_time = end.timestamp() - earliest_submit if end else None

            if not submit_time or not start_time or not end_time:
                warnings.warn(
                    f"skipped {jobid} b/c missing submit_time, start_time, or end_time",
                    UserWarning
                )
       

            if submit_time and start_time and end_time: 

                job = job_dict(
                # Core identity
                    id=jobid,
                    name=f"philly-{jobid}",
                account=user if user else "unknown",   # Philly log has user
                    account=user if user else "unknown",

                # Partition & priority
                nodes_required=len(machine_ids) if machine_ids else 0,
                    nodes_required=len(machine_ids),
                    partition=0,
                    priority=0,

                # Resource requests
                cpu_cores_required=0,    # Philly logs don’t track cores
                gpu_units_required=gpus, # we can count GPUs from attempts
                    cpu_cores_required=0,
                    gpu_units_required=gpus,
                    allocated_cpu_cores=0,
                    allocated_gpu_units=gpus,

                # State
                    end_state=status,

                # Traces
                    cpu_trace=job_cpu if not job_cpu.empty else None,
                    gpu_trace=job_gpu if not job_gpu.empty else None,
                    ntx_trace=None,
                    nrx_trace=None,

                # Timing
                submit_time=submitted.timestamp() if submitted else 0,
                start_time=start.timestamp() if start else 0,
                end_time=end.timestamp() if end else 0,
                    submit_time=submit_time,
                    start_time=start_time,
                    end_time=end_time,
                    time_limit=0,
                    expected_run_time=wall_time if wall_time else 0,
                    current_run_time=0,
@@ -140,7 +181,6 @@ def load_data(files, **kwargs):
                    trace_missing_values=False,
                    downscale=1
                )
            print(job)
                jobs.append(Job(job))

    return jobs