Kafka Streams Spring Boot Demo
Rob Golder
Director & Co Founder at Lydtech Consulting - Consultant Lead Engineer & Architect
Introduction
This is part of a series of articles focussing on Kafka Streams. The first article gave an introduction to the Kafka Streams API and its architecture, benefits and usage. This article details the accompanying Spring Boot application that demonstrates stateless and stateful message processing using the API.
Spring Boot Application
For this demo, a Spring Boot application has been created that simulates receiving payment events from Kafka and processes these payments, including currency conversion, using stateless KStream processors. The application tracks the account balances by aggregating the payment amounts using a stateful KTable processor, with RocksDB as the state store. The payments are then emitted to the relevant outbound rails topic. The rails topics relate to the bank rails that would then pick up the payments to make the actual money transfers. The code repository is here.
Topology
The Kafka Streams topology is as follows:
Figure 1: Payment Kafka Streams topology
Configuration
Spring handles the wiring of the Kafka Streams components and their configuration, with the following Kafka Streams configuration defined in KafkaStreamsDemoConfiguration being wired in:
@Configuration
@EnableKafkaStreams
public class KafkaStreamsDemoConfiguration {
    // The *_CONFIG constants are static imports from org.apache.kafka.streams.StreamsConfig.
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfig(@Value("${kafka.bootstrap-servers}") final String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put(APPLICATION_ID_CONFIG, "kafka-streams-demo");
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
        return new KafkaStreamsConfiguration(props);
    }
}
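To make the resulting settings concrete, here is a minimal plain-Java sketch of the property map that the configuration above builds. It deliberately avoids any Kafka dependency: the literal keys are the values behind the StreamsConfig constants, the serde class names are what Serdes.String() and Serdes.Long() resolve to, and the class name StreamsPropsSketch and the broker address localhost:9092 are assumptions for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch: literal keys stand in for the StreamsConfig constants.
final class StreamsPropsSketch {

    // In the demo, bootstrapServers is injected from the kafka.bootstrap-servers property.
    static Map<String, Object> streamsProps(String bootstrapServers) {
        Map<String, Object> props = new LinkedHashMap<>();
        // APPLICATION_ID_CONFIG
        props.put("application.id", "kafka-streams-demo");
        // BOOTSTRAP_SERVERS_CONFIG
        props.put("bootstrap.servers", bootstrapServers);
        // DEFAULT_KEY_SERDE_CLASS_CONFIG
        props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        // DEFAULT_VALUE_SERDE_CLASS_CONFIG
        props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$LongSerde");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is an assumed broker address, not taken from the demo.
        streamsProps("localhost:9092").forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The default serdes matter later on: any operation that does not specify its own serdes falls back to String keys and Long values.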
Processing Flow
The processors are defined in PaymentTopology. This Spring component is wired with a StreamsBuilder:
@Autowired
public void buildPipeline(StreamsBuilder streamsBuilder) {
The PaymentTopology then defines the following processing flow that consumed events will pass through:
- Payment events are received from the inbound payment-topic.
KStream<String, PaymentEvent> messageStream = streamsBuilder
    .stream(properties.getPaymentInboundTopic(), Consumed.with(STRING_SERDE, PaymentSerdes.serdes()))
- A custom serdes class PaymentSerdes (for serializing/deserializing) is provided here as the inbound JSON must be deserialized into a PaymentEvent POJO.
public static Serde<PaymentEvent> serdes() {
    JsonSerializer<PaymentEvent> serializer = new JsonSerializer<>();
    JsonDeserializer<PaymentEvent> deserializer = new JsonDeserializer<>(PaymentEvent.class);
    return Serdes.serdeFrom(serializer, deserializer);
}
- Any payments that use unsupported rails are filtered out.
    .filter((key, payment) -> SUPPORTED_RAILS.contains(payment.getRails()))
- The payments are branched into two streams, one for GBP payments, and one for USD payments.
KStream<String, PaymentEvent>[] currenciesBranches = messageStream.branch(
    (key, payment) -> payment.getCurrency().equals(Currency.GBP.name()),
    (key, payment) -> payment.getCurrency().equals(Currency.USD.name())
);
- The USD payments are transformed to GBP payments by applying the FX rate.
KStream<String, PaymentEvent> fxStream = currenciesBranches[1].mapValues(
    // Use mapValues() as we are transforming the payment, but not changing the key.
    (payment) -> {
        // Perform FX conversion.
        double usdToGbpRate = 0.8;
        PaymentEvent transformedPayment = PaymentEvent.builder()
            .paymentId(payment.getPaymentId())
            .amount(Math.round(payment.getAmount() * usdToGbpRate))
            .currency(Currency.GBP.name())
            .fromAccount(payment.getFromAccount())
            .toAccount(payment.getToAccount())
            .rails(payment.getRails())
            .build();
        return transformedPayment;
    });
- The branched streams are merged back into one.
KStream<String, PaymentEvent> mergedStreams = currenciesBranches[0].merge(fxStream);
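The stateless steps above (filter, branch, FX conversion, merge) can be traced with a small plain-Java simulation. This is only a sketch and does not use the Kafka Streams API: the Payment class is a hypothetical, trimmed-down stand-in for PaymentEvent, the contents of SUPPORTED_RAILS are assumed from the two rails named in this article, and the BANK_RAILS_XXX rails and EUR currency in the sample data are made up to show records being dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Plain-Java simulation of the stateless steps; it does not use the Kafka Streams API.
final class StatelessFlowSketch {

    // Hypothetical, trimmed-down stand-in for PaymentEvent.
    static final class Payment {
        final String paymentId;
        final long amount;
        final String currency;
        final String rails;

        Payment(String paymentId, long amount, String currency, String rails) {
            this.paymentId = paymentId;
            this.amount = amount;
            this.currency = currency;
            this.rails = rails;
        }
    }

    // Assumed contents of SUPPORTED_RAILS, based on the two rails named in the article.
    static final Set<String> SUPPORTED_RAILS = Set.of("BANK_RAILS_FOO", "BANK_RAILS_BAR");
    static final double USD_TO_GBP_RATE = 0.8;

    // Mirrors the mapValues() step: same payment, amount converted and rounded to a whole number.
    static Payment toGbp(Payment usd) {
        return new Payment(usd.paymentId, Math.round(usd.amount * USD_TO_GBP_RATE), "GBP", usd.rails);
    }

    static List<Payment> process(List<Payment> inbound) {
        List<Payment> gbpBranch = new ArrayList<>();
        List<Payment> usdBranch = new ArrayList<>();
        for (Payment payment : inbound) {
            // filter(): drop payments on unsupported rails.
            if (!SUPPORTED_RAILS.contains(payment.rails)) {
                continue;
            }
            // branch(): the first matching predicate wins; a record matching none is dropped.
            if (payment.currency.equals("GBP")) {
                gbpBranch.add(payment);
            } else if (payment.currency.equals("USD")) {
                usdBranch.add(payment);
            }
        }
        // merge(): concatenation imposes an order here, whereas KStream.merge()
        // gives no ordering guarantee between records of the two streams.
        List<Payment> merged = new ArrayList<>(gbpBranch);
        for (Payment usd : usdBranch) {
            merged.add(toGbp(usd));
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Payment> out = process(List.of(
                new Payment("p1", 100, "GBP", "BANK_RAILS_FOO"),
                new Payment("p2", 100, "USD", "BANK_RAILS_BAR"),
                new Payment("p3", 50, "USD", "BANK_RAILS_XXX"),   // unsupported rails
                new Payment("p4", 10, "EUR", "BANK_RAILS_FOO"))); // matches no branch
        for (Payment p : out) {
            System.out.println(p.paymentId + " " + p.amount + " " + p.currency);
        }
    }
}
```

Note that a payment in a currency other than GBP or USD passes the rails filter but matches neither branch predicate, so it silently leaves the flow.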
Two things now occur:
1. The stream is branched by the payments’ bank rails.
KStream<String, PaymentEvent>[] railsBranches = mergedStreams.branch(
    (key, payment) -> payment.getRails().equals(Rails.BANK_RAILS_FOO.name()),
    (key, payment) -> payment.getRails().equals(Rails.BANK_RAILS_BAR.name()));
- The payment events are serialized and emitted to their respective outbound bank rails topics.
railsBranches[0].to(properties.getRailsFooOutboundTopic(), Produced.with(STRING_SERDE, PaymentSerdes.serdes()));
railsBranches[1].to(properties.getRailsBarOutboundTopic(), Produced.with(STRING_SERDE, PaymentSerdes.serdes()));
2. A KTable is used to aggregate the payment balances by the fromAccount name.
mergedStreams
    .map((key, payment) -> new KeyValue<>(payment.getFromAccount(), payment.getAmount()))
    .groupByKey(Grouped.with(STRING_SERDE, LONG_SERDE))
    .aggregate(new Initializer<Long>() {
        @Override
        public Long apply() {
            return 0L;
        }
    }, new Aggregator<String, Long, Long>() {
        @Override
        public Long apply(final String key, final Long value, final Long aggregate) {
            return aggregate + value;
        }
    },
    // Materialized.as() is a static factory, so name the store first and then attach the serdes.
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("balance")
        .withKeySerde(STRING_SERDE)
        .withValueSerde(LONG_SERDE));
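The aggregation logic can be illustrated without Kafka. The following plain-Java sketch replays a list of re-keyed (fromAccount, amount) records into an in-memory map, with the initializer and aggregator roles marked in comments. The HashMap is a stand-in for the "balance" state store, and the class name and sample account names are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the per-key aggregation; a HashMap stands in for the state store.
final class BalanceAggregationSketch {

    // Each entry is a (fromAccount, amount) pair, as produced by the map() step.
    static Map<String, Long> aggregate(List<Map.Entry<String, Long>> records) {
        Map<String, Long> balances = new HashMap<>();
        for (Map.Entry<String, Long> record : records) {
            // Initializer role: a key seen for the first time starts from 0.
            long aggregate = balances.getOrDefault(record.getKey(), 0L);
            // Aggregator role: add the new value to the running total for the key.
            balances.put(record.getKey(), aggregate + record.getValue());
        }
        return balances;
    }

    public static void main(String[] args) {
        // Account names are made up for this example.
        Map<String, Long> balances = aggregate(List.of(
                Map.entry("abc", 100L),
                Map.entry("xyz", 50L),
                Map.entry("abc", 80L)));
        System.out.println(balances.get("abc"));
        System.out.println(balances.get("xyz"));
    }
}
```

As in the topology, the total is keyed by the paying account, so each "balance" is the running sum of the amounts sent from that account.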
State Store Query
A REST endpoint in BalanceController is exposed to allow clients to query the current balance by account:
@GetMapping("/balance/{account}")
public ResponseEntity<Long> getAccountBalance(@PathVariable String account) {
    KafkaStreams kafkaStreams = factoryBean.getKafkaStreams();
    ReadOnlyKeyValueStore<String, Long> balances = kafkaStreams.store(
        StoreQueryParameters.fromNameAndType("balance", QueryableStoreTypes.keyValueStore())
    );
    return ResponseEntity.ok(balances.get(account));
}
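A key-value store lookup returns null when the key is absent, so the endpoint above responds with an empty body for an account that has made no payments. As a hedged sketch of one way a caller could tell the two cases apart, the plain-Java example below wraps the lookup in an Optional and maps it to an HTTP status code. It is not part of the demo application: the Map stands in for the store, and the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.Optional;

// Plain-Java sketch; a Map stands in for the read-only "balance" store.
final class BalanceLookupSketch {

    // Wraps the nullable lookup so that "no balance recorded" is explicit.
    static Optional<Long> findBalance(Map<String, Long> store, String account) {
        return Optional.ofNullable(store.get(account));
    }

    // Hypothetical mapping to an HTTP status: 200 when a balance exists, 404 otherwise.
    static int statusFor(Map<String, Long> store, String account) {
        return findBalance(store, account).isPresent() ? 200 : 404;
    }

    public static void main(String[] args) {
        // Account names are made up for this example.
        Map<String, Long> store = Map.of("abc", 180L);
        System.out.println(statusFor(store, "abc"));
        System.out.println(statusFor(store, "unknown"));
    }
}
```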
Testing
The Kafka Streams API provides an org.apache.kafka.streams.TopologyTestDriver, which is used to test the topology in a unit test. An example test utilising the driver is PaymentTopologyTest. This and other means to fully test a Kafka Streams application will be explored in a follow-on article.
Topology Description
Spring’s StreamsBuilderFactoryBean provides access to the built topology, which can then be described. In TopologyController this has been exposed as a REST endpoint to query.
@GetMapping("/topology")
public ResponseEntity<String> getTopology() {
    return ResponseEntity.ok(factoryBean.getTopology().describe().toString());
}
The topology description String in the REST response can be plugged into the tool at https://zz85.github.io/kafka-streams-viz/ to give a visual representation of the topology. Here is a part of this topology:
Figure 2: Generated Streams topology
Source Code
The source code for the accompanying Kafka Streams application is available here:
More On Kafka Streams...
- The Kafka Streams Introduction article delves into the API and covers its benefits and characteristics.
Resources
- Kafka Streams Topology Visualizer by Joshua Koo, Than Hedman, Marc Löhe