Commit cbaa9989 authored by Hines, Jesse
Browse files

Use confluent_kafka client

This is significantly faster
parent 4f06747a
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -19,7 +19,6 @@ dependencies = [
    "loguru==0.7.3",
    "SQLAlchemy==2.0.43",
    "pydruid==0.6.9",
    "kafka-python==2.2.15",
    "python-snappy==0.7.3",
    "jsonpath-ng==1.7.0",
    "fastapi==0.116.2",
@@ -30,6 +29,7 @@ dependencies = [
    "elasticsearch-dbapi==0.2.11",
    "requests==2.32.5",
    "orjson==3.11.3",
    "confluent_kafka==2.11.1",
    "raps@{root:uri}/raps",
]

+2 −2
Original line number Diff line number Diff line
@@ -4,7 +4,7 @@ from pydantic import StringConstraints
from pydantic_settings import BaseSettings, SettingsConfigDict
from fastapi import Depends
import sqlalchemy as sqla
from kafka import KafkaProducer
from confluent_kafka import Producer
from ..util.kafka import get_kafka_producer as _get_kafka_producer
from ..util.druid import get_druid_engine as _get_druid_engine

@@ -43,7 +43,7 @@ DruidDep = A[sqla.Engine, Depends(get_druid_engine)]

@functools.cache
def get_kafka_producer(): return _get_kafka_producer()
KafkaProducerDep = A[KafkaProducer, Depends(get_kafka_producer)]
KafkaProducerDep = A[Producer, Depends(get_kafka_producer)]


class AppDeps_(NamedTuple):
+11 −11
Original line number Diff line number Diff line
@@ -16,7 +16,7 @@ from ..util.druid import submit_ingest
from .service import cleanup_jobs
from .config import AppSettings, get_app_settings, get_druid_engine, get_kafka_producer
from ..util.kafka import get_kafka_admin
from kafka.admin import NewTopic
from confluent_kafka.admin import NewTopic

settings = AppSettings()

@@ -50,18 +50,18 @@ async def lifespan(api: FastAPI):

    if settings.env == 'dev':
        kafka_admin = get_kafka_admin()
        existing_topics = set(kafka_admin.list_topics())
        topic_configs = {"compression.type": "snappy"}
        existing_topics = {t.topic for t in kafka_admin.list_topics().topics}
        topic_config = {"compression.type": "snappy"}
        new_topics = [
            NewTopic("svc-event-exadigit-sim", 1, 1, topic_configs = topic_configs),
            NewTopic("svc-ts-exadigit-schedulersimsystem", 4, 1, topic_configs = topic_configs),
            NewTopic("svc-event-exadigit-schedulersimjob", 2, 1, topic_configs = topic_configs),
            NewTopic("svc-ts-exadigit-coolingsimcdu", 4, 1, topic_configs = topic_configs),
            NewTopic("svc-ts-exadigit-coolingsimcep", 2, 1, topic_configs = topic_configs),
            NewTopic("svc-ts-exadigit-jobpowerhistory", 4, 1, topic_configs = topic_configs),
            NewTopic("svc-event-exadigit-sim", 1, 1, config = topic_config),
            NewTopic("svc-ts-exadigit-schedulersimsystem", 4, 1, config = topic_config),
            NewTopic("svc-event-exadigit-schedulersimjob", 2, 1, config = topic_config),
            NewTopic("svc-ts-exadigit-coolingsimcdu", 4, 1, config = topic_config),
            NewTopic("svc-ts-exadigit-coolingsimcep", 2, 1, config = topic_config),
            NewTopic("svc-ts-exadigit-jobpowerhistory", 4, 1, config = topic_config),
        ]
        new_topics = [t for t in new_topics if t.name not in existing_topics]
        logger.info(f"Creating kafka topics {', '.join(t.name for t in new_topics)}")
        new_topics = [t for t in new_topics if t.topic not in existing_topics]
        logger.info(f"Creating kafka topics {', '.join(t.topic for t in new_topics)}")
        kafka_admin.create_topics(new_topics)

        druid_ingests_dir = Path(__file__).parent.parent.parent.resolve() / 'druid_ingests'
+2 −4
Original line number Diff line number Diff line
@@ -69,7 +69,7 @@ def run_simulation(sim_config: ServerSimConfig, deps: AppDeps):
        config = sim_config.model_dump(mode = 'json'),
    )
    logger.info(f"Launching simulation {sim.id}")
    deps.kafka_producer.send("svc-event-exadigit-sim", value = sim.serialize_for_druid())
    deps.kafka_producer.produce("svc-event-exadigit-sim", sim.serialize_for_druid())
    deps.kafka_producer.flush()

    if 'KUBERNETES_SERVICE_HOST' in os.environ: # We're running on k8s
@@ -174,9 +174,7 @@ def cleanup_jobs(druid_engine, kafka_producer):
            sim.execution_end = now
            sim.error_messages = "Simulation crashed"
            logger.warning(f"Marking stuck sim {sim.id} as failed")
            kafka_producer.send("svc-event-exadigit-sim",
                value = sim.serialize_for_druid()
            )
            kafka_producer.produce("svc-event-exadigit-sim", sim.serialize_for_druid())

        for sim in stuck_sims:
            stmt = (
+11 −9
Original line number Diff line number Diff line
@@ -52,19 +52,21 @@ def run_simulation_serialized(sim: Sim) -> Iterable[dict[str, list[bytes]]]:


def write_sim_to_kafka(sim: Sim):
    kafka_producer = get_kafka_producer(
        linger_ms = 2 * 1000,
        batch_size = 65536,
        compression_type = "snappy",
    )
    kafka_producer = get_kafka_producer({
        'bootstrap.servers': os.environ['KAFKA_BOOTSTRAP'],
        'linger.ms': 2 * 1000,
        'batch.size': 65536,
        "compression.type": "snappy",
    })

    try:
        for data in run_simulation_serialized(sim):
            # kafka_producer does its own buffering of output so we don't need to worry about batching
            for topic, rows in data.items():
                for row in rows:
                    kafka_producer.send(topic=topic, value=row)
            for topic, messages in data.items():
                for message in messages:
                    kafka_producer.produce(topic, message)
    finally:
        kafka_producer.close()
        kafka_producer.flush()


def write_sim_to_disk(sim: Sim, dest: str):
Loading