Source code for heizer._source.admin

from confluent_kafka.admin import AdminClient, NewTopic

from heizer._source import get_logger
from heizer._source.topic import HeizerTopic
from heizer.config import HeizerConfig
from heizer.types import List

logger = get_logger(__name__)


[docs]def get_admin_client(config: HeizerConfig) -> AdminClient: return AdminClient({"bootstrap.servers": config.value.get("bootstrap.servers")})
[docs]def create_new_topic(admin_client: AdminClient, topics: List[HeizerTopic]) -> None: new_topics = [NewTopic(topic.name, num_partitions=len(topic._partitions)) for topic in topics] fs = admin_client.create_topics(new_topics) # Wait for each operation to finish. for topic, f in fs.items(): try: f.result() # The result itself is None logger.info("Topic {} created".format(topic)) except Exception as e: if "TOPIC_ALREADY_EXISTS" in str(e): logger.info(f"Topic {topic} already exists") continue else: logger.exception(f"Failed to create topic {topic}", exc_info=e)