Basic Producer and Consumer
Note
You need to spin up a Kafka server before running this example.
Create producer and consumer configurations
In [1]: from heizer import HeizerConfig, HeizerTopic, consumer, producer, HeizerMessage
In [2]: import json
In [3]: producer_config = HeizerConfig(  # producer settings; the key looks like a standard Kafka client property — TODO confirm how HeizerConfig forwards it
...: {
...: "bootstrap.servers": "localhost:9092",  # address of the Kafka server this example expects to be running
...: }
...: )
...:
In [4]: consumer_config = HeizerConfig(  # consumer settings; same server as the producer config
...: {
...: "bootstrap.servers": "localhost:9092",
...: "group.id": "default",  # consumer group name
...: "auto.offset.reset": "earliest",  # presumably: start from the oldest message when the group has no stored offset — confirm; the run output does include messages produced before the consumer started
...: }
...: )
...:
Create the topic
In [5]: topics = [HeizerTopic(name="my.topic1")]  # one topic, passed to both the producer and the consumer decorators
Create producer
In [6]: @producer(  # NOTE(review): presumably publishes the wrapped function's return value to `topics` — confirm against the library
...: topics=topics,
...: config=producer_config
...: )
...: def produce_data(status: str, result: str):  # builds the message payload from its two string arguments
...: return {  # the call itself also returns this dict (see the recorded Out[] values)
...: "status": status,
...: "result": result,
...: }
...:
Publish messages
In [7]: produce_data("start", "1")
Out[7]: {'status': 'start', 'result': '1'}
In [8]: produce_data("start", "1")  # same payload as In [7] sent a second time; it appears twice in the consumer output
Out[8]: {'status': 'start', 'result': '1'}
In [9]: produce_data("loading", "2")
Out[9]: {'status': 'loading', 'result': '2'}
In [10]: produce_data("success", "3")  # this is the status the example's stopper looks for
Out[10]: {'status': 'success', 'result': '3'}
In [11]: produce_data("postprocess", "4")  # sent after "success"; it is not printed in the consumer run shown later
Out[11]: {'status': 'postprocess', 'result': '4'}
Create consumer
# Heizer expects the consumer's stopper function to return a bool.
# In this example, the consumer stops and returns its value when
# `status` is `success` in the message.
# Without a stopper function, the consumer keeps running forever.
In [12]: def stopper(msg: HeizerMessage):  # stop predicate: True means the consumer should stop
....: data = json.loads(msg.value)  # the message value is parsed as JSON
....: if data["status"] == "success":  # stop condition used by this example
....: return True
....: return False  # any other status: keep consuming
....:
In [13]: @consumer(  # wires the handler to the topic, the consumer config, and the stop predicate
....: topics=topics,
....: config=consumer_config,
....: stopper=stopper,
....: )
....: def consume_data(message: HeizerMessage):  # handler: decodes one message, prints it, returns its "result" field
....: data = json.loads(message.value)  # the message value is parsed as JSON
....: print(data)  # produces the per-message dict lines seen in the run output
....: return data["result"]  # judging by the run output, the value for the stopping message is what the decorated call returns — confirm
....:
In [14]: result = consume_data()  # called with no arguments; the lines that follow are the handler's print output, ending at the "success" message
{'status': 'start', 'result': '1'}
{'status': 'start', 'result': '1'}
{'status': 'loading', 'result': '2'}
{'status': 'success', 'result': '3'}
In [15]: print("Expected Result:", result)  # shows the "result" field of the "success" message
Expected Result: 3