Kafka Consumer in .NET

Kafka sample consumer in .NET C#

Introduction

Kafka is a Producer-Subscriber model messaging platform and in this article Kafka producer in .NET, we have demonstrated how to create a very simple kafka producer service in .NET. Now it’s time to create a simple Kafka Consumer in .NET and C# to complete the learning. Assuming that we already have completed the necessary setup on docker and created a simple producer, we are going to consume the messages produced by our producer service.

Creating the .NET console application

Open Visual Studio and create another console application as we did in previous article. For this example also I have created a console application on top of .NET 7.  

Then install the Confluent kafka nuget package.

NuGet\Install-Package Confluent.Kafka -Version 1.9.3

Program.cs (here also i’m just using top level statements)


using Confluent.Kafka;

var config = new ConsumerConfig
{
    GroupId = "samplemessage-consumer",
    BootstrapServers = "localhost:9092",
    AutoOffsetReset = AutoOffsetReset.Latest
};

using (var consumer = new ConsumerBuilder<Null, string>(config).Build())
{
    consumer.Subscribe("kafkaproducer.samplemessage");
    Console.WriteLine("Consumer started listening....");

    try
    {
        while (true)
        {
            var response = consumer.Consume();
            var message = response.Message;
            if (response is not null && !string.IsNullOrEmpty(message.Value))
            {
                Console.WriteLine($"Message: {message.Value}");
                Console.WriteLine($"created at {TimeZoneInfo.ConvertTime(DateTimeOffset.FromUnixTimeMilliseconds(message.Timestamp.UnixTimestampMs), TimeZoneInfo.Local)}");
                Console.WriteLine($"received at {DateTime.Now}");
                Console.WriteLine("-------------------");
            }
        }
    }
    catch (KafkaException ex)
    {
        Console.WriteLine($"Kafka exception: {ex.Message}");
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
}

Here we have subscribed to the same topic "kafkaproducer.samplemessage" created in the Producer example. consumer.Subscribe("kafkaproducer.samplemessage");

Then we consume the messages here var response = consumer.Consume(); 

I’m using a while loop just to keep this application as much as simple. In real world applications we can create a hosted mode service to do that.

Please note that, Kafka returns the created time stamp in Unix format in UTC and we have to convert it to local date time just to show in console.

Testing Kafka Consumer in .NET

Now we can run both Producer and Consumer apps at the same time to send and receive messages through kafka.

producer-consumer

We can see the messages with created time and received time.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.