import asyncio
import json
from uuid import uuid4
import confluent_kafka as ck
from heizer._source import get_logger
from heizer._source.topic import Topic
from heizer.config import ProducerConfig
from heizer.types import Any, Callable, Dict, KafkaConfig, List, Optional, Union
logger = get_logger(__name__)
def _create_producer_config_dict(config: ProducerConfig) -> KafkaConfig:
    """Flatten a ProducerConfig into the plain dict format expected by confluent_kafka."""
    config_dict = {"bootstrap.servers": config.bootstrap_servers}
if config.other_configs:
config_dict.update(config.other_configs)
return config_dict
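# A minimal sketch of the flattening above, assuming ProducerConfig accepts
# `bootstrap_servers` and `other_configs` as init arguments (names inferred from
# the attribute access in the helper; the broker address is a placeholder):
#
#   cfg = ProducerConfig(
#       bootstrap_servers="localhost:9092",
#       other_configs={"acks": "all", "linger.ms": "5"},
#   )
#   _create_producer_config_dict(cfg)
#   # -> {"bootstrap.servers": "localhost:9092", "acks": "all", "linger.ms": "5"}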
def _default_serializer(value: Union[Dict[Any, Any], List[Any], str, bytes]) -> bytes:
    """
    Default Kafka message serializer, which encodes the input to bytes.

    :param value: the message body; a dict or list is JSON-encoded, a str is
        UTF-8 encoded, and bytes are passed through unchanged
    :return: the encoded message as bytes
    """
    if isinstance(value, (dict, list)):
return json.dumps(value).encode("utf-8")
elif isinstance(value, str):
return value.encode("utf-8")
elif isinstance(value, bytes):
return value
else:
raise ValueError(f"Input type is not supported: {type(value).__name__}")
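# A quick sketch of the default serializer's behaviour for each supported input
# type (doctest-style; no assumptions beyond the function above):
#
#   >>> _default_serializer({"k": "v"})
#   b'{"k": "v"}'
#   >>> _default_serializer("hello")
#   b'hello'
#   >>> _default_serializer(b"raw")
#   b'raw'
#   >>> _default_serializer(42)  # unsupported type
#   Traceback (most recent call last):
#       ...
#   ValueError: Input type is not supported: int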
class Producer:
"""
Kafka Message Producer
"""
__id__: str
name: str
    # attributes to be initialized
config: KafkaConfig
serializer: Callable[..., bytes] = _default_serializer
call_back: Optional[Callable[..., Any]] = None
# private properties
_producer_instance: Optional[ck.Producer] = None
def __init__(
self,
config: Union[ProducerConfig, KafkaConfig],
serializer: Callable[..., bytes] = _default_serializer,
call_back: Optional[Callable[..., Any]] = None,
name: Optional[str] = None,
):
self.config = _create_producer_config_dict(config) if isinstance(config, ProducerConfig) else config
self.serializer = serializer
self.call_back = call_back or self.default_delivery_report
self.__id__ = str(uuid4())
self.name = name or self.__id__
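    # The constructor accepts either a ProducerConfig or a raw confluent_kafka
    # config dict; a hedged sketch of both forms (the ProducerConfig keyword and
    # the broker address are assumptions/placeholders):
    #
    #   p1 = Producer(config={"bootstrap.servers": "localhost:9092"})
    #   p2 = Producer(config=ProducerConfig(bootstrap_servers="localhost:9092"))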
def __repr__(self) -> str:
return self.name
    def default_delivery_report(self, err: Optional[ck.KafkaError], msg: ck.Message) -> None:
        """
        Called once for each produced message to indicate the delivery result.
        Triggered by poll() or flush().

        :param err: the delivery error, or None if delivery succeeded
        :param msg: the delivered (or failed) message
        :return: None
        """
if err is not None:
logger.error(f"[{self}] Message delivery failed: {err}")
else:
logger.debug(f"[{self}] Message delivered to {msg.topic()} [{msg.partition()}]")
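    # A short sketch of swapping in a custom delivery callback; the signature
    # mirrors default_delivery_report above (broker address is a placeholder):
    #
    #   def my_report(err, msg):
    #       if err is not None:
    #           print(f"delivery failed: {err}")
    #
    #   producer = Producer(
    #       config={"bootstrap.servers": "localhost:9092"},
    #       call_back=my_report,
    #   )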
@property
def _producer(self) -> ck.Producer:
if not self._producer_instance:
self._producer_instance = ck.Producer(self.config)
return self._producer_instance
    def produce(
self,
topic: Union[str, Topic],
value: Union[bytes, str, Dict[Any, Any]],
key: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
        partition: int = -1,
auto_flush: bool = True,
) -> None:
"""
This method is used to produce messages to a Kafka topic using the confluent_kafka library.
:param topic: The topic to publish the message to. Can be either a string representing the topic name,
or an instance of the Topic class.
:param value: The message body to be produced. Can be either bytes, a string, or a dictionary.
:param key: Optional. The key for the message.
:param headers: Optional. Additional headers for the message.
:param auto_flush: Optional. Default is True. If True, the producer will automatically flush messages after
producing them.
:return: None
Example usage:
>>> producer = Producer(config={"xx": "xx"})
>>> producer.produce(topic="my_topic", value={"k":"v"})
"""
return asyncio.run(
self._produce(
topic=topic, value=value, key=key, headers=headers, partition=partition, auto_flush=auto_flush
)
)
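    # A short sketch of produce() with the optional key, headers, and partition
    # arguments (topic name, key, and header values are placeholders):
    #
    #   producer.produce(
    #       topic="my_topic",
    #       value={"event": "created"},
    #       key="user-123",
    #       headers={"trace-id": "abc"},
    #       partition=0,
    #   )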
    async def async_produce(
self,
topic: Union[str, Topic],
value: Union[bytes, str, Dict[Any, Any]],
key: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
        partition: int = -1,
auto_flush: bool = True,
) -> None:
"""
Produce a message asynchronously to a Kafka topic.
:param topic: The topic to produce the message to. Can be either a string or a `Topic` object.
:param value: The message body to be produced. Can be either bytes, a string, or a dictionary.
:param key: The key to associate with the message. Optional.
:param headers: Additional headers to include with the message. Optional.
:param auto_flush: If set to True, automatically flush the producer after producing the message.
Default is True.
:return: None
This method asynchronously produces a message to a Kafka topic. It accepts the topic, message, key, headers,
and auto_flush parameters.
Example usage:
>>> producer = Producer(config={"xx": "xx"})
>>> asyncio.run(producer.async_produce(topic="my_topic", value={"k":"v"}))
"""
return await self._produce(
topic=topic, value=value, key=key, headers=headers, partition=partition, auto_flush=auto_flush
)
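    # Because async_produce is a coroutine, several sends can be awaited together
    # with asyncio.gather; a sketch (auto_flush=False defers the blocking flush to
    # a single call at the end; topic name is a placeholder):
    #
    #   async def send_batch(producer: "Producer") -> None:
    #       await asyncio.gather(
    #           *(producer.async_produce(topic="my_topic", value={"n": i}, auto_flush=False)
    #             for i in range(10))
    #       )
    #       producer.flush()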
async def _produce(
self,
topic: Union[str, Topic],
value: Union[bytes, str, Dict[Any, Any]],
key: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
        partition: int = -1,
auto_flush: bool = True,
) -> None:
topic = Topic(name=topic) if isinstance(topic, str) else topic
try:
self._producer.produce(
topic=topic.name,
value=self.serializer(value),
key=key,
headers=headers,
on_delivery=self.call_back,
partition=partition,
)
if auto_flush:
self.flush()
        except BufferError:
            # confluent_kafka raises BufferError when the local producer queue is full;
            # KafkaError.QUEUE_FULL is an error code, not a catchable exception class
            logger.warning(f"[{self}] Local queue is full, forcing a flush")
            self.flush()
        except Exception:
            logger.exception(f"[{self}] Failed to produce msg to topic: {topic.name}")
            raise
    def flush(self) -> None:
        """Wait until all outstanding messages in the producer queue have been delivered."""
        self._producer.flush()
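# When producing many messages, the default auto_flush=True forces a blocking
# flush per message; a sketch of batching instead, flushing once at the end
# (broker address and topic name are placeholders):
#
#   producer = Producer(config={"bootstrap.servers": "localhost:9092"})
#   for i in range(1000):
#       producer.produce(topic="my_topic", value={"n": i}, auto_flush=False)
#   producer.flush()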