Kafka Streams Spring Boot Demo

Introduction

This is part of a series of articles focusing on Kafka Streams. The first article gave an introduction to the Kafka Streams API and its architecture, benefits and usage. This article details the accompanying Spring Boot application that demonstrates stateless and stateful message processing using the API.

Spring Boot Application

For this demo a Spring Boot application has been created that simulates receiving payment events from Kafka and processes these payments, including currency conversion, using stateless KStream processors. The application tracks account balances by aggregating the payment amounts using a stateful KTable processor, with RocksDB as the state store. The payments are then emitted to the relevant outbound rails topic. The rails topics relate to the bank rails that would then pick up the payments to make the actual money transfers. The code repository is here.
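The event flowing through the topology is a payment. Based on the fields used in the processors below, a minimal sketch of the PaymentEvent POJO might look as follows (the Lombok annotations and the amount being in minor currency units are assumptions; the actual class is in the code repository):

// A minimal sketch of the payment event, inferred from the fields used in the
// topology below. The Lombok @Data/@Builder annotations are an assumption.
@Data
@Builder
public class PaymentEvent {
    private String paymentId;
    private Long amount;        // assumed to be in minor currency units
    private String currency;    // e.g. "GBP" or "USD"
    private String fromAccount;
    private String toAccount;
    private String rails;       // e.g. "BANK_RAILS_FOO"
}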

Topology

The Kafka Streams topology is as follows:

Figure 1: Payment Kafka Streams topology

Configuration

Spring handles the wiring of the Kafka Streams components and their configuration, with the following Kafka Streams configuration defined in KafkaStreamsDemoConfiguration being wired in:

@Configuration
@EnableKafkaStreams
public class KafkaStreamsDemoConfiguration {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfig(@Value("${kafka.bootstrap-servers}") final String bootstrapServers) {
        // Config keys are statically imported from org.apache.kafka.streams.StreamsConfig.
        Map<String, Object> props = new HashMap<>();
        props.put(APPLICATION_ID_CONFIG, "kafka-streams-demo");
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
        return new KafkaStreamsConfiguration(props);
    }
}

Processing Flow

The processors are defined in PaymentTopology. This Spring component is wired with a StreamsBuilder:

@Autowired
public void buildPipeline(StreamsBuilder streamsBuilder) {        
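For context, here is a minimal sketch of the component skeleton; the serde constants are assumptions based on their usage in the processor snippets that follow:

// A minimal sketch of the PaymentTopology skeleton. Spring invokes the @Autowired
// method with the StreamsBuilder created by @EnableKafkaStreams. The serde
// constants are assumptions based on their usage in the snippets below.
@Component
public class PaymentTopology {

    private static final Serde<String> STRING_SERDE = Serdes.String();
    private static final Serde<Long> LONG_SERDE = Serdes.Long();

    @Autowired
    public void buildPipeline(StreamsBuilder streamsBuilder) {
        // The processing steps detailed below are chained here.
    }
}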

The PaymentTopology then defines the following processing flow that consumed events pass through:

  • Payment events are received from the inbound payment-topic.

KStream<String, PaymentEvent> messageStream = streamsBuilder
    .stream(properties.getPaymentInboundTopic(), Consumed.with(STRING_SERDE, PaymentSerdes.serdes()))

  • A custom serdes class PaymentSerdes (for serializing/deserializing) is provided here, as the inbound JSON must be deserialized into a PaymentEvent POJO.

public static Serde<PaymentEvent> serdes() {
    // JsonSerializer/JsonDeserializer are Spring Kafka's JSON (de)serializers.
    JsonSerializer<PaymentEvent> serializer = new JsonSerializer<>();
    JsonDeserializer<PaymentEvent> deserializer = new JsonDeserializer<>(PaymentEvent.class);
    return Serdes.serdeFrom(serializer, deserializer);
}

  • Any payments that use unsupported rails are filtered out.

.filter((key, payment) -> SUPPORTED_RAILS.contains(payment.getRails()))        
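The SUPPORTED_RAILS constant is not shown in the snippet; a plausible definition, based on the Rails enum values used later in the topology, would be:

// An assumed definition of the filter's whitelist, based on the Rails enum
// values referenced later in the topology.
private static final Set<String> SUPPORTED_RAILS =
        Set.of(Rails.BANK_RAILS_FOO.name(), Rails.BANK_RAILS_BAR.name());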

  • The payments are branched into two streams, one for GBP payments, and one for USD payments.

KStream<String, PaymentEvent>[] currenciesBranches = messageStream.branch(
    (key, payment) -> payment.getCurrency().equals(Currency.GBP.name()),
    (key, payment) -> payment.getCurrency().equals(Currency.USD.name())
);
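As an aside, KStream#branch is deprecated in recent Kafka Streams versions (2.8 onwards) in favour of split(). A sketch of the equivalent branching using the newer API (the branch names here are illustrative) is:

// Equivalent branching using the newer split()/Branched API (Kafka Streams 2.8+).
// The map keys are the split() name prefix plus the Branched names chosen here.
Map<String, KStream<String, PaymentEvent>> branches = messageStream
        .split(Named.as("currency-"))
        .branch((key, payment) -> payment.getCurrency().equals(Currency.GBP.name()), Branched.as("gbp"))
        .branch((key, payment) -> payment.getCurrency().equals(Currency.USD.name()), Branched.as("usd"))
        .noDefaultBranch();

KStream<String, PaymentEvent> gbpStream = branches.get("currency-gbp");
KStream<String, PaymentEvent> usdStream = branches.get("currency-usd");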

  • The USD payments are transformed to GBP payments by applying the FX rate. For example, with the hardcoded rate of 0.8, a payment of 100 USD becomes 80 GBP.

KStream<String, PaymentEvent> fxStream = currenciesBranches[1].mapValues(
    // Use mapValues() as we are transforming the payment, but not changing the key.
    (payment) -> {
        // Perform FX conversion.
        double usdToGbpRate = 0.8;
        PaymentEvent transformedPayment = PaymentEvent.builder()
                .paymentId(payment.getPaymentId())
                .amount(Math.round(payment.getAmount() * usdToGbpRate))
                .currency(Currency.GBP.name())
                .fromAccount(payment.getFromAccount())
                .toAccount(payment.getToAccount())
                .rails(payment.getRails())
                .build();
        return transformedPayment;
    });

  • The branched streams are merged back into one.

KStream<String, PaymentEvent> mergedStreams = currenciesBranches[0].merge(fxStream);

Two things now occur:

1. The stream is branched by the payments’ bank rails.

KStream<String, PaymentEvent>[] railsBranches = mergedStreams.branch(
    (key, payment) -> payment.getRails().equals(Rails.BANK_RAILS_FOO.name()),
    (key, payment) -> payment.getRails().equals(Rails.BANK_RAILS_BAR.name()));

  • The payment events are serialized and emitted to their respective outbound bank rails topics.

railsBranches[0].to(properties.getRailsFooOutboundTopic(), Produced.with(STRING_SERDE, PaymentSerdes.serdes()));
railsBranches[1].to(properties.getRailsBarOutboundTopic(), Produced.with(STRING_SERDE, PaymentSerdes.serdes()));        

2. A KTable is used to aggregate the payment balances by the fromAccount name.

mergedStreams
    .map((key, payment) -> new KeyValue<>(payment.getFromAccount(), payment.getAmount()))
    .groupByKey(Grouped.with(STRING_SERDE, LONG_SERDE))
    .aggregate(
        // Initializer: a new account starts with a zero balance.
        () -> 0L,
        // Aggregator: add each payment amount to the account's running balance.
        (account, amount, balance) -> balance + amount,
        // Materialize the aggregate in a queryable state store named "balance".
        // (Bytes and KeyValueStore are from org.apache.kafka.common.utils and
        // org.apache.kafka.streams.state respectively.)
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("balance")
                .withKeySerde(STRING_SERDE)
                .withValueSerde(LONG_SERDE));

State Store Query

A REST endpoint in BalanceController is exposed to allow clients to query the current balance by account. Spring's StreamsBuilderFactoryBean is injected to give access to the underlying KafkaStreams instance:

@GetMapping("/balance/{account}")
public ResponseEntity getAccountBalance(@PathVariable String account) {
   KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
   ReadOnlyKeyValueStore balances = kafkaStreams.store(
           StoreQueryParameters.fromNameAndType("balance", QueryableStoreTypes.keyValueStore())
   );
   return ResponseEntity.ok(balances.get(account));
}        

Testing

The Kafka Streams API provides a org.apache.kafka.streams.TopologyTestDriver which is used to test the topology in a unit test. An example test utilising the driver is PaymentTopologyTest; a sketch of the approach follows. This and other means to fully test a Kafka Streams application will be explored in a follow-on article.
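A minimal sketch, assuming the topology and PaymentSerdes shown above (the topic names and sample payment are illustrative):

// A minimal TopologyTestDriver sketch. Topic names and the sample payment are
// illustrative; assertEquals is JUnit's.
StreamsBuilder builder = new StreamsBuilder();
// paymentTopology is assumed to be constructed with test topic properties (not shown).
paymentTopology.buildPipeline(builder);

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

try (TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props)) {
    TestInputTopic<String, PaymentEvent> inputTopic = testDriver.createInputTopic(
            "payment-topic", new StringSerializer(), PaymentSerdes.serdes().serializer());
    TestOutputTopic<String, PaymentEvent> fooRailsTopic = testDriver.createOutputTopic(
            "rails-foo-topic", new StringDeserializer(), PaymentSerdes.serdes().deserializer());

    inputTopic.pipeInput("payment-1", PaymentEvent.builder()
            .paymentId("payment-1")
            .amount(100L)
            .currency(Currency.USD.name())
            .fromAccount("account-a")
            .toAccount("account-b")
            .rails(Rails.BANK_RAILS_FOO.name())
            .build());

    // The 100 USD payment should emerge on the FOO rails topic converted to 80 GBP.
    PaymentEvent output = fooRailsTopic.readValue();
    assertEquals(80L, output.getAmount());
    assertEquals(Currency.GBP.name(), output.getCurrency());
}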

Topology Description

Spring’s StreamsBuilderFactoryBean provides an option to describe the topology. In TopologyController this has been exposed as a REST endpoint to query:

@GetMapping("/topology")
public ResponseEntity getTopology() {
   return ResponseEntity.ok(factoryBean.getTopology().describe().toString());
}        

The topology description String in the REST response can be plugged into the tool at https://zz85.github.io/kafka-streams-viz/ to give a visual representation of the topology. Here is a part of this topology:

Figure 2: Generated Streams topology

Source Code

The source code for the accompanying Kafka Streams application is available here:

https://github.com/lydtechconsulting/kafka-streams/tree/v1.3.0

