Source code for heizer._source.admin

from confluent_kafka.admin import AdminClient

from heizer._source import get_logger
from heizer._source.topic import Topic
from heizer.config import BaseConfig
from heizer.types import KafkaConfig, List, Union

logger = get_logger(__name__)


def get_admin_client(config: Union[BaseConfig, KafkaConfig]) -> AdminClient:
    """
    Build an ``AdminClient`` from the given configuration.

    :param config: Either a ``BaseConfig``, from which only
        ``bootstrap_servers`` is read, or any other configuration value,
        which is handed to ``AdminClient`` unchanged.
    :return: The newly constructed ``AdminClient``.
    """
    if not isinstance(config, BaseConfig):
        # Not a BaseConfig: pass the caller's configuration through as-is.
        return AdminClient(config)

    # A BaseConfig only contributes its bootstrap server list.
    return AdminClient({"bootstrap.servers": config.bootstrap_servers})
def create_new_topics(config: Union[BaseConfig, KafkaConfig], topics: List[Topic]) -> None:
    """
    Create new topics using the provided configuration.

    Failures are logged, not raised. A topic whose creation fails with an
    error mentioning ``TOPIC_ALREADY_EXISTS`` is reported at info level and
    otherwise ignored.

    :param config: Configuration forwarded to ``get_admin_client``.
    :param topics: Topics to create; each one supplies its ``_new_topic``
        object. An empty list is a no-op.
    """
    if not topics:
        # Nothing to create: skip building a client, and avoid sending an
        # empty request, which the admin API is expected to reject.
        return

    admin_client = get_admin_client(config)
    new_topics = [topic._new_topic for topic in topics]
    fs = admin_client.create_topics(new_topics)

    # Wait for each operation to finish.
    for topic, f in fs.items():
        try:
            f.result()  # The result itself is None
        except Exception as e:
            if "TOPIC_ALREADY_EXISTS" in str(e):
                logger.info(f"Topic {topic} already exists")
            else:
                logger.exception(f"Failed to create topic {topic}", exc_info=e)
        else:
            logger.info(f"Topic {topic} created")
def delete_topics(config: Union[BaseConfig, KafkaConfig], topics: List[Topic]) -> None:
    """
    Delete topics using the provided configuration.

    Failures are logged, not raised.

    :param config: Configuration forwarded to ``get_admin_client``.
    :param topics: Topics to delete; each one is identified by its ``name``.
        An empty list is a no-op.
    """
    if not topics:
        # Nothing to delete: skip building a client, and avoid sending an
        # empty request, which the admin API is expected to reject.
        return

    admin_client = get_admin_client(config)
    fs = admin_client.delete_topics([tp.name for tp in topics])

    # Wait for each operation to finish.
    for topic, f in fs.items():
        try:
            f.result()  # The result itself is None
        except Exception as e:
            logger.exception(f"Failed to delete topic {topic}", exc_info=e)
        else:
            logger.info(f"Topic {topic} deleted")
def list_topics(config: Union[BaseConfig, KafkaConfig]) -> List[str]:
    """
    Return the names of the topics reported by the admin client.

    :param config: Configuration forwarded to ``get_admin_client``.
    :return: The keys of the ``topics`` mapping in the metadata returned by
        the client's ``list_topics()`` call, as a list.
    """
    client = get_admin_client(config)
    metadata = client.list_topics()
    topic_names = metadata.topics.keys()
    return list(topic_names)