Commit efcc5204 authored by Brewer, Wes

Fix BlueWaters dataloader: PBS job ID parsing, walltime field, remove filter

- Parse the PBS job ID positionally from the semicolon-delimited log header
  (field 3: "MM/DD/YYYY HH:MM:SS;E;6335144.bw;key=value...") instead of
  regex-searching for jobid=/job_id=/Job_Id=, which never appear in Torque
  accounting logs. All GT jobs previously got sequential IDs (1, 2, 3...)
  from Job._get_next_id(); now they get the real PBS IDs (e.g. 6335144.bw).
  A quick sanity check is sketched after this list.

- Change the wall_time regex from resources_used.walltime (actual usage) to
  Resource_List.walltime (requested limit), matching what REDI extracts
  for the time_limit field. This eliminates the time_limit MAE in
  validation; both patterns are compared in the second sketch below.

- Remove filter_str / eval(filter_str) traffic filter support. The
  filter: "traffic > 1e8" in experiments/bluewaters.yaml coincidentally
  kept the same 18 jobs as the full parse, which obscured the root cause.

- experiments/bluewaters.yaml: remove the filter, fix the start date to
  ISO format (20170328 was parsed as a Unix timestamp → 1970-08-22; see
  the demo after the yaml diff below), and disable simulate_network (the
  torus topology crashes on the unconstrained job set).
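
A quick sanity check of the positional ID parse, using the header format
quoted above (everything after the third field is elided; this is a
sketch, not the loader's actual code):

    # Torque accounting records are semicolon-delimited; field 3
    # (index 2) is the PBS job ID.
    line = "03/28/2017 08:15:42;E;6335144.bw;user=alice ..."
    parts = line.split(';')
    job_id = parts[2].strip() if len(parts) >= 3 else None
    print(job_id)  # 6335144.bw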
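
And the walltime change, with both patterns run against a line carrying
both fields (the duration values are made up for illustration):

    import re

    line = ("... Resource_List.walltime=02:00:00 "
            "resources_used.walltime=01:47:12 ...")
    used = re.search(r"resources_used\.walltime=(\d{2}:\d{2}:\d{2})", line)
    limit = re.search(r"Resource_List\.walltime=(\d{2}:\d{2}:\d{2})", line)
    print(used.group(1))   # 01:47:12 -> actual usage (old behavior)
    print(limit.group(1))  # 02:00:00 -> requested limit, what REDI expects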

Result: BlueWaters validation now uses id_aligned comparison (9 shared
jobs, corr=1.000, MAE=0). The remaining 9 of the 18 GT jobs appear only
in GT and reflect the date-window mismatch (RAPS logs by completion date,
REDI filters by start date), not a bug.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
parent 9ec32af3
experiments/bluewaters.yaml  +2 −3
system: bluewaters
replay:
  - /opt/data/bluewaters
-start: "20170328"
-simulate_network: True
-filter: "traffic > 1e8"
+start: "2017-03-28"
+simulate_network: False
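
The start-date fix above is easy to demonstrate: 20170328 taken as an
integer and interpreted as seconds since the Unix epoch lands in August
1970 (the exact coercion path inside the loader is not shown here, so
treat this as an illustration of the failure mode):

    from datetime import datetime, timezone

    print(datetime.fromtimestamp(20170328, tz=timezone.utc))
    # 1970-08-22 10:52:08+00:00  -> the bogus 1970 date noted above
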
BlueWaters dataloader  +9 −13
@@ -138,7 +138,6 @@ def extract_nodes_from_line(hosts_field: str):


PATS = {
    "id": re.compile(r"\b(jobid|job_id|Job_Id)[:=]\s*([^\s,]+)", re.I),
    "name": re.compile(r"\b(jobname)[:=]\s*([^\s,]+)", re.I),
    "account": re.compile(r"\b(account)[:=]\s*([^\s,]+)", re.I),
    # Nodes: use Resource_List.nodect or unique_node_count
@@ -153,13 +152,19 @@ PATS = {
    "submit_time": re.compile(r"\bqtime=([0-9]+)", re.I),
    "start_time": re.compile(r"\bstart=([0-9]+)", re.I),
    "end_time": re.compile(r"\bend=([0-9]+)", re.I),
-    # Walltime used
-    "wall_time": re.compile(r"resources_used\.walltime=(\d{2}:\d{2}:\d{2})", re.I),
+    # Requested walltime limit (used for time_limit field)
+    "wall_time": re.compile(r"Resource_List\.walltime=(\d{2}:\d{2}:\d{2})", re.I),
}


def _parse_line(line: str, debug=False):
    rec = {}
+    # PBS/Torque accounting lines are semicolon-delimited:
+    #   MM/DD/YYYY HH:MM:SS;E;6335144.bw;key=value ...
+    # The job ID is always the third field (index 2).
+    parts = line.split(';')
+    if len(parts) >= 3:
+        rec['id'] = parts[2].strip()
    for key, pat in PATS.items():
        m = pat.search(line)
        if m:
@@ -187,7 +192,6 @@ def load_data(local_dataset_path, **kwargs):
    # TODO: support multiple day replay
    day = start.strftime("%Y%m%d")
    fp = root / "torque_logs" / day
-    filter_str = kwargs.get("filter")
    debug = kwargs.get("debug")

    jobs_raw = []
@@ -331,14 +335,6 @@ def load_data(local_dataset_path, **kwargs):
            trace_missing_values=False,
        )

-        if filter_str:
-            traffic = (avg_tx_per_node + avg_rx_per_node) / 2.
-            keep_jobs = eval(filter_str)
-            print(job_d["id"], filter_str, traffic, keep_jobs)
-        else:
-            keep_jobs = True
-
-        if keep_jobs:
-            jobs.append(Job(job_d))
+        jobs.append(Job(job_d))

    # Normalize times so first start = 0