Basic Producer and Consumer

Note

You need to spin up a Kafka server before running this example.

  1. Create producer and consumer configurations

In [1]: from heizer import HeizerConfig, HeizerTopic, consumer, producer, HeizerMessage

In [2]: import json

In [3]: producer_config = HeizerConfig(
   ...:     {
   ...:         "bootstrap.servers": "localhost:9092",  # Kafka broker to connect to (local server)
   ...:     }
   ...: )
   ...: 

In [4]: consumer_config = HeizerConfig(
   ...:     {
   ...:         "bootstrap.servers": "localhost:9092",  # same local broker as the producer
   ...:         "group.id": "default",  # consumer group this consumer joins
   ...:         "auto.offset.reset": "earliest",  # no committed offset yet -> start from the oldest message
   ...:     }
   ...: )
   ...: 
  2. Create the topic

In [5]: topics = [HeizerTopic(name="my.topic1.consumer.example")]  # passed to both @producer and @consumer
  3. Create producer

In [6]: @producer(
   ...:     topics=topics,
   ...:     config=producer_config,
   ...:     key_alias="key",
   ...:     headers_alias="headers",
   ...: )
   ...: def produce_data(status: str, result: str):
   ...:     return {
   ...:         "status": status,
   ...:         "result": result,
   ...:         "key": "my_key",
   ...:         "headers": {"my_header": "my_header_value"},
   ...:     }
   ...: 
  4. Publish messages

In [7]: produce_data("start", "1")
Out[7]: {'status': 'start', 'result': '1'}

In [8]: produce_data("loading", "2")
Out[8]: {'status': 'loading', 'result': '2'}

In [9]: produce_data("success", "3")
Out[9]: {'status': 'success', 'result': '3'}

In [10]: produce_data("postprocess", "4")
Out[10]: {'status': 'postprocess', 'result': '4'}
  5. Create consumer

# Heizer expects the consumer's stopper function to return a bool.
# In this example, the consumer stops and returns its value once
# `status` is `success` in the message.
# Without a stopper function, the consumer keeps running forever.
In [11]: def stopper(msg: HeizerMessage):
   ....:     data = json.loads(msg.value)
   ....:     if data["status"] == "success":
   ....:         return True
   ....:     return False
   ....: 

In [12]: @consumer(
   ....:     topics=topics,
   ....:     config=consumer_config,
   ....:     stopper=stopper,
   ....: )
   ....: def consume_data(message: HeizerMessage):
   ....:     data = json.loads(message.value)
   ....:     print(data)
   ....:     print(message.key)
   ....:     print(message.headers)
   ....:     return data["result"]
   ....: 

In [13]: result = consume_data()
{'status': 'postprocess', 'result': '4', 'key': 'my_key', 'headers': {'my_header': 'my_header_value'}}
my_key
{'my_header': 'my_header_value'}
{'status': 'start', 'result': '1'}
my_key
{'my_header': 'my_header_value'}
{'status': 'loading', 'result': '2'}
my_key
{'my_header': 'my_header_value'}
{'status': 'success', 'result': '3'}
my_key
{'my_header': 'my_header_value'}

In [14]: print("Expected Result:", result)
Expected Result: 3