Source code for heizer._source.producer

import functools
import json
from uuid import uuid4

from confluent_kafka import Message, Producer

from heizer._source import get_logger
from heizer._source.topic import HeizerTopic
from heizer.config import HeizerConfig
from heizer.types import Any, Callable, Dict, List, Optional, TypeVar, Union, cast

logger = get_logger(__name__)

R = TypeVar("R")
F = TypeVar("F", bound=Callable[..., Dict[str, str]])


def _default_serializer(value: Union[Dict[Any, Any], str, bytes]) -> bytes:
    """
    :param value:
    :return: bytes

    Default Kafka message serializer, which will encode inputs to bytes

    """
    if isinstance(value, dict):
        return json.dumps(value).encode("utf-8")
    elif isinstance(value, str):
        return value.encode("utf-8")
    elif isinstance(value, bytes):
        return value
    else:
        raise ValueError(f"Input type is not supported: {type(value).__name__}")
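# Illustrative note on the default serializer's behavior: dicts are JSON-encoded,
# strings are UTF-8 encoded, bytes pass through unchanged, and any other type
# raises ValueError.
#
#     _default_serializer({"status": "ok"})  # -> b'{"status": "ok"}'
#     _default_serializer("ok")              # -> b"ok"
#     _default_serializer(b"ok")             # -> b"ok"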


class producer(object):
    """
    A decorator to create a producer
    """

    __id__: str
    name: str

    # args
    config: HeizerConfig = HeizerConfig()
    topics: List[HeizerTopic]
    serializer: Callable[..., bytes] = _default_serializer
    call_back: Optional[Callable[..., Any]] = None
    default_key: Optional[str] = None
    default_headers: Optional[Dict[str, str]] = None
    key_alias: str = "key"
    headers_alias: str = "headers"

    # private properties
    _producer_instance: Optional[Producer] = None

    def __init__(
        self,
        topics: List[HeizerTopic],
        config: HeizerConfig = HeizerConfig(),
        serializer: Callable[..., bytes] = _default_serializer,
        call_back: Optional[Callable[..., Any]] = None,
        default_key: Optional[str] = None,
        default_headers: Optional[Dict[str, str]] = None,
        key_alias: Optional[str] = None,
        headers_alias: Optional[str] = None,
        name: Optional[str] = None,
        init_topics: bool = True,
    ):
        self.topics = topics
        self.config = config
        self.serializer = serializer
        self.call_back = call_back
        self.default_key = default_key
        self.default_headers = default_headers

        if key_alias:
            self.key_alias = key_alias

        if headers_alias:
            self.headers_alias = headers_alias

        self.__id__ = str(uuid4())
        self.name = name or self.__id__

    @property
    def _producer(self) -> Producer:
        if not self._producer_instance:
            self._producer_instance = Producer(self.config.value)
        return self._producer_instance

    def __call__(self, func: F) -> F:
        @functools.wraps(func)
        def decorator(*args: Any, **kwargs: Any) -> Any:
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                raise e

            try:
                key = result.pop(self.key_alias, self.default_key)
            except Exception as e:
                logger.debug(f"Failed to get key from result. {str(e)}")
                key = self.default_key

            try:
                headers = result.pop(self.headers_alias, self.default_headers)
            except Exception as e:
                logger.debug(
                    f"Failed to get headers from result. {str(e)}",
                )
                headers = self.default_headers

            headers = cast(Dict[str, str], headers)

            msg = self.serializer(result)

            self._produce_message(message=msg, key=key, headers=headers)

            return result

        return cast(F, decorator)

    def _produce_message(self, message: bytes, key: Optional[str], headers: Optional[Dict[str, str]]) -> None:
        for topic in self.topics:
            for partition in topic.partitions:
                try:
                    self._producer.poll(0)
                    self._producer.produce(
                        topic=topic.name,
                        value=message,
                        partition=partition,
                        key=key,
                        headers=headers,
                        on_delivery=self.call_back,
                    )
                    self._producer.flush()
                except Exception as e:
                    logger.exception(f"Failed to produce msg to topic: {topic.name}", exc_info=e)
                    raise e
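# A minimal usage sketch (HeizerTopic's constructor signature is an assumption
# here; see heizer._source.topic for the real one, and note the default
# HeizerConfig must point at a reachable broker). The decorator pops the
# key_alias ("key" by default) and headers_alias ("headers" by default) entries
# from the decorated function's dict result, serializes the remainder, and
# produces it to every partition of every configured topic.
#
#     @producer(topics=[HeizerTopic(name="my.topic")])
#     def publish_event(status: str) -> Dict[str, str]:
#         return {"key": "event-1", "status": status}
#
#     publish_event("ok")  # produces b'{"status": "ok"}' with key "event-1"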
def delivery_report(err: str, msg: Message) -> None:
    """
    Called once for each message produced to indicate delivery result.
    Triggered by poll() or flush().

    :param err: the delivery error, or None on success
    :param msg: the produced Message
    :return: None
    """
    if err is not None:
        logger.error("Message delivery failed: {}".format(err))
    else:
        logger.debug("Message delivered to {} [{}]".format(msg.topic(), msg.partition()))
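# An illustrative sketch (not part of the library) of wiring delivery_report in
# as the producer's delivery callback; the HeizerTopic constructor signature and
# the topic name below are assumptions for the example only.
#
#     @producer(
#         topics=[HeizerTopic(name="audit.log")],
#         call_back=delivery_report,
#         default_headers={"source": "example"},
#     )
#     def audit(action: str) -> Dict[str, str]:
#         return {"key": action, "action": action}
#
#     audit("login")  # delivery_report logs success or failure per delivery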