Websocket
Introduction
In this post, I will show you how to integrate kafka consumer with websocket server. The websocket server will send message to client when it receives message from kafka consumer. source code
Create a kafka topic topic.test. (you can create new topic in kafka-ui)
Create websocket server server.py on ws://localhost:8001. code
import asyncio import websockets from heizer import HeizerConfig, HeizerTopic, consumer consumer_config = HeizerConfig( { "bootstrap.servers": "0.0.0.0:9092", "group.id": "test", "auto.offset.reset": "earliest", } ) topics = [HeizerTopic(name="my.topic1")] @consumer(topics=topics, config=consumer_config, is_async=True) async def handler(message, websocket, *args, **kwargs): await websocket.send(message.value) async def main(): async with websockets.serve(handler, "", 8001): await asyncio.Future() if __name__ == "__main__": asyncio.run(main())
python server.py
Create websocket client client.html and listen to ws://localhost:8001. html file
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta http-equiv="X-UA-Compatible" content="IE=edge"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>WebSocker Client</title> </head> <body> <div id="msgs"></div> </body> <script> var msgs = []; const socket = new WebSocket('ws://localhost:8001'); socket.addEventListener('open', function (event) { socket.send('Connection Established'); }); const msg_box = document.getElementById("msgs"); socket.addEventListener('message', function (event) { msgs.push(event.data); console.log(event.data); msg_box.innerHTML = msgs.join("<br>"); }); </script> </html>
and open client.html in browser
Publish message to kafka topic topic.test and you will see the message in browser
You will see the message in browser