Verified Commit 18c6bdcd authored by Hines, Jesse's avatar Hines, Jesse
Browse files

Update druid ingest script

parent ad6f3d64
Loading
Loading
Loading
Loading
+53 −25
Original line number Diff line number Diff line
#!/usr/bin/env python3
"""
Submits the replay data ingests to druid.
"""
from pathlib import Path
import urllib.parse
from typing import Any
import time, os
import pyjson5
import requests
import getpass, argparse
from loguru import logger
import orjson


class DruidApi:
@@ -34,10 +37,18 @@ class DruidApi:


def submit_ingest(druid: DruidApi, file):
    ingest = pyjson5.loads(Path(file).read_text()) # using yaml as hack to allow comments
    logger.info(f"Submitting ingest for {file}...")
    ingest = pyjson5.loads(Path(file).read_text()) # using yaml as hack to allow comments
    ingest_type = ingest['type']

    if ingest_type == "kafka":
        response = druid.request("POST", "/druid/indexer/v1/supervisor", json = ingest)
        logger.info(f"Supervisor for {file} submitted")
        logger.info(f"See {druid.url}/unified-console.html to view the streaming ingest.")
    else:
        response = druid.request("POST", "/druid/indexer/v1/task", json = ingest)
        task_id = response['task']

        logger.info(f"See {druid.url}/unified-console.html#tasks/task_id~{task_id} to view ingest progress.")
        logger.info(f"Waiting for ingest{task_id} to complete...")

@@ -53,17 +64,34 @@ def submit_ingest(druid: DruidApi, file):


if __name__ == "__main__":
    DRUID_URL = os.environ.get("DRUID_URL", "http://localhost:8888")
    DRUID_USER = os.environ.get("DRUID_USER") or None # Convert "" to None
    DRUID_PASSWORD = os.environ.get("DRUID_PASSWORD") or None
    parser = argparse.ArgumentParser(
        description = __doc__.strip(),
        formatter_class = argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("ingests", type = Path, nargs = "*", help = "List of druid ingests")
    args = parser.parse_args()

    druid = DruidApi(DRUID_URL, DRUID_USER, DRUID_PASSWORD)
    if not args.ingests:
        ingests = sorted(Path("./druid_ingests").resolve().glob("data-*.json"))
    else:
        ingests = [Path(p).resolve() for p in args.ingests]

    submit_ingest(druid, "./druid_ingests/data-marconi100.json")
    submit_ingest(druid, "./druid_ingests/data-lassen-allocation-history.json")
    submit_ingest(druid, "./druid_ingests/data-lassen-node-history.json")
    submit_ingest(druid, "./druid_ingests/data-lassen-step-history.json")
    submit_ingest(druid, "./druid_ingests/data-fugaku.json")
    druid_url = os.environ.get("DRUID_URL")
    if not druid_url:
        druid_url = input("Druid URL (http://localhost:8888): ")
        druid_url = druid_url.strip() or "http://localhost:8888"

    logger.info("Done!")
    druid_username = os.environ.get("DRUID_USERNAME")
    if not druid_username:
        druid_username = input("Druid Username: ").strip() or None
    
    druid_password = os.environ.get("DRUID_PASSWORD")
    if not druid_password:
        druid_password = getpass.getpass("Druid Password: ").strip() or None

    druid = DruidApi(druid_url, druid_username, druid_password)

    for ingest in ingests:
        submit_ingest(druid, ingest)

    logger.info("Done!")