Basic Producer and Consumer
Note
You need to spin up a Kafka server before running this example.
Create producer and consumer configurations
In [1]: from heizer import HeizerConfig, HeizerTopic, consumer, producer, HeizerMessage
In [2]: import json
In [3]: producer_config = HeizerConfig(
...: {
...: "bootstrap.servers": "localhost:9092",  # address of the Kafka server this example expects to be running
...: }
...: )
...:
In [4]: consumer_config = HeizerConfig(
...: {
...: "bootstrap.servers": "localhost:9092",  # same server address as producer_config
...: "group.id": "default",  # presumably the Kafka consumer group name -- confirm against Heizer docs
...: "auto.offset.reset": "earliest",  # presumably standard Kafka semantics: start from the oldest message when the group has no committed offset -- confirm
...: }
...: )
...:
Create the topic
In [5]: topics = [HeizerTopic(name="my.topic1.consumer.example")]
Create producer
In [6]: @producer(
...: topics=topics,
...: config=producer_config,
...: key_alias="key",
...: headers_alias="headers",
...: )
...: def produce_data(status: str, result: str):
...: return {
...: "status": status,
...: "result": result,
...: "key": "my_key",
...: "headers": {"my_header": "my_header_value"},
...: }
...:
Publish messages
In [7]: produce_data("start", "1")
Out[7]:
{'status': 'start',
'result': '1',
'key': 'my_key',
'headers': {'my_header': 'my_header_value'}}
In [8]: produce_data("loading", "2")
Out[8]:
{'status': 'loading',
'result': '2',
'key': 'my_key',
'headers': {'my_header': 'my_header_value'}}
In [9]: produce_data("success", "3")
Out[9]:
{'status': 'success',
'result': '3',
'key': 'my_key',
'headers': {'my_header': 'my_header_value'}}
In [10]: produce_data("postprocess", "4")
Out[10]:
{'status': 'postprocess',
'result': '4',
'key': 'my_key',
'headers': {'my_header': 'my_header_value'}}
Create consumer
# Heizer expects the consumer's stopper function to return a bool.
# In this example, the consumer stops and returns its value when
# `status` is `success` in the message.
# If no stopper function is given, the consumer keeps running forever.
In [11]: def stopper(msg: HeizerMessage):
....: data = json.loads(msg.value)
....: if data["status"] == "success":
....: return True
....: return False
....:
In [12]: @consumer(
....: topics=topics,
....: config=consumer_config,
....: stopper=stopper,
....: )
....: def consume_data(message: HeizerMessage):
....: data = json.loads(message.value)
....: print(data)
....: print(message.key)
....: print(message.headers)
....: return data["result"]
....:
In [13]: result = consume_data()
{'status': 'start', 'result': '1', 'key': 'my_key', 'headers': {'my_header': 'my_header_value'}}
my_key
{'my_header': 'my_header_value'}
{'status': 'loading', 'result': '2', 'key': 'my_key', 'headers': {'my_header': 'my_header_value'}}
my_key
{'my_header': 'my_header_value'}
{'status': 'success', 'result': '3', 'key': 'my_key', 'headers': {'my_header': 'my_header_value'}}
my_key
{'my_header': 'my_header_value'}
In [14]: print("Expected Result:", result)
Expected Result: 3