Kafka Producer And Consumer In Spring Boot
Ahmed Abdelaziz
Sr Software Engineer Team Leader at TAMM | Java Champion | Java and Web Technologies Expert | Solutions Architect | Spring Boot Expert | Microservices | Databases | Expert in Sprinklr, GENESYS and CISCO CC solutions
Apache Kafka is a distributed event streaming platform that is widely used for handling real-time data streams in large-scale applications.
Why Use Kafka?
There are many reasons to use Kafka, but the main one is real-time data streaming: Kafka efficiently and reliably transfers real-time data between applications and systems, making it perfect for handling events, logs, metrics, and other time-sensitive data.
When to Use Kafka?
Stream Processing: Kafka is commonly used in stream processing applications where real-time data needs to be ingested, processed, and distributed to different components of the system.
You can read more about Kafka here. You can also download the whole sample code from my GitHub repository here.
What do we need:
Here are the steps to run the Kafka server after downloading it. To run Kafka, you have to start the ZooKeeper server first.
Open a CMD window in the downloaded folder and run the following command:
.\bin\windows\zookeeper-server-start.bat config\zookeeper.properties
After making sure that ZooKeeper is up, open another CMD window in the downloaded folder and run the command below:
.\bin\windows\kafka-server-start.bat config\server.properties
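Kafka auto-creates topics on first use by default, but you can also create the topic explicitly up front. A sketch, assuming Kafka 2.2 or later (where kafka-topics supports --bootstrap-server) and the topic name used later in this article:

.\bin\windows\kafka-topics.bat --create --topic any-topic-name --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1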
Now the Kafka server is ready. Let’s create a producer using Spring Boot:
1- First, set up a Spring Boot project and add the Kafka dependency to the pom.xml or the Gradle build file:
pom.xml:
<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Other dependencies -->
</dependencies>
Gradle:
dependencies {
    // Other dependencies
    implementation 'org.springframework.kafka:spring-kafka'
}
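The producer example below also exposes a REST endpoint, so the project needs the web starter too. This is an assumption about your setup; the starter may already be present if you generated the project with web support:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>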
2- Configure the Kafka properties.
Just add these properties, including the broker address and producer settings, to application.properties:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
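With these properties, Spring Boot auto-configures the producer beans for you. If you prefer programmatic control, the same settings can be declared as Spring beans instead; a minimal sketch of an equivalent Java-based configuration (the class name KafkaProducerConfig is my own choice here):

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    // Mirrors the application.properties entries above
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}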
3- Create the KafkaProducer component; it will be responsible for sending messages to the Kafka topic:
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final String TOPIC_NAME = "any-topic-name"; // Replace with your desired topic name

    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC_NAME, message);
        System.out.println("Message " + message
                + " has been successfully sent to the topic: " + TOPIC_NAME);
    }
}
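Note that kafkaTemplate.send is asynchronous, so the println above runs before the broker actually acknowledges the record. If you want to log only on confirmed delivery, you can add a method like the following to the KafkaProducer class. A sketch assuming Spring Kafka 3.x, where send returns a CompletableFuture (in 2.x it returns a ListenableFuture instead):

public void sendMessageWithCallback(String message) {
    // whenComplete runs once the broker acknowledges (or rejects) the record
    kafkaTemplate.send(TOPIC_NAME, message).whenComplete((result, ex) -> {
        if (ex == null) {
            System.out.println("Delivered to partition " + result.getRecordMetadata().partition()
                    + " at offset " + result.getRecordMetadata().offset());
        } else {
            System.err.println("Failed to send message: " + ex.getMessage());
        }
    });
}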
4- Now you can use the producer to send messages to Kafka:
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageController {

    private final KafkaProducer kafkaProducer;

    public MessageController(KafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    @PostMapping("/send")
    public void sendMessageToKafka(@RequestBody String message) {
        kafkaProducer.sendMessage(message);
    }
}
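Once both Kafka and the application are running (the app on port 8080 by default), you can exercise the endpoint from the command line as well; a sample request, assuming the default port:

curl -X POST http://localhost:8080/send -H "Content-Type: text/plain" -d "Hello Kafka"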
Now that you have created the producer successfully, it’s time to create the consumer to consume the messages you produce:
1- Create a Spring Boot project in the same way as before. Similarly, add the same Kafka dependency and configure the Kafka consumer in application.properties:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=your-consumer-group-id
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
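One consumer property worth knowing (not used in the original example, so treat it as optional): a brand-new consumer group starts reading from the latest offset by default, which means messages produced before the consumer first connects are skipped. To replay the topic from the beginning instead, add:

spring.kafka.consumer.auto-offset-reset=earliest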
2- Create a KafkaConsumer component in your Spring Boot application. This component will be responsible for receiving and processing messages from the Kafka topic.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {

    @KafkaListener(topics = "any-topic-name", groupId = "your-consumer-group-id")
    public void consumeMessage(String message) {
        System.out.println("Received message: " + message);
    }
}
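If you need metadata such as the partition, offset, or key along with the payload, @KafkaListener can also accept the full ConsumerRecord. A minimal variant (the class name KafkaRecordConsumer is hypothetical); use it in place of the listener above rather than alongside it, since two listeners in the same group would split the topic’s partitions between them:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaRecordConsumer {

    @KafkaListener(topics = "any-topic-name", groupId = "your-consumer-group-id")
    public void consumeRecord(ConsumerRecord<String, String> record) {
        // The record exposes topic, partition, offset, key, and value
        System.out.println("Received '" + record.value()
                + "' from partition " + record.partition()
                + " at offset " + record.offset());
    }
}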
Remember to give the consumer the same topic name that you set for the producer. In Kafka, the producer and consumer are decoupled entities: the producer sends messages to a specific topic, while the consumer reads messages from a specific topic. The topic acts as a logical channel or category to which messages are published and from which they are consumed.
That’s it. In this simple project, when a POST request is made to the /send endpoint with the message text in the request body, the message is sent to the Kafka topic specified in the TOPIC_NAME constant within the KafkaProducer component, and the consumer then consumes the exact published message. Make sure both modules are up, then let’s test our system with Postman:
Now when we check the producer console, we should see the success message. Assuming the request body was Hello Kafka, the line printed by sendMessage will look like this:
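Message Hello Kafka has been successfully sent to the topic: any-topic-name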
Simultaneously, when we check the consumer console, we should see that the message was read from the topic. With the same assumption, the listener prints:
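Received message: Hello Kafka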
Conclusion:
This article demonstrated how to implement a Kafka producer and consumer using Spring Boot. Spring Kafka’s integration simplified the process, allowing efficient communication between components. Consistent topic naming enhances clarity and aligns with Domain-Driven Design principles. Mastering this approach empowers developers to build scalable, event-driven systems. Embracing Kafka’s benefits drives innovation and efficiency in modern applications.