Streamlining Data Flows with Springboot-Kafka: A Quick Guide
Spring Boot 3.2.0 | Maven 4.0.0 | JDK 17 | Spring Kafka 3.1.0 | Kafka Streams 3.6.0 | IntelliJ IDEA 2022.3.2

Originally published on Medium: https://medium.com/@jr.vera.ma/streamlining-data-flows-with-springboot-kafka-a-quick-guide-e21df72c713f

In today's fast-paced data world, Apache Kafka stands out as a powerful tool for managing real-time data streams. This guide offers a concise walkthrough for setting up a basic, single-topic Kafka ecosystem encompassing a producer, a stream processor, and a consumer using Spring Boot.

Kafka Streams workflow (the simple project in this article uses only Topic 1 from the producer)

GitHub: Kafka-Springboot


CONTENT:

  • KAFKA INSTALLATION ON WINDOWS
  • RUNNING ZOOKEEPER AND THE KAFKA SERVER LOCALLY
  • SPRING BOOT APPS AND STREAMS MICROSERVICE SETUP

Step 1: KafkaProducer Setup

Step 2: Stream Processing with Kafka Streams

Step 3: Consuming Processed Data

DEPENDENCIES

  • HOW TO RUN THE KAFKA-BASED MICROSERVICES PROJECT
  • LOGS
  • REAL-WORLD POSSIBLE USE CASE



KAFKA INSTALLATION ON WINDOWS

Step 1: Download Kafka from the Apache Kafka homepage

  • Download the latest version available
  • Select the binary download. In this case: Scala 2.13 - kafka_2.13-3.6.1.tgz (asc, sha512)
  • Extract the archive to C:\kafka_2.13-3.6.1 (to simplify environment variable management)
  • Rename the folder to C:\kafka

Step 2: Configuring Kafka

Edit the following properties so that Kafka and Zookeeper write their data under C:\kafka.

In config\server.properties:

log.dirs=c:/kafka/kafka.logs

In config\zookeeper.properties:

dataDir=C:/kafka/zookeeper-data


RUNNING ZOOKEEPER AND THE KAFKA SERVER LOCALLY

Step 1: open two cmd consoles in C:\kafka (the commands below use paths relative to the Kafka root folder, not bin)

Step 2: paste and run the following command in the first console

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

Step 3: paste and run the following command in the second console

.\bin\windows\kafka-server-start.bat .\config\server.properties

You should see something like this:

both Zookeeper (on the left) and Kafka Server (on the right) up and running
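
Optionally, you can pre-create the three topics used later in this guide (required if your broker has auto-topic-creation disabled). From C:\kafka:

.\bin\windows\kafka-topics.bat --create --topic input_topic --bootstrap-server localhost:9092
.\bin\windows\kafka-topics.bat --create --topic output_topic --bootstrap-server localhost:9092
.\bin\windows\kafka-topics.bat --create --topic consumed_topic --bootstrap-server localhost:9092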


SPRING BOOT APPS AND STREAMS MICROSERVICE SETUP

Project Modules Organization:

Each application was created as a separate Maven module inside a parent module, to avoid dependency interference, run the apps on different ports, and keep everything together in one folder. A sketch of a possible parent POM follows the image below.

Note: the next image was edited for teaching purposes; a real Spring Boot Maven project will also contain test folders, a pom.xml, and many other files.

EDITED PROJECT MODULES ORGANIZATION
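
A minimal parent (aggregator) POM might look like the sketch below; the module names are assumptions based on the apps described in this article, so adjust them to your actual folder names:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.rvm</groupId>
    <artifactId>kafka-parent</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>pom</packaging>

    <!-- Aggregates the three Spring Boot apps so they can be built together -->
    <modules>
        <module>KafkaProducer</module>
        <module>KafkaStreamer</module>
        <module>KafkaConsumer</module>
    </modules>
</project>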


Step 1: KafkaProducer Setup

Begin by creating a KafkaProducer in Spring Boot. Utilize the KafkaTemplate to send messages to a Kafka topic through a REST Controller. This setup allows for easy and efficient message production to your Kafka system.


  • KafkaProducerController:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST Controller that provides endpoints for interacting with a Kafka server.
 * <p>
 * This controller allows sending messages to the Kafka server on a specific topic via
 * an HTTP GET request.
 * </p>
 */


@RestController
public class KafkaProducerController {


    /**
     * Kafka Template that facilitates the production of messages to Kafka topics.
     */


    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * The name of the topic to which the messages will be sent.
     */


    private static final String TOPIC = "input_topic";

    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerController.class);


    /**
     * Endpoint to send a message to the Kafka topic.
     *
     * @param message The message to be sent to the Kafka topic.
     * @return A string indicating that the message has been sent.
     */


    @GetMapping("/send/{message}")
    public String sendMessage(@PathVariable String message) {
        logger.info("Sending message to topic {}: {}", TOPIC, message);
        kafkaTemplate.send(TOPIC, message);
        return "Message sent: " + message;
    }
}
        


  • KafkaProducerConfig

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka producer configuration that provides the necessary settings and
 * Kafka template for sending messages to a Kafka server.
 * <p>
 * This class uses Spring configuration values (e.g., from the application.properties file)
 * to set specific Kafka properties.
 * </p>
 */

@Configuration
public class KafkaProducerConfig {

    /**
     * The address of the Kafka server to which the producer should connect.
     * This address is usually in the format: {@code host1:port1,host2:port2,...}.
     */


    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;


    /**
     * Creates and returns a configuration for the Kafka producer.
     *
     * @return A map of properties with configurations for the Kafka producer.
     */


    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }


    /**
     * Creates and returns a Kafka template that uses the producer's configuration.
     * The Kafka template is used to send messages to Kafka topics.
     *
     * @return A configured instance of {@link KafkaTemplate}.
     */


    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfigs()));
    }
}        


application.properties (KafkaProducer):

server.port=8080
spring.kafka.bootstrap-servers=localhost:9092
logging.level.root=INFO
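
Once the producer app and Kafka are running, a quick way to smoke-test the endpoint from a terminal (any HTTP client works; curl is shown here):

curl http://localhost:8080/send/helloWorld

You should get back "Message sent: helloWorld" and see the corresponding log line from the controller.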


Step 2: Stream Processing with Kafka Streams

Leverage the Kafka Streams library in Spring Boot for real-time data processing. Configure KafkaStreamsConfig to establish stream processing logic, transforming and enriching data on the fly. This component is crucial for handling complex data flows and analytics.


  • KafkaStreamsConfig

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import java.util.HashMap;
import java.util.Map;


/**
 * Configuration class for Kafka Streams in a Spring application.
 * <p>
 * This class sets up the necessary configuration for Kafka Streams processing,
 * including server details and default SerDes for keys and values.
 * </p>
 */


@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    /**
     * The bootstrap servers for the Kafka cluster.
     */


    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    private static final Logger logger = LoggerFactory.getLogger(KafkaStreamsConfig.class);


    /**
     * Configures and provides the Kafka Streams configuration bean.
     * <p>
     * With {@code @EnableKafkaStreams}, Spring Kafka expects a
     * {@link KafkaStreamsConfiguration} bean named {@code defaultKafkaStreamsConfig};
     * a plain {@code StreamsConfig} bean is not picked up, so the configuration
     * is registered under that name.
     * </p>
     *
     * @return The configuration for the Kafka Streams application.
     */


    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return new KafkaStreamsConfiguration(props);
    }


    /**
     * Defines and provides the Kafka Stream for processing.
     *
     * @return The Kafka Stream for processing messages.
     */


    @Bean
    public KStream<String, String> kStream(StreamsBuilder builder) {
        Serde<String> stringSerde = Serdes.String();
        KStream<String, String> stream = builder.stream("input_topic", Consumed.with(stringSerde, stringSerde));

        stream.peek((key, value) -> logger.info("Received message - key: {}, value: {}", key, value))
                .mapValues(value -> {
                    String processedValue = "Processed: " + value;
                    logger.info("Processing message: {}", processedValue);
                    return processedValue;
                })
                .peek((key, value) -> logger.info("Sending processed message - key: {}, value: {}", key, value))
                .to("output_topic", Produced.with(stringSerde, stringSerde));

        return stream;
    }
}        

application.properties (KafkaStreamer):

server.port=8082
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.streams.application-id=kafka-streams-app
logging.level.root=INFO
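
To verify that the stream is actually transforming messages, you can watch output_topic with the console consumer that ships with Kafka (run from C:\kafka):

.\bin\windows\kafka-console-consumer.bat --topic output_topic --from-beginning --bootstrap-server localhost:9092

Each message sent through the producer should appear here prefixed with "Processed: ".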


Step 3: Consuming Processed Data

Implement a KafkaConsumer to read and act on the processed data. With KafkaListener, you can easily consume messages from Kafka Streams, allowing for responsive data handling and further processing.


  • KafkaConsumerConfig

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;


/**
 * Kafka Consumer configuration for Spring Boot.
 * <p>
 * This class provides the necessary configurations to establish a connection with
 * a Kafka server and consume messages from specific topics. It defines the basic consumer
 * configurations and the factory for creating consumer listener instances.
 * </p>
 */


@Configuration
public class KafkaConsumerConfig {

    /**
     * Address of the Kafka bootstrap servers, usually the address
     * of the Kafka broker.
     */


    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Specific configurations for the Kafka consumer.
     * <p>
     * Defines properties such as the server address, group ID,
     * and deserializer classes for message keys and values.
     * </p>
     *
     * @return A map of consumer configurations.
     */


    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }


    /**
     * Defines the factory for creating consumer listener instances.
     * <p>
     * This factory is responsible for creating the containers used
     * to consume messages from the Kafka server.
     * </p>
     *
     * @return A configured instance of {@link ConcurrentKafkaListenerContainerFactory}.
     */


    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        return factory;
    }
}        


  • KafkaConsumerService

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

/**
 * Service for consuming messages from a Kafka topic and responding to them.
 * <p>
 * This service listens for messages from a specific topic in Kafka.
 * Upon receiving a message, the service sends a response to the defined
 * topic {@code TOPIC}.
 * </p>
 */


@Service
public class KafkaConsumerService {

    /**
     * Kafka template for interacting with Kafka, enabling the sending and receiving of messages.
     */

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Topic to which responses will be sent after consuming a message.
     */


    private static final String TOPIC = "consumed_topic";


    /**
     * Method to consume messages from the "output_topic" topic.
     * <p>
     * Upon receiving a message from "output_topic", this method prints the
     * consumed message and sends a response to the topic {@code TOPIC}.
     * </p>
     *
     * @param message The message consumed from the "output_topic" topic.
     */


    @KafkaListener(topics = "output_topic", groupId = "group_id")
    public void consume(String message) {
        System.out.println("Consumed message: " + message);
        kafkaTemplate.send(TOPIC, "Response to: " + message);
    }
}        
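
A side note, not part of this project's code: if you also need the record key, partition, or offset, @KafkaListener can receive the full ConsumerRecord instead of just the value. A minimal sketch, using the same topic and group as above:

import org.apache.kafka.clients.consumer.ConsumerRecord;

    @KafkaListener(topics = "output_topic", groupId = "group_id")
    public void consumeRecord(ConsumerRecord<String, String> record) {
        // The full record exposes metadata in addition to the payload
        System.out.printf("key=%s partition=%d offset=%d value=%s%n",
                record.key(), record.partition(), record.offset(), record.value());
    }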


application.properties (KafkaConsumer):

server.port=8081
spring.kafka.bootstrap-servers=localhost:9092
logging.level.root=INFO
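
Likewise, you can confirm that the consumer's responses land on consumed_topic:

.\bin\windows\kafka-console-consumer.bat --topic consumed_topic --from-beginning --bootstrap-server localhost:9092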


DEPENDENCIES

  • pom.xml (the same POM works for each application; the kafka-streams dependency is only needed by the KafkaStreamer app)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>3.2.0</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.rvm</groupId>
	<artifactId>KafkaStreamer</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>KafkaStreamer</name>
	<description>rvmKafkaStreamer</description>
	<properties>
		<java.version>17</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-streams</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>        


HOW TO RUN THE KAFKA-BASED MICROSERVICES PROJECT

1. Start the Kafka Environment:

Begin by launching Zookeeper and Kafka Server. Ensure they are running and accessible.

2. Launch the Producer Application:

Start the Kafka Producer application. This service will produce messages to be sent to the Kafka topic as soon as the HTTP call is made to the defined endpoint.

3. Initialize the Kafka Streams Application:

Run the Kafka Streams service. It listens for messages on the input topic, processes them, and then forwards them to the output topic.

4. Start the Consumer Service:

Start the Kafka Consumer application. It listens for processed messages from the output topic and performs actions as defined in its logic.

5. Send a Message to the Topic:

Open your browser and enter the following address to call the endpoint implemented in KafkaProducer. It will trigger the messaging process.

http://localhost:8080/send/helloWorld


LOGS

You can check the logs directly in each application's console.

  • KafkaProducer: the log does not always appear right at the end of the console output, so you may need to scroll back to find it.

KafkaProducer logs during the process

  • KafkaStreamer

KafkaStreamer logs during the process

  • KafkaConsumer

KafkaConsumer logs during the process

Or you can check the logs with Datadog.



REAL-WORLD POSSIBLE USE CASE

Imagine a scenario where a producer sends temperature readings to a Kafka topic. Kafka Streams processes these readings, filtering or averaging them, and the consumer then triggers alerts based on specific criteria. This setup is ideal for IoT applications, real-time analytics, and monitoring systems.
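
As a rough sketch of that idea (topic names, the 30-degree threshold, and the plain-string payload are all assumptions for illustration, reusing the imports from the KafkaStreamsConfig shown earlier):

    // Hypothetical: route readings above a threshold to an alerts topic
    @Bean
    public KStream<String, String> temperatureAlerts(StreamsBuilder builder) {
        Serde<String> stringSerde = Serdes.String();
        KStream<String, String> readings =
                builder.stream("temperature_readings", Consumed.with(stringSerde, stringSerde));

        readings.filter((sensorId, value) -> {
                    try {
                        return Double.parseDouble(value) > 30.0; // alert threshold (assumed)
                    } catch (NumberFormatException e) {
                        return false; // ignore malformed readings
                    }
                })
                .to("temperature_alerts", Produced.with(stringSerde, stringSerde));

        return readings;
    }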

Kafka's architecture not only simplifies data processing pipelines but also ensures scalability and reliability, making it an excellent choice for various real-time data processing scenarios. Dive into Kafka to explore its full potential for your data needs.



#Kafka #SpringBoot #Microservices #Java #Technology #SoftwareDevelopment #Programming #DevOps #BigData #Streaming #CloudComputing #DataEngineering #SoftwareEngineering #TechCommunity #APIs #DistributedSystems #RealTimeData #DataProcessing #SoftwareArchitecture #TechInnovation

