Kafka Transactions: Part 2: Spring Boot Demo
Rob Golder
Director & Co Founder at Lydtech Consulting - Consultant Lead Engineer & Architect
Introduction
In the first part of this article the use of Kafka transactions to achieve exactly-once messaging semantics was explored. In this second part the behaviour is demonstrated via a Spring Boot application.
Demo Application
The accompanying Spring Boot application demonstrates Kafka's exactly-once message processing. An integration test uses a test Kafka producer to send messages, via the embedded in-memory Kafka broker, that are consumed by the application. Test consumers then poll for the resulting messages that the application produces to the outbound topics, both with and without Kafka transactions.
The integration test class source is located here.
Test Flow
The following diagram shows the components in place for the Spring Boot Demo application and integration tests.
The application consumes a message and performs three resulting actions:
1. Produces an event to the first outbound topic (Topic 1).
2. Calls a third-party REST service.
3. Produces an event to the second outbound topic (Topic 2).
As described in the WireMock section below, the REST call at step (2), sandwiched between the two outbound writes, allows the tests to demonstrate the failure scenario behaviour.
Application Kafka Configuration
These actions take place within a Kafka transaction. To enable this, the application Producer is configured with a transactionalId, and idempotence is enabled:
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-id");
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
The KafkaTransactionManager is wired into the Spring context:
@Bean
public KafkaTransactionManager<String, String> kafkaTransactionManager(final ProducerFactory<String, String> producerFactoryTransactional) {
    return new KafkaTransactionManager<>(producerFactoryTransactional);
}
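The producer factory itself is not shown above. A minimal sketch of what producerFactoryTransactional might look like follows, combining the two transactional properties already shown; the bootstrap address and String serializers are illustrative assumptions, and the demo application's actual factory may differ:

@Bean
public ProducerFactory<String, String> producerFactoryTransactional() {
    final Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // The transactional settings shown above:
    config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction-id");
    config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    return new DefaultKafkaProducerFactory<>(config);
}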
The full Kafka configuration source for the application is here.
The service method is annotated @Transactional:
@Transactional
public void processWithTransaction(String key, DemoInboundEvent event) {
    kafkaClient.sendMessageWithTransaction(key, event.getData(), properties.getOutboundTopic1());
    callThirdparty(key);
    kafkaClient.sendMessageWithTransaction(key, event.getData(), properties.getOutboundTopic2());
}
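The kafkaClient helper's implementation is not shown here. Assuming it wraps a KafkaTemplate created from the transactional producer factory, a minimal sketch might look like this (an illustration, not necessarily the demo's actual code):

public void sendMessageWithTransaction(final String key, final String data, final String topic) {
    // Assumed: kafkaTemplate was built from the transactional ProducerFactory, so this
    // send participates in the transaction begun by @Transactional rather than being
    // committed immediately.
    kafkaTemplate.send(topic, key, data);
}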
The consumer is configured to disable auto commit of consumer offsets, as it is the Producer's responsibility to send the consumer offsets to be committed to the Consumer Coordinator during the transaction:
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
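Under the covers this is the classic consume-process-produce pattern. A minimal sketch using the plain Kafka client APIs (classes from org.apache.kafka.clients.consumer and org.apache.kafka.clients.producer) shows what Spring Kafka is managing on the application's behalf; the topic name is illustrative and error handling is simplified:

producer.initTransactions();
while (true) {
    final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    if (records.isEmpty()) {
        continue;
    }
    producer.beginTransaction();
    try {
        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            producer.send(new ProducerRecord<>("demo-outbound-topic", record.key(), record.value()));
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
        }
        // It is the producer that sends the consumer offsets to the Consumer Coordinator,
        // so they are committed (or aborted) atomically with the outbound writes.
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();
    } catch (final KafkaException e) {
        producer.abortTransaction();
    }
}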
The consumer listener method itself is annotated with @KafkaListener, and told which topic to listen to. In this case the topic is named demo-transactional-inbound-topic to clearly indicate this topic as the source for the flow that will use Kafka transactions:
@KafkaListener(topics = "demo-transactional-inbound-topic",
        groupId = "kafkaConsumerGroup",
        containerFactory = "kafkaListenerContainerFactory")
public void listen(@Header(KafkaClient.EVENT_ID_HEADER_KEY) String eventId,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
        @Payload final String payload) {
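The kafkaListenerContainerFactory named in the annotation is plausibly where the KafkaTransactionManager is applied, so that each poll/process cycle runs inside a transaction and the container sends the consumer offsets to it. A minimal sketch of such a bean, assuming String keys and values (note that newer Spring Kafka versions prefer setKafkaAwareTransactionManager over the deprecated setTransactionManager):

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        final ConsumerFactory<String, String> consumerFactory,
        final KafkaTransactionManager<String, String> kafkaTransactionManager) {
    final ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Run the consume-process-produce flow for each poll inside a Kafka transaction.
    factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
    return factory;
}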
To contrast the behaviour between the transactional and non-transactional flows, the service also has a consumer that consumes and processes messages, producing outbound messages using a non-transactional Producer. This consumer listens to the inbound topic demo-non-transactional-inbound-topic, again named to clearly indicate that processing messages from this topic will not utilise a transaction.
Wiremock
The REST call at step (2) is of course not included in the atomic transaction, but it provides a useful way of forcing a retryable exception via WireMock stubbing. On the first call to WireMock, following the first outbound event write to Topic 1, it returns a 500 Internal Server Error. Subsequent calls to WireMock are stubbed to return a 200 Success.
stubWiremock("/api/kafkatransactionsdemo/" + key, 500, "Unavailable", "failOnce", STARTED, "succeedNextTime");
stubWiremock("/api/kafkatransactionsdemo/" + key, 200, "success", "failOnce", "succeedNextTime", "succeedNextTime");
This 500 response causes an exception that percolates up, and the poll ends unsuccessfully, meaning the same event is re-consumed on the next poll. The first event is re-emitted to Topic 1, this time the REST call returns successfully with a 200, and the second event is then emitted to Topic 2. The consumer offsets, sent by the Producer to the Consumer Coordinator, are written to the consumer offsets topic, marking the original event as successfully consumed.
The transaction then completes, with Spring taking care of the boilerplate code, and the Transaction Coordinator writes the commit markers to the two outbound topics and to the consumer offsets topic. Processing is complete, and the messages are committed to the topics.
Spring Boot Test Configuration
The Spring Boot integration test uses an embedded (in-memory) Kafka broker. The broker is configured via the following Spring Kafka annotation in the test, ensuring that three broker instances are created to cater for the replication factor of 3 required by the transaction state topic, providing resilience for the transactions.
@EmbeddedKafka(controlledShutdown = true, count = 3,
        topics = { "demo-transactional-inbound-topic", "demo-non-transactional-inbound-topic" })
The test then defines its own Kafka configuration, to ensure it does not use the configuration of the application itself. The test has four downstream consumers configured, two for each of the two outbound topics. (Different consumer groups are used for the consumers on each topic, ensuring both consume any messages written.) One of each pair is configured with isolation.level READ_COMMITTED:
config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));
...and the other READ_UNCOMMITTED:
config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT));
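A sketch of how such a test consumer configuration might be parameterised by isolation level; the group id, bootstrap address, deserializers and offset reset policy here are illustrative assumptions:

private Map<String, Object> testConsumerConfig(final String groupId, final IsolationLevel isolationLevel) {
    final Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed: embedded broker address
    config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolationLevel.toString().toLowerCase(Locale.ROOT));
    return config;
}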
With this test setup in place, a message is produced to the application's inbound topic (as shown in the diagram above), which the application consumes and processes. The tests then assert on the number of events received by each downstream consumer.
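Such assertions could, for example, poll with Awaitility until the expected counts are observed; receivedMessageCount() and the consumer reference here are hypothetical names for illustration:

// Hypothetical: the READ_COMMITTED consumer for Topic 1 should see exactly one message.
await().atMost(10, TimeUnit.SECONDS)
        .untilAsserted(() -> assertThat(readCommittedTopic1Consumer.receivedMessageCount(), equalTo(1)));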
Results
The READ_COMMITTED downstream consumer for Topic 1 receives the message once. The READ_UNCOMMITTED consumer for Topic 1 receives two messages, reflecting the fact that the message was originally written to Topic 1 (but not committed) before the processing failed with the retryable exception.
The two consumers for Topic 2 both receive one message, as in this test flow the message was only written to that topic once, following the failure and retry, and only then was it successfully committed.
So the READ_COMMITTED downstream consumer for Topic 1 demonstrates Kafka's exactly-once messaging semantics. Although the message within the consume-process-produce flow was processed twice, it was only consumed once by the downstream transaction-aware consumer.
The REST call to the third-party service also highlights that while exactly-once delivery to the downstream consumer can be guaranteed, any further actions occurring during the processing of the original message may happen more than once. The ramifications of such duplicate calls (and writes, in the case of a database) need their own consideration.
Source Code
The source code for the accompanying demo application is available here:
Previous...
For the detailed exploration of how Kafka Transactions work to achieve exactly-once messaging, see the first part of this article.