Kafka Producer And Consumer In Spring Boot

Apache Kafka is a distributed event streaming platform that is widely used for handling real-time data streams in large-scale applications.

Why Use Kafka?

There are many reasons to use Kafka, but the main one is real-time data streaming: Kafka efficiently and reliably transfers real-time data between applications and systems, making it perfect for handling events, logs, metrics, and other time-sensitive data.

When to use Kafka?

Stream Processing: Kafka is commonly used in stream processing applications where real-time data needs to be ingested, processed, and distributed to different components of the system.

You can read more about Kafka here. You can also download the whole sample code from my GitHub repository here.

What do we need:

  • JDK 11+
  • Kafka (we used Kafka 2.8.0 in this example, Download Link)
  • Spring Boot
  • A proper IDE (we used IntelliJ IDEA here)

Here are the steps to run the Kafka server after downloading it. To run Kafka, you have to start the ZooKeeper server first.

Open a CMD window in the downloaded folder and run the following command:

.\bin\windows\zookeeper-server-start.bat config\zookeeper.properties        

After making sure that ZooKeeper is up, open another CMD window in the downloaded folder and run the command below:

.\bin\windows\kafka-server-start.bat config\server.properties        
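Once the broker is up, you can optionally sanity-check it from a third CMD window by creating and listing a topic with the scripts that ship with Kafka (the topic name below is just an example; many setups also auto-create topics on first use):

```shell
.\bin\windows\kafka-topics.bat --create --topic any-topic-name --bootstrap-server localhost:9092
.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
```

If the `--list` command prints your topic name, the broker is reachable on port 9092.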

Now the Kafka server is ready. Let’s create a producer using Spring Boot:

1- First, set up a Spring Boot project and add the Kafka dependency to the pom.xml or the Gradle build file:

pom.xml:

<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- Other dependencies -->

</dependencies>        

Gradle:

dependencies {
    // Other dependencies
    implementation 'org.springframework.kafka:spring-kafka'
}        

2- Configure the Kafka properties.

Just add these properties, including the broker address and producer serializer settings, to application.properties:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer        
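Optionally, you can tune producer reliability in the same file. For example (these are standard Spring Boot Kafka properties; the values shown are illustrative, not required for this tutorial):

```properties
# Wait for all in-sync replicas to acknowledge each record before considering it sent
spring.kafka.producer.acks=all
# Retry transient send failures a few times before giving up
spring.kafka.producer.retries=3
```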

3- Create the KafkaProducer component; it will be responsible for sending messages to the Kafka topic:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private static final String TOPIC_NAME = "any-topic-name"; // Replace with your desired topic name

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        // Note: send() is asynchronous; this log only confirms the record was handed to the template
        kafkaTemplate.send(TOPIC_NAME, message);
        System.out.println("Message " + message +
             " has been successfully sent to the topic: " + TOPIC_NAME);
    }
}        

4- Now you can use the producer to send messages to Kafka:

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    private final KafkaProducer kafkaProducer;

    public MessageController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    @PostMapping("/send")
    public void sendMessageToKafka(@RequestBody String message) {
        kafkaProducer.sendMessage(message);
    }
}        
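With the application running, the endpoint can also be exercised from the command line (assuming the default port 8080; since the controller takes the raw request body as a String, a plain-text body is enough):

```shell
curl -X POST -H "Content-Type: text/plain" -d "hello kafka" http://localhost:8080/send
```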

Now that you have created the producer successfully, it’s time to create the consumer to consume the messages you produce:

1- Create a Spring Boot project just like the previous one. Similarly, add the same Kafka dependency and configure the Kafka consumer in application.properties:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=your-consumer-group-id
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer        
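One more property worth knowing about is auto-offset-reset, which controls where a new consumer group starts reading when it has no committed offset yet (this is a standard Spring Boot Kafka property; `earliest` is shown as an example so the consumer also picks up messages produced before it started):

```properties
# Start from the beginning of the topic when no committed offset exists for this group
spring.kafka.consumer.auto-offset-reset=earliest
```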

2- Create a KafkaConsumer component in your Spring Boot application. This component will be responsible for receiving and processing messages from the Kafka topic.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "any-topic-name", groupId = "your-consumer-group-id")
    public void consumeMessage(String message) {
        System.out.println("Received message: " + message);
    }
}        

Remember to give the consumer the same topic name that you set for the producer, because in Kafka the producer and consumer are decoupled entities. The producer is responsible for sending messages to a specific topic, while the consumer reads messages from a specific topic. The topic acts as a logical channel or category to which messages are published and from which they are consumed.
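The decoupling described above can be sketched with a minimal in-memory analogy (plain Java, no Kafka involved; the class and method names here are purely illustrative): producer and consumer never reference each other, only the shared topic name.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// A toy "broker": producers and consumers are coupled only through the topic name,
// mirroring how Kafka decouples the two sides.
public class InMemoryBroker {
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    // Consumer side: register a listener for a topic (analogous to @KafkaListener).
    public void subscribe(String topic, Consumer<String> listener) {
        subscribers.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Producer side: publish a message to a topic (analogous to kafkaTemplate.send).
    public void publish(String topic, String message) {
        subscribers.getOrDefault(topic, List.of()).forEach(l -> l.accept(message));
    }

    public static void main(String[] args) {
        InMemoryBroker broker = new InMemoryBroker();
        broker.subscribe("any-topic-name", m -> System.out.println("Received message: " + m));
        broker.publish("any-topic-name", "hello kafka");   // delivered
        broker.publish("some-other-topic", "lost message"); // no subscriber: silently dropped
    }
}
```

Note how the second publish reaches no one: if the topic names in the producer and consumer don't match, messages are simply never delivered, with no error on either side.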

That’s it. In this simple project, we created a producer so that when a POST request is made to the /send endpoint with a message in the request body, the message is sent to the Kafka topic specified in the TOPIC_NAME constant within the KafkaProducer component, and the consumer consumes the exact message that was published. Make sure that both modules are up. Let’s test our system with Postman:

Now when we check the producer console, we should see the success message as follows:

Simultaneously, when we check the consumer console, we should see the message read from the topic:

Conclusion:

This article demonstrated how to implement a Kafka producer and consumer using Spring Boot. Spring Kafka’s integration simplified the process, allowing efficient communication between components. Consistent topic naming enhances clarity and aligns with Domain-Driven Design principles. Mastering this approach empowers developers to build scalable, event-driven systems. Embracing Kafka’s benefits drives innovation and efficiency in modern applications.
