from confluent_kafka.admin import AdminClient
from heizer._source import get_logger
from heizer._source.topic import Topic
from heizer.config import BaseConfig
from heizer.types import KafkaConfig, List, Union
logger = get_logger(__name__)
[docs]def get_admin_client(config: Union[BaseConfig, KafkaConfig]) -> AdminClient:
"""
Create an admin client using the provided configuration.
"""
if isinstance(config, BaseConfig):
config_dict = {"bootstrap.servers": config.bootstrap_servers}
else:
config_dict = config
return AdminClient(config_dict)
[docs]def create_new_topics(config: Union[BaseConfig, KafkaConfig], topics: List[Topic]) -> None:
"""
Create new topics using the provided configuration.
"""
admin_client = get_admin_client(config)
new_topics = [topic._new_topic for topic in topics]
fs = admin_client.create_topics(new_topics)
# Wait for each operation to finish.
for topic, f in fs.items():
try:
f.result() # The result itself is None
logger.info("Topic {} created".format(topic))
except Exception as e:
if "TOPIC_ALREADY_EXISTS" in str(e):
logger.info(f"Topic {topic} already exists")
continue
else:
logger.exception(f"Failed to create topic {topic}", exc_info=e)
[docs]def delete_topics(config: Union[BaseConfig, KafkaConfig], topics: List[Topic]) -> None:
"""Delete topics using the provided configuration."""
admin_client = get_admin_client(config)
fs = admin_client.delete_topics([tp.name for tp in topics])
# Wait for each operation to finish.
for topic, f in fs.items():
try:
f.result() # The result itself is None
logger.info("Topic {} deleted".format(topic))
except Exception as e:
logger.exception(f"Failed to delete topic {topic}", exc_info=e)
[docs]def list_topics(config: Union[BaseConfig, KafkaConfig]) -> List[str]:
"""List topics using the provided configuration."""
admin_client = get_admin_client(config)
return list(admin_client.list_topics().topics.keys())