Module contents
- class heizer.BaseConfig(bootstrap_servers)[source]
Base configurations
- Parameters:
bootstrap_servers (str) –
- bootstrap_servers: str
- class heizer.ConsumerConfig(bootstrap_servers, group_id, auto_offset_reset='earliest', other_configs=<factory>, enable_auto_commit=False)[source]
Configuration class for consumer.
- Parameters:
bootstrap_servers (str) –
group_id (str) –
auto_offset_reset (str) –
other_configs (Dict[str, Any]) –
enable_auto_commit (bool) –
- auto_offset_reset: str = 'earliest'
- enable_auto_commit: bool = False
- group_id: str
- other_configs: Dict[str, Any]
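The `other_configs=<factory>` default in the signature suggests a dataclass field built with a default factory. The following is a minimal sketch of a config object with the same documented fields, not heizer's actual source: the class name `SketchConsumerConfig` and the helper `to_confluent_dict` are hypothetical, and the mapping onto dotted librdkafka property names is an assumption about how such a config might be flattened.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class SketchConsumerConfig:
    """Hypothetical stand-in mirroring the documented ConsumerConfig fields."""

    bootstrap_servers: str
    group_id: str
    auto_offset_reset: str = "earliest"
    # default_factory gives every instance its own dict instead of a shared one
    other_configs: Dict[str, Any] = field(default_factory=dict)
    enable_auto_commit: bool = False

    def to_confluent_dict(self) -> Dict[str, Any]:
        """Assumed mapping onto librdkafka-style property names."""
        merged: Dict[str, Any] = {
            "bootstrap.servers": self.bootstrap_servers,
            "group.id": self.group_id,
            "auto.offset.reset": self.auto_offset_reset,
            "enable.auto.commit": self.enable_auto_commit,
        }
        # Extra settings are applied last, so they can override the values above
        merged.update(self.other_configs)
        return merged


cfg = SketchConsumerConfig(
    bootstrap_servers="localhost:9092",
    group_id="example-group",
    other_configs={"session.timeout.ms": 6000},
)
print(cfg.to_confluent_dict())
```

Applying `other_configs` last is a design choice of this sketch; whether heizer lets extra settings override the named fields is not stated in the reference above.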
- class heizer.ConsumerSignal(is_running: bool = True)[source]
- Parameters:
is_running (bool) –
- is_running: bool = True
- class heizer.Message(message)[source]
A utility class for parsing Kafka messages using the confluent_kafka library.
- Class:
HeizerMessage
- Parameters:
message (Message) –
- Properties:
message: A confluent_kafka.Message object representing the Kafka message.
topic: Optional[str] - The topic of the Kafka message.
partition: int - The partition of the Kafka message.
headers: Optional[Dict[str, str]] - The headers of the Kafka message.
key: Optional[str] - The key of the Kafka message.
value: Optional[str] - The value of the Kafka message.
formatted_value: Optional[Any] - A formatted version of the value.
- formatted_value: Any | None = None
- headers: Dict[str, str] | None
- key: str | None
- partition: int
- topic: str | None
- value: str | None
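The properties above expose headers as `Dict[str, str]` and the key and value as `str`, whereas a raw confluent_kafka message returns headers as a list of `(key, value)` tuples and the payload as bytes. The following sketch shows one way that conversion could look. The helper names `decode_headers` and `decode_text` are hypothetical, and UTF-8 decoding is an assumption, not a documented heizer behavior.

```python
from typing import Dict, List, Optional, Tuple, Union

RawHeaders = Optional[List[Tuple[str, Union[bytes, str]]]]


def decode_text(raw: Optional[Union[bytes, str]]) -> Optional[str]:
    """Decode a bytes payload as UTF-8; pass str and None through unchanged."""
    if isinstance(raw, bytes):
        return raw.decode("utf-8")
    return raw


def decode_headers(raw: RawHeaders) -> Optional[Dict[str, str]]:
    """Turn a list of (key, value) header tuples into a plain dict."""
    if raw is None:
        return None
    # Later duplicates of a header key overwrite earlier ones in this sketch
    return {key: decode_text(value) or "" for key, value in raw}


print(decode_headers([("trace-id", b"abc123"), ("source", "unit-test")]))
print(decode_text(b'{"k": "v"}'))
```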
- class heizer.Producer(config, serializer=<function _default_serializer>, call_back=None, name=None)[source]
Kafka Message Producer
- Parameters:
config (Dict[str, Any]) –
serializer (Callable[[...], bytes]) –
call_back (Callable[[...], Any] | None) –
name (str) –
- async async_produce(topic, value, key=None, headers=None, partition=-1, auto_flush=True)[source]
Produce a message asynchronously to a Kafka topic.
- Parameters:
topic (str | Topic) – The topic to produce the message to. Can be either a string or a Topic object.
value (bytes | str | Dict[Any, Any]) – The message body to be produced. Can be either bytes, a string, or a dictionary.
key (str | None) – The key to associate with the message. Optional.
headers (Dict[str, str] | None) – Additional headers to include with the message. Optional.
auto_flush (bool) – If set to True, automatically flush the producer after producing the message. Default is True.
partition (int | None) – The target partition. Default is -1.
- Returns:
None
- Return type:
None
This method takes the same parameters as produce(): topic, value, key, headers, partition, and auto_flush.
Example usage:
>>> import asyncio
>>> from heizer import Producer
>>> producer = Producer(config={"xx": "xx"})
>>> asyncio.run(producer.async_produce(topic="my_topic", value={"k": "v"}))
- call_back: Callable[[...], Any] | None = None
- config: Dict[str, Any]
- default_delivery_report(err, msg)[source]
Called once for each message produced to indicate delivery result. Triggered by poll() or flush().
- Parameters:
msg (Message) –
err (str) –
- Returns:
None
- Return type:
None
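A delivery callback with the same `(err, msg)` shape can be sketched as follows. This is an illustration, not heizer's `default_delivery_report`: the function name `delivery_report` and the `FakeMessage` stub are hypothetical, and the function returns its summary string (a real callback would typically log it and return None) so the result is easy to inspect. The stub only imitates the `topic()`, `partition()`, and `offset()` accessors that a confluent_kafka message provides.

```python
from typing import Any, Optional


def delivery_report(err: Optional[Any], msg: Any) -> str:
    """Build a one-line summary of a delivery result."""
    if err is not None:
        return f"delivery failed: {err}"
    return f"delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}"


class FakeMessage:
    """Hypothetical stub standing in for a delivered Kafka message."""

    def topic(self) -> str:
        return "my_topic"

    def partition(self) -> int:
        return 0

    def offset(self) -> int:
        return 42


print(delivery_report(None, FakeMessage()))
print(delivery_report("broker unavailable", FakeMessage()))
```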
- name: str
- produce(topic, value, key=None, headers=None, partition=-1, auto_flush=True)[source]
Produce a message to a Kafka topic using the confluent_kafka library.
- Parameters:
topic (str | Topic) – The topic to publish the message to. Can be either a string representing the topic name, or an instance of the Topic class.
value (bytes | str | Dict[Any, Any]) – The message body to be produced. Can be either bytes, a string, or a dictionary.
key (str | None) – Optional. The key for the message.
headers (Dict[str, str] | None) – Optional. Additional headers for the message.
auto_flush (bool) – Optional. Default is True. If True, the producer will automatically flush messages after producing them.
partition (int | None) – Optional. The target partition. Default is -1.
- Returns:
None
- Return type:
None
Example usage:
>>> from heizer import Producer
>>> producer = Producer(config={"xx": "xx"})
>>> producer.produce(topic="my_topic", value={"k": "v"})
- serializer()
- Parameters:
value (Dict[Any, Any] | str | bytes) –
- Returns:
bytes
- Return type:
bytes
Default Kafka message serializer; encodes the input value to bytes.
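A serializer matching the documented signature (dict, str, or bytes in; bytes out) might look like the sketch below. The name `sketch_serializer` is hypothetical, and both the JSON encoding of dictionaries and the UTF-8 encoding of strings are assumptions; the actual `_default_serializer` may behave differently.

```python
import json
from typing import Any, Dict, Union


def sketch_serializer(value: Union[Dict[Any, Any], str, bytes]) -> bytes:
    """Encode a dict, str, or bytes value to bytes."""
    if isinstance(value, bytes):
        # Already bytes: pass through untouched
        return value
    if isinstance(value, str):
        return value.encode("utf-8")
    if isinstance(value, dict):
        # Assumption: dictionaries are serialized as JSON text
        return json.dumps(value).encode("utf-8")
    raise TypeError(f"unsupported value type: {type(value).__name__}")


print(sketch_serializer({"k": "v"}))
print(sketch_serializer("hello"))
```

Any callable with this shape could in principle be passed as the `serializer` argument of `Producer`, given its documented type `Callable[[...], bytes]`.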
- class heizer.ProducerConfig(bootstrap_servers, other_configs=<factory>)[source]
Configuration class for producer.
- Parameters:
bootstrap_servers (str) –
other_configs (Dict[str, Any]) –
- other_configs: Dict[str, Any]
- class heizer.Topic(name, partition=None, offset=None, metadata=None, leader_epoch=None, num_partitions=None, replication_factor=None, replica_assignment=None, config=None)[source]
- Parameters:
name (str) –
partition (int) –
offset (int | None) –
metadata (str | None) –
leader_epoch (int | None) –
num_partitions (int | None) –
replication_factor (int | None) –
replica_assignment (List[Any] | None) –
config (Dict[Any, Any] | None) –
- config: Dict[Any, Any] | None = None
- leader_epoch: int | None = None
- metadata: str | None = None
- name: str
- num_partitions: int | None = None
- offset: int | None = None
- partition: int
- replica_assignment: List[Any] | None = None
- replication_factor: int | None = None
- class heizer.consumer(*, topics, config, call_once=False, stopper=None, deserializer=None, is_async=False, id=None, name=None, poll_timeout=None, init_topics=False, consumer_signal=None, enable_retry=False, retry_topic=None, retry_times=1)[source]
A decorator to create a consumer
- Parameters:
topics (List[Topic]) –
config (Dict[str, Any]) –
call_once (bool) –
stopper (Callable[[...], bool] | None) –
deserializer (Callable | None) –
is_async (bool) –
id (str) –
name (str | None) –
poll_timeout (int) –
init_topics (bool) –
consumer_signal (ConsumerSignal) –
enable_retry (bool) –
retry_topic (Topic | None) –
retry_times (int) –
- call_once: bool = False
- property ck_consumer: Consumer
- property ck_producer: Producer | None
- config: Dict[str, Any]
- consumer_signal: ConsumerSignal
- deserializer: Callable | None = None
- enable_retry: bool = False
- id: str
- init_topics: bool = False
- is_async: bool = False
- name: str | None
- poll_timeout: int = 1
- async produce_retry_message(message, has_retired, func_name)[source]
Produces a retry message for a given function.
- Parameters:
message (Message) – The original message that needs to be retried.
has_retired (Optional[int]) – The number of times the function has already been retried. May be None.
func_name (str) – The name of the function.
- Returns:
None
- Return type:
None
- retry_times: int = 1
- retry_times_header_key: str = '!!-heizer_func_retried_times-!!'
- stopper: Callable[[...], bool] | None = None
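The `retry_times` limit and `retry_times_header_key` above imply that a retry count travels with the message in its headers. The sketch below illustrates that bookkeeping under stated assumptions: the helper names `retried_so_far` and `next_retry_headers` are hypothetical, and storing the count as a decimal string under the documented header key is a guess at the encoding, not heizer's confirmed behavior.

```python
from typing import Dict, Optional

# Header key value taken from the documented retry_times_header_key attribute
RETRY_HEADER_KEY = "!!-heizer_func_retried_times-!!"


def retried_so_far(headers: Optional[Dict[str, str]]) -> int:
    """Read the retry count from the headers; a missing header counts as zero."""
    if not headers:
        return 0
    return int(headers.get(RETRY_HEADER_KEY, "0"))


def next_retry_headers(
    headers: Optional[Dict[str, str]], retry_times: int
) -> Optional[Dict[str, str]]:
    """Return headers for the next retry message, or None when retries are exhausted."""
    done = retried_so_far(headers)
    if done >= retry_times:
        return None
    updated = dict(headers or {})  # copy so the caller's headers are not mutated
    updated[RETRY_HEADER_KEY] = str(done + 1)
    return updated


first = next_retry_headers({"trace-id": "abc"}, retry_times=1)
print(first)
print(next_retry_headers(first, retry_times=1))
```

With `retry_times=1`, the first failure yields headers carrying a count of 1, and a second failure yields None, meaning no further retry message would be produced in this sketch.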
- heizer.create_new_topics(config, topics)[source]
Create new topics using the provided configuration.
- Parameters:
config (BaseConfig | Dict[str, Any]) –
topics (List[Topic]) –
- Return type:
None
- heizer.delete_topics(config, topics)[source]
Delete topics using the provided configuration.
- Parameters:
config (BaseConfig | Dict[str, Any]) –
topics (List[Topic]) –
- Return type:
None
- heizer.get_admin_client(config)[source]
Create an admin client using the provided configuration.
- Parameters:
config (BaseConfig | Dict[str, Any]) –
- Return type:
AdminClient
- heizer.list_topics(config)[source]
List topics using the provided configuration.
- Parameters:
config (BaseConfig | Dict[str, Any]) –
- Return type:
List[str]