Commit 51a1a94c authored by Hines, Jesse's avatar Hines, Jesse
Browse files

Add kafka admin client

parent bf849425
Loading
Loading
Loading
Loading
+13 −14
Original line number Diff line number Diff line
import os
from kafka import KafkaProducer, KafkaConsumer
from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient
import functools


@functools.cache
def get_kafka_producer(**configs):
def _get_kafka_config():
    env_configs = {
        # Pick-up credentials from the context
       'bootstrap_servers': [os.environ['KAFKA_BOOTSTRAP']],
@@ -14,18 +13,18 @@ def get_kafka_producer(**configs):
       'security_protocol': os.environ.get('KAFKA_SECURITY_PROTOCOL'),
    }
    env_configs = {k: v for k, v in env_configs.items() if v is not None}
    return KafkaProducer(**{**env_configs, **configs})
    return env_configs

@functools.cache
def get_kafka_producer(**configs):
    return KafkaProducer(**{**_get_kafka_config(), **configs})


@functools.cache
def get_kafka_consumer(*topics, **configs):
    env_configs = {
        # Pick-up credentials from the context
       'bootstrap_servers': [os.environ['KAFKA_BOOTSTRAP']],
       'sasl_mechanism': os.environ.get('KAFKA_SASL_MECHANISM'),
       'sasl_plain_username': os.environ.get('KAFKA_SASL_USERNAME'),
       'sasl_plain_password': os.environ.get('KAFKA_SASL_PASSWORD'),
       'security_protocol': os.environ.get('KAFKA_SECURITY_PROTOCOL'),
    }
    env_configs = {k: v for k, v in env_configs.items() if v is not None}
    return KafkaConsumer(*topics, **{**env_configs, **configs})
    return KafkaConsumer(*topics, **{**_get_kafka_config(), **configs})


@functools.cache
def get_kafka_admin(**configs):
    return KafkaAdminClient(**{**_get_kafka_config(), **configs})