Commit 2032d408 authored by Brewer, Wes

feat(scripts): add --prescreen mode to find busiest congestion windows

Fast full-dataset sweep-line scan over the entire Lassen telemetry
(2018-08-22 → 2020-11-18) that ranks the top N non-overlapping candidate
windows before committing to the heavy fat-tree routing pass.

Two scoring modes:
  --prescreen                 node-count proxy (runs in seconds;
                              allocation CSV only)
  --prescreen --prescreen-ib  IB-weighted score (num_nodes × norm_ib_rate;
                              loads node history CSV, ~1 min)

Also adds --prescreen-window (default 7 days) and --prescreen-top (default 10).
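
Example invocations (script name and --data spelling are illustrative;
the prescreen flags are the ones added in this commit):

  python congestion_sweep.py --data /path/to/lassen --prescreen
  python congestion_sweep.py --data /path/to/lassen --prescreen \
      --prescreen-ib --prescreen-window 14 --prescreen-top 5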

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
parent 6cc8eb7c
@@ -352,6 +352,196 @@ def load_records(
    return records


# ---------------------------------------------------------------------------
# Pre-screening pass
# ---------------------------------------------------------------------------

def prescreen_dataset(
    data_path: Path,
    min_nodes: int,
    window_days: int,
    top_n: int,
    use_ib: bool,
) -> None:
    """
    Fast full-dataset scan to identify the busiest candidate congestion windows.

    This runs entirely from ``final_csm_allocation_history_hashed.csv`` (and
    optionally ``final_csm_allocation_node_history.csv``) without building the
    fat-tree graph or running any routing.  It is designed to be run once to
    choose candidate windows before committing to the heavier full analysis.

    Algorithm
    ---------
    1.  Load the allocation history and filter to multi-node jobs
        (``num_nodes >= min_nodes``).

    2.  Build a **sweep-line** of concurrent network load using vectorised
        pandas operations (a toy example follows this list):

        * For each job create two events: ``+weight`` at ``begin_ts`` and
          ``-weight`` at ``end_ts``.
        * Sort all events by timestamp and take the cumulative sum to get a
          running load signal sampled at event boundaries.
        * Re-sample onto a regular hourly grid with ``pd.merge_asof``.

        *Weight* is ``num_nodes`` by default.  With ``--prescreen-ib`` it is
        replaced by ``num_nodes × normalised_ib_tx_rate``, giving a score
        that rewards windows where many nodes are simultaneously doing heavy
        InfiniBand traffic rather than just occupying nodes idly.

    3.  Compute a **rolling mean** of the hourly load signal over
        ``window_days × 24`` hours.  The peak of this rolling mean marks the
        centre of the busiest sustained window.

    4.  Select the top-N **non-overlapping** windows using a greedy peak-
        picking pass: after choosing a window, suppress all hours within
        ``window_days`` of it before looking for the next candidate.

    5.  Print a ranked table and ready-to-paste CLI commands.
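
    For example, the sweep-line in step 2 reduces to a cumulative sum over
    signed events.  A minimal sketch with two illustrative jobs, A (4 nodes,
    hours 0-2) and B (8 nodes, hours 1-3)::

        >>> ev = pd.DataFrame({"ts": [0, 1, 2, 3], "delta": [4, 8, -4, -8]})
        >>> ev.sort_values("ts")["delta"].cumsum().tolist()
        [4, 12, 8, 0]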

    Parameters
    ----------
    data_path:
        Directory containing the Lassen CSV files.
    min_nodes:
        Minimum job size to include in the load signal.
    window_days:
        Candidate window length in days.
    top_n:
        Number of top non-overlapping windows to report.
    use_ib:
        If True, weight the load signal by per-job IB TX rate (requires
        loading ``final_csm_allocation_node_history.csv``).
    """
    alloc_path = data_path / "final_csm_allocation_history_hashed.csv"
    print(f"[prescreen] Loading allocation history …")
    alloc_df = pd.read_csv(alloc_path, low_memory=False)

    alloc_df["begin_ts"] = pd.to_datetime(alloc_df["begin_time"], format="mixed", errors="coerce", utc=True)
    alloc_df["end_ts"]   = pd.to_datetime(alloc_df["end_time"],   format="mixed", errors="coerce", utc=True)
    alloc_df["wall_time"] = (alloc_df["end_ts"] - alloc_df["begin_ts"]).dt.total_seconds()

    filt = alloc_df[
        (alloc_df["num_nodes"] >= min_nodes) &
        alloc_df["begin_ts"].notna() &
        alloc_df["end_ts"].notna() &
        (alloc_df["wall_time"] > 0)
    ].copy()
    print(f"[prescreen] {len(filt):,} multi-node jobs (≥{min_nodes} nodes) across full dataset")
    print(f"[prescreen] Dataset spans {alloc_df['begin_ts'].min().date()} → {alloc_df['end_ts'].max().date()}")

    # --- Compute per-job weight -------------------------------------------
    if use_ib:
        node_path = data_path / "final_csm_allocation_node_history.csv"
        print(f"[prescreen] Loading node history for IB weighting (slow) …")
        node_df = pd.read_csv(node_path, low_memory=False, usecols=["allocation_id", "ib_tx"])
        ib_totals = (
            node_df.groupby("allocation_id")["ib_tx"]
            .sum()
            .rename("raw_ib_tx_sum")
        )
        filt = filt.join(ib_totals, on="allocation_id", how="left")
        filt["raw_ib_tx_sum"] = filt["raw_ib_tx_sum"].fillna(0).clip(lower=0)
        # bytes/s/node (before ×4 scale — we only need relative ranking)
        filt["ib_rate"] = filt["raw_ib_tx_sum"] / filt["num_nodes"] / filt["wall_time"]
        # Normalise to [0,1] so weight ≈ num_nodes when IB is at max
        max_rate = max(float(filt["ib_rate"].quantile(0.99)), 1.0)
        filt["weight"] = filt["num_nodes"] * (filt["ib_rate"] / max_rate).clip(upper=1.0)
        score_label = "nodes × norm_IB_rate"
    else:
        filt["weight"] = filt["num_nodes"].astype(float)
        score_label = "nodes in use"

    # --- Vectorised sweep-line onto hourly grid ---------------------------
    starts = filt[["begin_ts", "weight"]].rename(columns={"begin_ts": "ts", "weight": "delta"})
    ends   = filt[["end_ts",   "weight"]].rename(columns={"end_ts":   "ts", "weight": "delta"})
    ends["delta"] = -ends["delta"]

    events = (
        pd.concat([starts, ends], ignore_index=True)
        .sort_values("ts")
        .reset_index(drop=True)
    )
    events["running"] = events["delta"].cumsum()

    # Hourly grid over full dataset lifetime
    grid_start = filt["begin_ts"].min().floor("h")
    grid_end   = filt["end_ts"].max().ceil("h")
    hourly_grid = pd.DataFrame({"ts": pd.date_range(grid_start, grid_end, freq="h")})

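    # Backward asof join: each grid hour picks up the most recent event at or
    # before it, giving the instantaneous load at that hour; hours before the
    # first event are NaN and zero-filled below.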
    hourly = pd.merge_asof(hourly_grid, events[["ts", "running"]], on="ts", direction="backward")
    hourly["running"] = hourly["running"].fillna(0).clip(lower=0)

    # Also track concurrent job count (unweighted) for the table
    starts_j = filt[["begin_ts"]].assign(delta=1).rename(columns={"begin_ts": "ts"})
    ends_j   = filt[["end_ts"]].assign(delta=-1).rename(columns={"end_ts": "ts"})
    events_j = pd.concat([starts_j, ends_j]).sort_values("ts").reset_index(drop=True)
    events_j["n_jobs"] = events_j["delta"].cumsum()
    hourly = pd.merge_asof(hourly, events_j[["ts", "n_jobs"]], on="ts", direction="backward")
    hourly["n_jobs"] = hourly["n_jobs"].fillna(0).clip(lower=0).astype(int)

    # --- Rolling mean to find busiest sustained windows -------------------
    window_hours = window_days * 24
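    # pandas rolling windows are trailing: the mean at hour t covers the
    # window_hours samples ending at t, so a peak marks a window's *end*.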
    hourly["rolling_score"] = hourly["running"].rolling(window_hours, min_periods=1).mean()

    # --- Greedy non-overlapping peak selection ----------------------------
    scored = hourly.copy()
    candidates = []
    suppressed = pd.Series(False, index=scored.index)

    for _ in range(top_n):
        available = scored[~suppressed]
        if available.empty:
            break
        peak_idx = available["rolling_score"].idxmax()
        peak_row = scored.loc[peak_idx]

        # The window ends at this hour (rolling mean looks back window_hours)
        win_end   = peak_row["ts"]
        win_start = win_end - pd.Timedelta(hours=window_hours - 1)

        # Stats for the window
        mask = (hourly["ts"] >= win_start) & (hourly["ts"] <= win_end)
        win_data = hourly[mask]

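        # Bump win_end by one hour before taking .date() so the reported end
        # date is an exclusive bound that still covers the window's final hour.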
        candidates.append({
            "rank":         len(candidates) + 1,
            "start":        win_start.date(),
            "end":          (win_end + pd.Timedelta(hours=1)).date(),
            "avg_score":    win_data["running"].mean(),
            "peak_score":   win_data["running"].max(),
            "avg_jobs":     win_data["n_jobs"].mean(),
            "peak_jobs":    win_data["n_jobs"].max(),
        })

        # Suppress all hours within one window of this peak
        suppress_mask = (
            (scored["ts"] >= win_start - pd.Timedelta(hours=window_hours)) &
            (scored["ts"] <= win_end   + pd.Timedelta(hours=window_hours))
        )
        suppressed = suppressed | suppress_mask

    # --- Print results ----------------------------------------------------
    print(f"\n{'=' * 72}")
    print(f"  TOP {len(candidates)} BUSIEST {window_days}-DAY WINDOWS  (score = {score_label})")
    print(f"{'=' * 72}")
    print(f"  {'Rank':<5} {'Start':<12} {'End':<12} {'Avg score':>10} {'Peak score':>11} {'Avg jobs':>9} {'Peak jobs':>10}")
    print(f"  {'-'*5} {'-'*12} {'-'*12} {'-'*10} {'-'*11} {'-'*9} {'-'*10}")
    for c in candidates:
        print(
            f"  {c['rank']:<5} {str(c['start']):<12} {str(c['end']):<12} "
            f"{c['avg_score']:>10.1f} {c['peak_score']:>11.1f} "
            f"{c['avg_jobs']:>9.1f} {c['peak_jobs']:>10.0f}"
        )

    print(f"\n  Suggested --start / --end commands:")
    for c in candidates:
        print(f"    #{c['rank']:>2}  --start {c['start']} --end {c['end']}")
    print(f"{'=' * 72}\n")


# ---------------------------------------------------------------------------
# Simulation helpers
# ---------------------------------------------------------------------------
@@ -793,12 +983,50 @@ def parse_args(argv=None):
        "--verbose", "-v", action="store_true",
        help="Print per-snapshot details during the sweep.",
    )

    # --- Pre-screening options ---
    prescreen = p.add_argument_group("pre-screening options")
    prescreen.add_argument(
        "--prescreen", action="store_true",
        help=(
            "Scan the full dataset to find the busiest candidate windows, "
            "print a ranked table with ready-to-paste CLI commands, and exit. "
            "Does not require --start/--end and skips the fat-tree simulation entirely."
        ),
    )
    prescreen.add_argument(
        "--prescreen-window", type=int, default=7, metavar="DAYS",
        help="Candidate window length in days to rank. Default: 7.",
    )
    prescreen.add_argument(
        "--prescreen-top", type=int, default=10, metavar="N",
        help="Number of top non-overlapping windows to report. Default: 10.",
    )
    prescreen.add_argument(
        "--prescreen-ib", action="store_true",
        help=(
            "Weight the load signal by per-job IB TX rate instead of just node count. "
            "More accurate but requires loading the full node history CSV (~1 min extra)."
        ),
    )

    return p.parse_args(argv)


def main(argv=None):
    args = parse_args(argv)

    # --- Pre-screening mode (fast, exits before fat-tree build) -----------
    if args.prescreen:
        prescreen_dataset(
            data_path=args.data,
            min_nodes=args.min_nodes,
            window_days=args.prescreen_window,
            top_n=args.prescreen_top,
            use_ib=args.prescreen_ib,
        )
        sys.exit(0)

    # --- Time window ---
    window_start = pd.Timestamp(args.start, tz="UTC")
    window_end   = pd.Timestamp(args.end,   tz="UTC")