WebSocket
Introduction
In this post, I will show you how to integrate a Kafka consumer with a WebSocket server. Whenever the consumer receives a message from Kafka, the WebSocket server forwards it to the connected client.
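The flow described above can be tried out without Kafka or a network connection. The following is only a toy sketch: `FakeSocket` and `fake_kafka_messages` are hypothetical stand-ins invented for illustration, not part of heizer or websockets. It shows the one idea the rest of the post relies on, namely that each consumed message is awaited and then pushed to the client.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for a WebSocket connection; it only records sends."""

    def __init__(self):
        self.sent = []

    async def send(self, data):
        self.sent.append(data)


async def fake_kafka_messages():
    """Hypothetical stand-in for a Kafka consumer yielding message values."""
    for value in ("first", "second", "third"):
        await asyncio.sleep(0)  # yield control, as a real poll loop would
        yield value


async def forward(messages, websocket):
    """Push every consumed message to the connected client, in order."""
    async for value in messages:
        await websocket.send(value)


def run_demo():
    """Run the forwarding loop once and return what the fake client received."""
    socket = FakeSocket()
    asyncio.run(forward(fake_kafka_messages(), socket))
    return socket.sent
```

Calling `run_demo()` returns the messages in the order they were consumed. In the real server, the Kafka side and the socket side are supplied by heizer and websockets respectively.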
Create a Kafka topic named topic.test. (You can create a new topic in kafka-ui.)
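If you would rather create the topic from code than from kafka-ui, something along these lines may work. This is a sketch that assumes the confluent-kafka package is installed and a single local broker is reachable at localhost:9092; the helper names `admin_settings` and `create_topic`, and the choice of one partition with replication factor 1, are assumptions made for this example.

```python
def admin_settings(bootstrap_servers: str = "localhost:9092") -> dict:
    """Build the settings dict that confluent-kafka clients expect."""
    return {"bootstrap.servers": bootstrap_servers}


def create_topic(name: str = "topic.test", bootstrap_servers: str = "localhost:9092") -> None:
    """Create a single-partition topic; raises if the broker rejects the request."""
    # Imported lazily so this module still loads where confluent-kafka is absent.
    from confluent_kafka.admin import AdminClient, NewTopic

    admin = AdminClient(admin_settings(bootstrap_servers))
    # Assumption: 1 partition and replication factor 1 suit a local single-broker setup.
    futures = admin.create_topics([NewTopic(name, num_partitions=1, replication_factor=1)])
    for topic, future in futures.items():
        future.result()  # blocks until the broker answers; raises on failure
        print(f"created topic {topic}")
```

Call `create_topic()` once while the broker is running. If the topic already exists, `future.result()` raises an exception, which is safe to ignore in this setup.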
Create the WebSocket server server.py, listening on ws://localhost:8001:
    import asyncio

    import websockets
    from heizer import ConsumerConfig, Message, Topic, consumer

    # Consumer settings: read from the local broker, starting at the earliest offset.
    consumer_config = ConsumerConfig(
        bootstrap_servers="localhost:9092",
        group_id="websockets_sample",
        auto_offset_reset="earliest",
        enable_auto_commit=False,
    )

    # Must match the topic that messages are published to.
    topics = [Topic(name="topic.test")]


    @consumer(topics=topics, config=consumer_config, is_async=True, init_topics=True, name="websocket_sample")
    async def handler(message: Message, C: consumer, websocket, *args, **kwargs):
        # Forward each consumed Kafka message to the connected WebSocket client.
        print(C.name)
        await websocket.send(message.value)


    async def main():
        # An empty host binds to all interfaces; the server listens on port 8001.
        async with websockets.serve(handler, "", 8001):
            try:
                await asyncio.Future()  # run until interrupted
            except KeyboardInterrupt:
                return


    if __name__ == "__main__":
        asyncio.run(main())
Then start the server:

    python server.py
Create the WebSocket client client.html, which listens to ws://localhost:8001:
    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <meta http-equiv="X-UA-Compatible" content="IE=edge">
        <meta name="viewport" content="width=device-width, initial-scale=1.0">
        <title>WebSocket Client</title>
    </head>
    <body>
        <div id="msgs"></div>
        <script>
            // Messages received so far, rendered one per line.
            var msgs = [];
            const socket = new WebSocket('ws://localhost:8001');

            // Announce the connection to the server once the socket is open.
            socket.addEventListener('open', function (event) {
                socket.send('Connection Established');
            });

            const msg_box = document.getElementById("msgs");

            // Append every incoming message to the page and log it.
            socket.addEventListener('message', function (event) {
                msgs.push(event.data);
                console.log(event.data);
                msg_box.innerHTML = msgs.join("<br>");
            });
        </script>
    </body>
    </html>
Open client.html in a browser.
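If you want to check the server from a terminal instead of a browser, a small Python client can play the same role as client.html. This is a sketch that assumes the websockets package is installed; the function name `listen` and the `limit` parameter are made up for this example.

```python
import asyncio


async def listen(uri: str = "ws://localhost:8001", limit: int = 5) -> list:
    """Connect to the server, then print and collect up to `limit` messages."""
    # Imported lazily so this module still loads where websockets is absent.
    import websockets

    received = []
    async with websockets.connect(uri) as websocket:
        async for message in websocket:
            print(message)
            received.append(message)
            if len(received) >= limit:
                break  # stop after `limit` messages; the connection then closes
    return received


def main() -> list:
    """Blocking entry point: run the listener until `limit` messages arrive."""
    return asyncio.run(listen())
```

Run `main()` while server.py is up. It blocks until five messages have arrived, so publish a few messages to the topic to see it finish.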
Publish a message to the Kafka topic topic.test, and it will appear in the browser.
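One way to publish a test message is a short producer script. The sketch below assumes the confluent-kafka package is installed and the broker is at localhost:9092; `encode_payload` and `publish` are hypothetical helper names, and sending JSON is a choice made for this example rather than something the server requires.

```python
import json


def encode_payload(payload: dict) -> bytes:
    """Serialize a dict to UTF-8 encoded JSON bytes."""
    return json.dumps(payload).encode("utf-8")


def publish(payload: dict, topic: str = "topic.test", bootstrap_servers: str = "localhost:9092") -> None:
    """Send one message to the topic and wait until it has been delivered."""
    # Imported lazily so this module still loads where confluent-kafka is absent.
    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": bootstrap_servers})
    producer.produce(topic, value=encode_payload(payload))
    producer.flush()  # block until the queued message has been sent
```

For example, `publish({"hello": "websocket"})` sends a single JSON message. The topic name has to match the one the consumer in server.py subscribes to.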