Kafka Transactions: Part 2: Spring Boot Demo

Introduction

In the first part of this article the use of Kafka Transactions to achieve exactly-once messaging semantics was explored. In this second part the behaviour is demonstrated via a Spring Boot application.

Demo Application

The accompanying Spring Boot application demonstrates Kafka's exactly-once message processing. An integration test uses a test Kafka Producer to send messages via the embedded in-memory Kafka broker, and these are consumed by the application. Test consumers then poll for the resulting messages that the application produces to the outbound topics, both with and without Kafka transactions.

The integration test class source is located here.

Test Flow

The following diagram shows the components in place for the Spring Boot Demo application and integration tests.

The application consumes a message and performs three resulting actions:

1. Writes a resulting event to the first outbound topic (Topic 1).
2. Calls a third-party service via REST.
3. Writes a second resulting event to the second outbound topic (Topic 2).

As described in the Wiremock section below, the call to the REST service that is sandwiched between the two outbound writes allows the tests to demonstrate failure scenario behaviour.

Application Kafka Configuration

These actions take place within a Kafka transaction. To enable this, the application Producer is configured with a transactionalId, and idempotence is enabled:

config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-id");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);        
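
For context, a minimal sketch of how this configuration might be wired into a transactional ProducerFactory bean is shown below. The bootstrap servers and String serializers are illustrative assumptions, not taken verbatim from the demo source:

@Bean
public ProducerFactory<String, String> producerFactoryTransactional() {
   Map<String, Object> config = new HashMap<>();
   config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
   config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
   // The transactionalId and idempotence settings shown above enable transactions on this Producer.
   config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-id");
   config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
   return new DefaultKafkaProducerFactory<>(config);
}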

The KafkaTransactionManager is wired into the Spring context:

@Bean
public KafkaTransactionManager<String, String> kafkaTransactionManager(final ProducerFactory<String, String> producerFactoryTransactional) {
   return new KafkaTransactionManager<>(producerFactoryTransactional);
}        

The full Kafka configuration source for the application is here.

The service method is annotated @Transactional:

@Transactional
public void processWithTransaction(String key, DemoInboundEvent event) {
   kafkaClient.sendMessageWithTransaction(key, event.getData(), properties.getOutboundTopic1());
   callThirdparty(key);
   kafkaClient.sendMessageWithTransaction(key, event.getData(), properties.getOutboundTopic2());
}        
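
A hedged sketch of what the kafkaClient.sendMessageWithTransaction(..) call might look like, assuming it wraps a KafkaTemplate created from the transactional ProducerFactory (the template field and the UUID header value are illustrative assumptions):

public void sendMessageWithTransaction(String key, String data, String topic) {
   // Sends participate in the transaction started by the @Transactional service method.
   ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, data);
   record.headers().add(EVENT_ID_HEADER_KEY, UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
   kafkaTemplate.send(record);
}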

The consumer is configured to disable auto commit of consumer offsets, as it is the Producer's responsibility to send the consumer offsets to be committed to the Consumer Coordinator during the transaction:

config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");        
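
Typically the KafkaTransactionManager is also made available to the listener container, so that the container can synchronise the consumed offsets with the transaction. A minimal sketch of such a container factory, assuming Spring Kafka 2.x (the bean and parameter names are assumptions; the demo's actual factory is in the Kafka configuration source linked above):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
      final ConsumerFactory<String, String> consumerFactory,
      final KafkaTransactionManager<String, String> kafkaTransactionManager) {
   ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
   factory.setConsumerFactory(consumerFactory);
   // With a transaction manager set, the consumed offsets are sent to the transaction
   // rather than being committed by the consumer itself.
   factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
   return factory;
}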

The consumer listener method itself is annotated with @KafkaListener, and told which topic to listen to. In this case the topic is named demo-transactional-inbound-topic to clearly indicate this topic as the source for the flow that will use Kafka transactions:

@KafkaListener(topics = "demo-transactional-inbound-topic", 
	groupId = "kafkaConsumerGroup", 
	containerFactory = "kafkaListenerContainerFactory")
public void listen(@Header(KafkaClient.EVENT_ID_HEADER_KEY) String eventId, 
	@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, 
	@Payload final String payload) {
	// Body elided: the payload is deserialised and passed to the @Transactional
	// service method processWithTransaction(..) shown above.
}

To contrast the behaviour between transactional and non-transactional flows, the service also has a consumer that consumes and processes a message, producing outbound messages using a non-transactional Producer. This consumer listens to the inbound topic demo-non-transactional-inbound-topic, again to clearly indicate that processing messages from this topic will not utilise a transaction.
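
A hedged sketch of that non-transactional listener is shown below; the method name and container factory bean name are assumptions:

@KafkaListener(topics = "demo-non-transactional-inbound-topic",
	groupId = "kafkaConsumerGroup",
	containerFactory = "kafkaListenerContainerFactoryNonTransactional")
public void listenNonTransactional(@Header(KafkaClient.EVENT_ID_HEADER_KEY) String eventId,
	@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
	@Payload final String payload) {
	// Same consume-process-produce steps, but each outbound send is committed
	// individually by a Producer with no transactionalId configured.
}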

Wiremock

The REST call at step (2) is of course not included in the atomic transaction, but it provides a useful way of forcing a retryable exception via Wiremock stubbing. On the first call to Wiremock, following the first outbound event write to Topic 1, it returns a 500 Internal Server Error. Subsequent calls to Wiremock are stubbed to return a 200 Success.

stubWiremock("/api/kafkatransactionsdemo/" + key, 500, "Unavailable", "failOnce", STARTED, "succeedNextTime");
stubWiremock("/api/kafkatransactionsdemo/" + key, 200, "success", "failOnce", "succeedNextTime", "succeedNextTime");        

This 500 exception percolates up and the poll ends unsuccessfully. This means the same event is re-consumed on the next poll. The first event is re-emitted to Topic 1, and this time the REST call returns successfully with a 200, and then the second event is emitted to Topic 2. The consumer offsets, sent by the Producer to the Consumer Coordinator, are written to the consumer offsets topic, marking the original event as successfully consumed.

The transaction completes, with Spring taking care of the boilerplate code, resulting in the Transaction Coordinator writing the commit markers to the two topics along with the consumer offsets topic. Processing completes, and the messages are committed to the topics.

Spring Boot Test Configuration

The Spring Boot integration test uses an embedded (in-memory) Kafka broker. The broker is configured via the following Spring Kafka annotation in the test, ensuring that three broker instances are created to cater for the required replication factor of 3 on the transaction state topic, providing resilience for transactions.

@EmbeddedKafka(controlledShutdown = true, count=3, 
    topics = { "demo-transactional-inbound-topic", "demo-non-transactional-inbound-topic" })        

The test then defines its own Kafka configuration to ensure it does not use that configured for the application itself. The test has four downstream consumers configured, two for each of the two outbound topics. (Different consumer groups are used for the consumers on each topic, to ensure both consume any messages written.) One of each pair is configured with an isolation.level of READ_COMMITTED:

config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));        

...and the other READ_UNCOMMITTED:

config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT));        

With this test setup in place, a message is produced to the application inbound topic (as shown in the diagram above), which the application consumes and processes. The tests then assert on the counts of the number of events received by each downstream consumer.
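
As a hedged sketch of such an assertion, using Awaitility to wait for the downstream consumers to receive their messages (the consumer and counter names are illustrative assumptions; the actual test class is linked above):

kafkaTemplate.send("demo-transactional-inbound-topic", key, payload).get();

// The READ_UNCOMMITTED consumer sees both writes to Topic 1; the READ_COMMITTED
// consumer only ever sees the single committed message.
Awaitility.await().atMost(10, TimeUnit.SECONDS).pollDelay(100, TimeUnit.MILLISECONDS)
   .until(topic1UncommittedConsumer.getCounter()::get, equalTo(2));
Awaitility.await().atMost(10, TimeUnit.SECONDS).pollDelay(100, TimeUnit.MILLISECONDS)
   .until(topic1CommittedConsumer.getCounter()::get, equalTo(1));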

Results

The READ_COMMITTED downstream consumer for Topic 1 receives the message once. The READ_UNCOMMITTED consumer for Topic 1 receives two messages, reflecting the fact that the message was originally written to Topic 1 (but not committed) before the processing failed with the retryable exception.

The two consumers for Topic 2 both receive the one message, as in this test flow the message was only written to that topic once, following the retry, and only then was it successfully committed.

So the READ_COMMITTED downstream consumer for Topic 1 demonstrates Kafka's exactly-once messaging semantics. Although a message within the consume-process-produce flow was processed twice, it has only been consumed once by the downstream transaction-aware consumer.

The REST call to the third party service also highlights that although the exactly-once delivery to the downstream consumer can be guaranteed, any further actions occurring during the processing of the original message may happen more than once. The ramifications of those duplicate calls (and writes in the case of a database) will need their own consideration.

Source Code

The source code for the accompanying demo application is available here:

https://github.com/lydtechconsulting/kafka-transactions

Previous...

For the detailed exploration of how Kafka Transactions work to achieve exactly-once messaging, see the first part of this article.
