Commit 7f6d9035 authored by Hines, Jesse's avatar Hines, Jesse
Browse files

Create topics on boot in dev mode

parent 51a1a94c
Loading
Loading
Loading
Loading
+18 −3
Original line number Diff line number Diff line
""" A simple REST API for triggering and querying the results from the digital twin """
from pathlib import Path
import subprocess, asyncio, functools, os, json
import asyncio, functools, os, json
from contextlib import asynccontextmanager
from starlette.exceptions import HTTPException
from starlette.requests import Request
@@ -15,6 +15,8 @@ from loguru import logger
from ..util.druid import submit_ingest
from .service import cleanup_jobs
from .config import AppSettings, get_app_settings, get_druid_engine, get_kafka_producer
from ..util.kafka import get_kafka_admin
from kafka.admin import NewTopic

settings = AppSettings()

@@ -47,11 +49,24 @@ async def lifespan(api: FastAPI):
        )

    if settings.env == 'dev':
        kafka_admin = get_kafka_admin()
        kafka_admin.create_topics([
            NewTopic("svc-event-exadigit-sim", 1, 1),
            NewTopic("svc-ts-exadigit-schedulersimsystem", 1, 1),
            NewTopic("svc-event-exadigit-schedulersimjob", 1, 1),
            NewTopic("svc-ts-exadigit-coolingsimcdu", 1, 1),
            NewTopic("svc-ts-exadigit-coolingsimcep", 1, 1),
            NewTopic("svc-ts-exadigit-jobpowerhistory", 1, 1),
        ])

        druid_ingests_dir = Path(__file__).parent.parent.parent.resolve() / 'druid_ingests'
        ingests = [
            "cooling-sim-cdu", "cooling-sim-cep", "scheduler-job-power-history",
            "scheduler-sim-job", "scheduler-sim-system",
            "sim",
            "scheduler-sim-system",
            "scheduler-sim-job",
            "cooling-sim-cdu",
            "cooling-sim-cep",
            "scheduler-job-power-history",
        ]

        for ingest in ingests: