Module contents

class heizer.BaseConfig(bootstrap_servers)[source]

Base configuration class.

Parameters:

bootstrap_servers (str) –

bootstrap_servers: str
class heizer.ConsumerConfig(bootstrap_servers, group_id, auto_offset_reset='earliest', other_configs=<factory>, enable_auto_commit=False)[source]

Configuration class for the consumer.

Parameters:
  • bootstrap_servers (str) –

  • group_id (str) –

  • auto_offset_reset (str) –

  • other_configs (Dict[str, Any]) –

  • enable_auto_commit (bool) –

auto_offset_reset: str = 'earliest'
enable_auto_commit: bool = False
group_id: str
other_configs: Dict[str, Any]
class heizer.ConsumerSignal(is_running: bool = True)[source]
Parameters:

is_running (bool) –

is_running: bool = True
stop()[source]
Return type:

None

class heizer.Message(message)[source]
A utility class (HeizerMessage) for parsing Kafka messages using the confluent_kafka library.

Parameters:

message (Message) – The confluent_kafka.Message object to wrap.

Properties:
  • message: A confluent_kafka.Message object representing the Kafka message.

  • topic: Optional[str] - The topic of the Kafka message.

  • partition: int - The partition of the Kafka message.

  • headers: Optional[Dict[str, str]] - The headers of the Kafka message.

  • key: Optional[str] - The key of the Kafka message.

  • value: Optional[str] - The value of the Kafka message.

  • formatted_value: Optional[Any] - A formatted version of the value.

formatted_value: Any | None = None
headers: Dict[str, str] | None
key: str | None
partition: int
topic: str | None
value: str | None
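
The fields above can be read directly as attributes inside a consumer handler; a minimal sketch (the handler name is hypothetical):

```python
from typing import Optional

def handle(message) -> Optional[str]:
    # heizer.Message exposes the parsed parts of the underlying
    # confluent_kafka message as plain attributes.
    print(f"topic={message.topic} partition={message.partition} key={message.key}")
    return message.value
```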
class heizer.Producer(config, serializer=<function _default_serializer>, call_back=None, name=None)[source]

Kafka Message Producer

Parameters:
  • config (Dict[str, Any]) –

  • serializer (Callable[[...], bytes]) –

  • call_back (Callable[[...], Any] | None) –

  • name (str) –

async async_produce(topic, value, key=None, headers=None, partition=-1, auto_flush=True)[source]

Produce a message asynchronously to a Kafka topic.

Parameters:
  • topic (str | Topic) – The topic to produce the message to. Can be either a string or a Topic object.

  • value (bytes | str | Dict[Any, Any]) – The message body to be produced. Can be either bytes, a string, or a dictionary.

  • key (str | None) – The key to associate with the message. Optional.

  • headers (Dict[str, str] | None) – Additional headers to include with the message. Optional.

  • auto_flush (bool) – If set to True, automatically flush the producer after producing the message. Default is True.

  • partition (int | None) – The partition to produce to. The default of -1 leaves partition assignment to the configured partitioner.

Returns:

None

Return type:

None

This method asynchronously produces a message to a Kafka topic. It accepts the same topic, value, key, headers, partition, and auto_flush parameters as produce().

Example usage:

>>> producer = Producer(config={"xx": "xx"})
>>> asyncio.run(producer.async_produce(topic="my_topic", value={"k":"v"}))
call_back: Callable[[...], Any] | None = None
config: Dict[str, Any]
default_delivery_report(err, msg)[source]

Called once for each message produced to indicate delivery result. Triggered by poll() or flush().

Parameters:
  • msg (Message) –

  • err (str) –

Return type:

None

flush()[source]
Return type:

None

name: str
produce(topic, value, key=None, headers=None, partition=-1, auto_flush=True)[source]

Produce a message to a Kafka topic using the confluent_kafka library.

Parameters:
  • topic (str | Topic) – The topic to publish the message to. Can be either a string representing the topic name, or an instance of the Topic class.

  • value (bytes | str | Dict[Any, Any]) – The message body to be produced. Can be either bytes, a string, or a dictionary.

  • key (str | None) – Optional. The key for the message.

  • headers (Dict[str, str] | None) – Optional. Additional headers for the message.

  • auto_flush (bool) – Optional. Default is True. If True, the producer will automatically flush messages after producing them.

  • partition (int | None) – The partition to produce to. The default of -1 leaves partition assignment to the configured partitioner.

Returns:

None

Return type:

None

Example usage:

>>> producer = Producer(config={"xx": "xx"})
>>> producer.produce(topic="my_topic", value={"k":"v"})
serializer()

Default Kafka message serializer, which encodes the input value to bytes.

Parameters:

value (Dict[Any, Any] | str | bytes) –

Returns:

bytes

Return type:

bytes

class heizer.ProducerConfig(bootstrap_servers, other_configs=<factory>)[source]

Configuration class for the producer.

Parameters:
  • bootstrap_servers (str) –

  • other_configs (Dict[str, Any]) –

other_configs: Dict[str, Any]
class heizer.Topic(name, partition=None, offset=None, metadata=None, leader_epoch=None, num_partitions=None, replication_factor=None, replica_assignment=None, config=None)[source]
Parameters:
  • name (str) –

  • partition (int) –

  • offset (int | None) –

  • metadata (str | None) –

  • leader_epoch (int | None) –

  • num_partitions (int | None) –

  • replication_factor (int | None) –

  • replica_assignment (List[Any] | None) –

  • config (Dict[Any, Any] | None) –

config: Dict[Any, Any] | None = None
leader_epoch: int | None = None
metadata: str | None = None
name: str
num_partitions: int | None = None
offset: int | None = None
partition: int
replica_assignment: List[Any] | None = None
replication_factor: int | None = None
class heizer.consumer(*, topics, config, call_once=False, stopper=None, deserializer=None, is_async=False, id=None, name=None, poll_timeout=None, init_topics=False, consumer_signal=None, enable_retry=False, retry_topic=None, retry_times=1)[source]

A decorator to create a consumer

Parameters:
  • topics (List[Topic]) –

  • config (Dict[str, Any]) –

  • call_once (bool) –

  • stopper (Callable[[...], bool] | None) –

  • deserializer (Callable | None) –

  • is_async (bool) –

  • id (str) –

  • name (str | None) –

  • poll_timeout (int) –

  • init_topics (bool) –

  • consumer_signal (ConsumerSignal) –

  • enable_retry (bool) –

  • retry_topic (Topic | None) –

  • retry_times (int) –

call_once: bool = False
property ck_consumer: Consumer
property ck_producer: Producer | None
commit(asynchronous=False, *arg, **kwargs)[source]
Parameters:

asynchronous (bool) –

config: Dict[str, Any]
consumer_signal: ConsumerSignal
deserializer: Callable | None = None
enable_retry: bool = False
id: str
init_topics: bool = False
is_async: bool = False
name: str | None
poll_timeout: int = 1
async produce_retry_message(message, has_retired, func_name)[source]

Produces a retry message for a given function.

Parameters:
  • message (Message) – The original message that needs to be retried.

  • has_retired (Optional[int]) – The number of times the function has been retried before. Defaults to None.

  • func_name (str) – The name of the function.

Returns:

None

Return type:

None

retry_times: int = 1
retry_times_header_key: str = '!!-heizer_func_retried_times-!!'
retry_topic: Topic | None = None
stopper: Callable[[...], bool] | None = None
topics: List[Topic]
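
A minimal sketch of the decorator under assumed settings: the topic name, group id, and handler are hypothetical, and the config dict is assumed to take standard librdkafka keys.

```python
from heizer import Topic, consumer

@consumer(
    topics=[Topic(name="my_topic")],  # hypothetical topic
    config={"bootstrap.servers": "localhost:9092", "group.id": "my-group"},
)
def handle(message, *args, **kwargs):
    # message is a heizer.Message parsed from the underlying Kafka message.
    print(message.value)

# Invoking the decorated function starts the poll loop (requires a broker):
# handle()
```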
heizer.create_new_topics(config, topics)[source]

Create new topics using the provided configuration.

Parameters:
  • config (BaseConfig | Dict[str, Any]) –

  • topics (List[Topic]) –

Return type:

None

heizer.delete_topics(config, topics)[source]

Delete topics using the provided configuration.

Parameters:
  • config (BaseConfig | Dict[str, Any]) –

  • topics (List[Topic]) –

Return type:

None

heizer.get_admin_client(config)[source]

Create an admin client using the provided configuration.

Parameters:

config (BaseConfig | Dict[str, Any]) –

Return type:

AdminClient

heizer.list_topics(config)[source]

List topics using the provided configuration.

Parameters:

config (BaseConfig | Dict[str, Any]) –

Return type:

List[str]
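
Taken together, the admin helpers compose into a create/list/delete round trip; a sketch assuming a reachable broker at a placeholder address and that topics is a list of Topic objects:

```python
from heizer import (
    BaseConfig, Topic, create_new_topics, delete_topics, list_topics,
)

config = BaseConfig(bootstrap_servers="localhost:9092")  # placeholder address
topics = [Topic(name="my_topic", num_partitions=3, replication_factor=1)]

create_new_topics(config, topics)
print(list_topics(config))  # expected to include "my_topic"
delete_topics(config, topics)
```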

heizer.read_consumer_status(consumer_id=None)[source]
Parameters:

consumer_id (str | None) –

Return type:

Dict[str, Any]

heizer.write_consumer_status(consumer_id, status, pid, consumer_name=None)[source]
Parameters:
  • consumer_id (str) –

  • status (ConsumerStatusEnum) –

  • pid (int) –

  • consumer_name (str | None) –

Return type:

None