Streamlining Data Flows with Spring Boot and Kafka: A Quick Guide
Rafael Vera-Marañón
Data Engineer @ Minsait | Cloud Architecture | AI Integration | Data Governance
In today's fast-paced data world, Apache Kafka stands out as a powerful tool for managing real-time data streams. This guide offers a concise walkthrough for setting up a basic Kafka pipeline with Spring Boot: a producer, a stream processor, and a consumer.
GitHub: Kafka-Springboot
CONTENT:
Step 1: KafkaProducer Setup
Step 2: Stream Processing with Kafka Streams
Step 3: Consuming Processed Data
DEPENDENCIES
KAFKA INSTALLATION ON WINDOWS
Step 1: Download Kafka from the Apache Kafka homepage
Step 2: Configuring Kafka
In config/server.properties, set the broker's log directory:
log.dirs=C:/kafka/kafka.logs
In config/zookeeper.properties, set the ZooKeeper data directory:
dataDir=C:/kafka/zookeeper-data
RUNNING ZOOKEEPER AND THE KAFKA SERVER LOCALLY
Step 1: Open two command prompt windows at C:\kafka (the commands below use paths relative to the Kafka root, so don't start them from C:\kafka\bin)
Step 2: Paste and run the following command in one of the consoles:
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
Step 3: Paste and run the following command in the other console:
.\bin\windows\kafka-server-start.bat .\config\server.properties
Both consoles should show startup logs; by default, ZooKeeper listens on localhost:2181 and the Kafka broker on localhost:9092.
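The applications below rely on the broker's automatic topic creation, which is enabled by default. If your broker has it disabled, you can create the three topics up front with the CLI bundled with Kafka (run from C:\kafka):
.\bin\windows\kafka-topics.bat --create --topic input_topic --bootstrap-server localhost:9092
.\bin\windows\kafka-topics.bat --create --topic output_topic --bootstrap-server localhost:9092
.\bin\windows\kafka-topics.bat --create --topic consumed_topic --bootstrap-server localhost:9092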
SPRING BOOT APPS AND STREAMS MICROSERVICE SETUP
Project module organization:
Each application was created as a separate Maven module inside a parent module, to avoid dependency interference, run them on different ports, and keep them together in a single folder.
Note: the project-structure image was simplified for teaching purposes; a real Spring Boot Maven project also contains test folders, a pom.xml, and many other files.
Step 1: KafkaProducer Setup
Begin by creating a Kafka producer in Spring Boot. Use KafkaTemplate to send messages to a Kafka topic through a REST controller. This setup allows for simple, efficient message production into your Kafka system.
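// KafkaProducerController.java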
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
/**
* REST Controller that provides endpoints for interacting with a Kafka server.
* <p>
* This controller allows sending messages to the Kafka server on a specific topic via
* an HTTP GET request.
* </p>
*/
@RestController
public class KafkaProducerController {
/**
* Kafka Template that facilitates the production of messages to Kafka topics.
*/
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* The name of the topic to which the messages will be sent.
*/
private static final String TOPIC = "input_topic";
private static final Logger logger = LoggerFactory.getLogger(KafkaProducerController.class);
/**
* Endpoint to send a message to the Kafka topic.
*
* @param message The message to be sent to the Kafka topic.
* @return A string indicating that the message has been sent.
*/
@GetMapping("/send/{message}")
public String sendMessage(@PathVariable String message) {
logger.info("Sending message to topic {}: {}", TOPIC, message);
kafkaTemplate.send(TOPIC, message);
return "Message sent: " + message;
}
}
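// KafkaProducerConfig.java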
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import java.util.HashMap;
import java.util.Map;
/**
* Kafka producer configuration that provides the necessary settings and
* Kafka template for sending messages to a Kafka server.
* <p>
* This class uses Spring configuration values (e.g., from the application.properties file)
* to set specific Kafka properties.
* </p>
*/
@Configuration
public class KafkaProducerConfig {
/**
* The address of the Kafka server to which the producer should connect.
* This address is usually in the format: {@code host1:port1,host2:port2,...}.
*/
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
/**
* Creates and returns a configuration for the Kafka producer.
*
* @return A map of properties with configurations for the Kafka producer.
*/
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
/**
* Creates and returns a Kafka template that uses the producer's configuration.
* The Kafka template is used to send messages to Kafka topics.
*
* @return A configured instance of {@link KafkaTemplate}.
*/
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfigs()));
}
}
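# application.properties (producer service)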
server.port=8080
spring.kafka.bootstrap-servers=localhost:9092
logging.level.root=INFO
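With the broker and the producer running, you can smoke-test the endpoint from a terminal; this assumes the default port configured above, and "hello" is just a sample payload:
curl http://localhost:8080/send/hello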
Step 2: Stream Processing with Kafka Streams
Leverage the Kafka Streams library in Spring Boot for real-time data processing. Configure KafkaStreamsConfig to establish stream processing logic, transforming and enriching data on the fly. This component is crucial for handling complex data flows and analytics.
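// KafkaStreamsConfig.java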
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import java.util.HashMap;
import java.util.Map;
/**
* Configuration class for Kafka Streams in a Spring application.
* <p>
* This class sets up the necessary configuration for Kafka Streams processing,
* including server details and default SerDes for keys and values.
* </p>
*/
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {
/**
* The bootstrap servers for the Kafka cluster.
*/
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
private static final Logger logger = LoggerFactory.getLogger(KafkaStreamsConfig.class);
/**
 * Configures and provides the Kafka Streams configuration bean.
 * <p>
 * Note: with {@code @EnableKafkaStreams}, spring-kafka expects a
 * {@link KafkaStreamsConfiguration} bean registered under the default
 * bean name, rather than a raw {@code StreamsConfig} instance.
 * </p>
 *
 * @return The configuration for the Kafka Streams application.
 */
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    return new KafkaStreamsConfiguration(props);
}
/**
 * Defines and provides the Kafka Stream for processing.
 *
 * @param builder The {@link StreamsBuilder} injected by Spring Kafka.
 * @return The Kafka Stream for processing messages.
 */
@Bean
public KStream<String, String> kStream(StreamsBuilder builder) {
Serde<String> stringSerde = Serdes.String();
KStream<String, String> stream = builder.stream("input_topic", Consumed.with(stringSerde, stringSerde));
stream.peek((key, value) -> logger.info("Received message - key: {}, value: {}", key, value))
.mapValues(value -> {
String processedValue = "Processed: " + value;
logger.info("Processing message: {}", processedValue);
return processedValue;
})
.peek((key, value) -> logger.info("Sending processed message - key: {}, value: {}", key, value))
.to("output_topic", Produced.with(stringSerde, stringSerde));
return stream;
}
}
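# application.properties (streams service)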
server.port=8082
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.streams.application-id=kafka-streams-app
logging.level.root=INFO
Step 3: Consuming Processed Data
Implement a KafkaConsumer to read and act on the processed data. With KafkaListener, you can easily consume messages from Kafka Streams, allowing for responsive data handling and further processing.
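// KafkaConsumerConfig.java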
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
/**
* Kafka Consumer configuration for Spring Boot.
* <p>
* This class provides the necessary configurations to establish a connection with
* a Kafka server and consume messages from specific topics. It defines the basic consumer
* configurations and the factory for creating consumer listener instances.
* </p>
*/
@Configuration
public class KafkaConsumerConfig {
/**
* Address of the Kafka bootstrap servers, usually the address
* of the Kafka broker.
*/
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
/**
* Specific configurations for the Kafka consumer.
* <p>
* Defines properties such as the server address, group ID,
* and deserializer classes for message keys and values.
* </p>
*
* @return A map of consumer configurations.
*/
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
/**
* Defines the factory for creating consumer listener instances.
* <p>
* This factory is responsible for creating the containers used
* to consume messages from the Kafka server.
* </p>
*
* @return A configured instance of {@link ConcurrentKafkaListenerContainerFactory}.
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
return factory;
}
}
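// KafkaConsumerService.java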
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
* Service for consuming messages from a Kafka topic and responding to them.
* <p>
* This service listens for messages from a specific topic in Kafka.
* Upon receiving a message, the service sends a response to the defined
* topic {@code TOPIC}.
* </p>
*/
@Service
public class KafkaConsumerService {
/**
* Kafka template for interacting with Kafka, enabling the sending and receiving of messages.
*/
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* Topic to which responses will be sent after consuming a message.
*/
private static final String TOPIC = "consumed_topic";
/**
* Method to consume messages from the "output_topic" topic.
* <p>
* Upon receiving a message from "output_topic", this method prints the
* consumed message and sends a response to the topic {@code TOPIC}.
* </p>
*
* @param message The message consumed from the "output_topic" topic.
*/
@KafkaListener(topics = "output_topic", groupId = "group_id")
public void consume(String message) {
System.out.println("Consumed message: " + message);
kafkaTemplate.send(TOPIC, "Response to: " + message);
}
}
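# application.properties (consumer service)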
server.port=8081
spring.kafka.bootstrap-servers=localhost:9092
logging.level.root=INFO
DEPENDENCIES
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.rvm</groupId>
<artifactId>KafkaStreamer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>KafkaStreamer</name>
<description>rvmKafkaStreamer</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
HOW TO RUN THE KAFKA-BASED MICROSERVICES PROJECT
1. Start Kafka Environment:
Begin by launching Zookeeper and Kafka Server. Ensure they are running and accessible.
2. Launch Producer Application:
Start the Kafka Producer application. This service produces messages to the input topic whenever the HTTP endpoint is called.
3. Initialize Kafka Streams Application:
Run the Kafka Streams service. It listens for messages on the input topic, processes them, and then forwards them to the output topic.
4. Start Consumer Service:
Start the Kafka Consumer application. It listens for processed messages from the output topic and performs actions as defined in its logic.
5. Send a Message to the Topic:
Open your browser and go to the following address to call the endpoint implemented in the producer. It will trigger the messaging process.
http://localhost:8080/send/helloWorld
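To verify the full pipeline, you can also tail the final topic with the console consumer bundled with Kafka (run from C:\kafka; consumed_topic is where the consumer service writes its responses):
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic consumed_topic --from-beginning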
LOGS
You can read the logs directly in each application's console.
Alternatively, you can centralize and inspect them with Datadog.
REAL-WORLD POSSIBLE USE CASE
Imagine a scenario where a producer sends temperature readings to a Kafka topic. Kafka Streams processes these readings, filtering or averaging them, and the consumer then triggers alerts based on specific criteria. This setup is ideal for IoT applications, real-time analytics, and monitoring systems.
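If you want to experiment with that scenario, here is a minimal sketch of such a topology, reusing the same StreamsBuilder pattern as Step 2. The topic names (temperature_readings, temperature_alerts) and the 30.0-degree threshold are illustrative assumptions, not part of this guide's project:
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Hypothetical topology for the temperature use case described above.
 * Assumes the same @EnableKafkaStreams setup as KafkaStreamsConfig.
 */
@Configuration
public class TemperatureAlertTopology {
    @Bean
    public KStream<String, String> temperatureStream(StreamsBuilder builder) {
        Serde<String> stringSerde = Serdes.String();
        KStream<String, String> readings =
                builder.stream("temperature_readings", Consumed.with(stringSerde, stringSerde));
        readings
                // Drop payloads that do not parse as numbers.
                .filter((sensorId, value) -> isNumeric(value))
                // Keep only readings above the (assumed) alert threshold.
                .filter((sensorId, value) -> Double.parseDouble(value) > 30.0)
                .mapValues(value -> "ALERT: temperature " + value + " exceeded threshold")
                .to("temperature_alerts", Produced.with(stringSerde, stringSerde));
        return readings;
    }
    private static boolean isNumeric(String value) {
        try {
            Double.parseDouble(value);
            return true;
        } catch (NumberFormatException | NullPointerException e) {
            return false;
        }
    }
}
A consumer like the one from Step 3, listening on temperature_alerts instead of output_topic, could then trigger notifications or write to a dashboard.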
Kafka's architecture not only simplifies data processing pipelines but also ensures scalability and reliability, making it an excellent choice for various real-time data processing scenarios. Dive into Kafka to explore its full potential for your data needs.
#Kafka #SpringBoot #Microservices #Java #Technology #SoftwareDevelopment #Programming #DevOps #BigData #Streaming #CloudComputing #DataEngineering #SoftwareEngineering #TechCommunity #APIs #DistributedSystems #RealTimeData #DataProcessing #SoftwareArchitecture #TechInnovation