Websocket

Introduction

In this post, I will show you how to integrate kafka consumer with websocket server. The websocket server will send message to client when it receives message from kafka consumer. source code

  1. Start kafka server and install heizer

  2. Create a kafka topic topic.test. (you can create new topic in kafka-ui)

    ../_images/create-topic.png
  3. Create websocket server server.py on ws://localhost:8001. code

    import asyncio
    
    import websockets
    
    from heizer import ConsumerConfig, Message, Topic, consumer
    
    consumer_config = ConsumerConfig(
        bootstrap_servers="localhost:9092",
        group_id="websockets_sample",
        auto_offset_reset="earliest",
        enable_auto_commit=False,
    )
    
    topics = [Topic(name="my.topic1")]
    
    
    @consumer(topics=topics, config=consumer_config, is_async=True, init_topics=True, name="websocket_sample")
    async def handler(message: Message, C: consumer, websocket, *args, **kwargs):
        print(C.name)
        await websocket.send(message.value)
    
    
    async def main():
        async with websockets.serve(handler, "", 8001):
            try:
                await asyncio.Future()
            except KeyboardInterrupt:
                return
    
    
    if __name__ == "__main__":
        asyncio.run(main())
    
    python server.py
    
  4. Create websocket client client.html and listen to ws://localhost:8001. html file

    <!DOCTYPE html>
    
    <html lang="en">
    
    <head>
    
        <meta charset="UTF-8">
    
        <meta http-equiv="X-UA-Compatible" content="IE=edge">
    
        <meta name="viewport" content="width=device-width, initial-scale=1.0">
    
        <title>WebSocker Client</title>
    
    </head>
    
    <body>
    
    <div id="msgs"></div>
    
    </body>
    
    <script>
        var msgs = [];
    
        const socket = new WebSocket('ws://localhost:8001');
    
        socket.addEventListener('open', function (event) {
    
            socket.send('Connection Established');
    
        });
    
        const msg_box = document.getElementById("msgs");
    
        socket.addEventListener('message', function (event) {
            msgs.push(event.data);
            console.log(event.data);
    
            msg_box.innerHTML = msgs.join("<br>");
    
        });
    
    </script>
    
    </html>
    

    and open client.html in browser

  5. Publish message to kafka topic topic.test and you will see the message in browser

    ../_images/publish-kafka-msg.png
  6. You will see the message in browser

    ../_images/message-in-browser.png