Kafka Idempotent Consumer & Transactional Outbox

Introduction

Duplicate messages are an inevitable aspect of distributed messaging with Kafka. Ensuring your application is able to handle these is essential. Using the Idempotent Consumer pattern coupled with the Transactional Outbox pattern is a recommended approach to achieve this. This article looks at these patterns and how to implement them.

Duplicate Message Scenarios

There is always a risk that a consumer will receive a duplicate message in a distributed system. Various failure scenarios and configuration choices can lead to this, depending on how the application is designed and architected. This is covered in detail in the Deduplication Patterns article.

Duplicate messages can leave resources in unexpected and unintended states, as the duplicate events cause a resource to be processed multiple times. A database record may be updated multiple times, or multiple new entities created. External systems may be called multiple times. Events may be published multiple times to confirm an action has completed, causing further problems downstream.

Two example scenarios that can lead to duplicate messages are a consumer poll timeout, and a producer publishing duplicate events to a topic.

Consumer Poll Timeout

When a consumer polls for a batch of records it has a timeout within which it must process the batch and poll again. With the Java Apache Kafka client this timeout is configured with max.poll.interval.ms, and the maximum number of records returned in each poll is configured with max.poll.records.
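
As an illustration, these properties can be set when constructing the Java client consumer. This is a minimal sketch; the bootstrap server, group Id, and the values shown (the client defaults) are example assumptions rather than recommendations.

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerFactory {

    public KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Maximum delay allowed between calls to poll() before the consumer is removed from the
        // consumer group and its partitions reassigned (shown at its default of 5 minutes).
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        // Maximum number of records returned by a single poll (shown at its default of 500).
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        return new KafkaConsumer<>(props);
    }
}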

Should the consumer instance not complete processing the batch within the timeout, the Kafka broker assumes the consumer has died. It removes the consumer instance from the consumer group and rebalances the partitions. The partition that was assigned to this instance is reassigned to another instance in the consumer group. Since the batch of records polled by the first instance has not been marked as consumed (via an update to the consumer offsets topic), these are the records that are polled by the newly assigned consumer instance.

Figure 1: Duplicate Message Delivery

If the Kafka Broker did not take these actions and the consumer had in fact died, then the topic partitions that the consumer instance was listening to would be blocked.

While the new consumer instance is processing this batch of records, perhaps performing REST calls to other services, publishing new events to Kafka, and writing to the database, the first consumer instance is doing the same. If any of those actions are not idempotent, then data will be left in an inconsistent state across the system.

Non-Idempotent Producer

A Producer that is not configured to be idempotent can write duplicate events to Kafka, which would then be consumed by listening services. This happens when the Producer writes to the topic but, due to a transient error, does not receive acknowledgement of the write. It retries the write in case it had actually failed, which can instead result in a duplicate write. This is covered in detail in the Idempotent Producer article.
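
For completeness, enabling the idempotent producer on the Java client is a configuration concern. A minimal sketch follows; the broker address and serializers are example assumptions.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerFactory {

    public KafkaProducer<String, String> createIdempotentProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // With idempotence enabled the broker uses the producer Id and a sequence number to
        // discard retried writes of the same record, preventing duplicates on transient errors.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // Idempotence requires acks=all so the write is acknowledged by all in-sync replicas.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        return new KafkaProducer<>(props);
    }
}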

The Idempotent Consumer Pattern

The Idempotent Consumer pattern uses a Kafka consumer that can consume the same message any number of times, but only process it once. The recommended approach to implementing the pattern is to add a table to the database to track processed messages. Each message needs a unique message Id assigned by the producing service, either within the payload or as a Kafka message header. When a new message is consumed, the table is checked for the existence of this message Id. If it is present, the message is a duplicate. The consumer updates its offsets to effectively mark the message as consumed, ensuring it is not redelivered, and no further action takes place.

If the message Id is not present in the table, then a database transaction is started and the message Id is inserted. The message is then processed, performing the required business logic. Upon completion the transaction is committed.
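
As an illustration, the processed messages table can be as simple as a single-column table keyed on the message Id. A sketch of a JPA entity for it follows, mirroring the ProcessedEvent type used in the code snippet later in the article (the field name and the use of jakarta.persistence are assumptions; older Spring Boot versions use javax.persistence).

import java.util.UUID;

import jakarta.persistence.Entity;
import jakarta.persistence.Id;

@Entity
public class ProcessedEvent {

    // The unique message Id assigned by the producing service. Using it as the primary key
    // means that inserting the same Id twice violates the primary key constraint.
    @Id
    private UUID eventId;

    protected ProcessedEvent() {
        // Default constructor required by JPA.
    }

    public ProcessedEvent(UUID eventId) {
        this.eventId = eventId;
    }

    public UUID getEventId() {
        return eventId;
    }
}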

Database Flush Strategy

When implementing the Idempotent Consumer pattern, care must be taken to ensure the correct flushing strategy is used. The ‘transactional write-behind’ strategy is commonly the default, and is what is used with Hibernate when calling the save() method. With this strategy the flush of the persistence context happens at the last possible moment.

Where two duplicate events are consumed and processed in parallel, both are able to start a transaction, write the (uncommitted) message Id to the database, and perform their message processing. Only when the second thread comes to commit the transaction is the constraint violation thrown and that transaction rolled back. This is illustrated in the following diagram, which has been simplified by removing the broker and the third party (3PP) service, and assumes that both consumer service instances have consumed the same duplicated message.

Figure 2: Idempotent Consumer

This could be acceptable, as the second database transaction would be rolled back when it tries to commit, so any other database updates included in that transaction would be rolled back with it. With the Transactional Outbox pattern in place too (more on this below), there would be no risk of a duplicate resulting event being published. However, any processing that cannot be rolled back would still be duplicated, such as a REST call to an external service, or events published without the Transactional Outbox pattern.

Employing a strategy where the data is flushed to the database at the point of save synchronizes the entity state with the database (although it is still not committed). This results in a different duplicate handling flow. The second consumer instance consumes the duplicate event, starts a transaction, and attempts to write the message Id to the database. Now, however, it cannot acquire a lock on the database row until the transaction started by the first consumer either commits or rolls back. If that first transaction commits successfully, the second transaction is aborted at that point. If the first transaction rolls back, the processing of the second duplicate is able to proceed, and can itself complete successfully or roll back.

Figure 3: Idempotent Consumer - Data Flush Strategy

With Spring JPA and Hibernate it is simply a case of using the saveAndFlush() call when persisting the messageId within an @Transactional method.

private void deduplicate(UUID eventId) throws DuplicateEventException {
    try {
        // Insert and immediately flush the eventId so a duplicate fails fast on the primary key constraint.
        processedEventRepository.saveAndFlush(new ProcessedEvent(eventId));
    } catch (DataIntegrityViolationException e) {
        // The eventId is already present in the processed events table, so this is a duplicate delivery.
        throw new DuplicateEventException(eventId);
    }
}
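
For context, a sketch of how deduplicate() might be invoked, assuming Spring Kafka: the listener delegates to a transactional service method, and a duplicate causes the transaction to roll back while the record is skipped. The class, topic, and header names here are illustrative assumptions rather than the article project's code, and error handling is simplified. The process() method itself is sketched in the Transactional Outbox section below.

import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class EventConsumer {

    private static final Logger log = LoggerFactory.getLogger(EventConsumer.class);

    @Autowired
    private EventProcessingService eventProcessingService;

    @KafkaListener(topics = "demo-inbound-topic")
    public void listen(@Header("eventId") String eventId, @Payload String payload) {
        try {
            eventProcessingService.process(UUID.fromString(eventId), payload);
        } catch (DuplicateEventException e) {
            // Duplicate delivery: the transaction has rolled back and the record is skipped.
            // Returning normally lets the offset be committed so it is not redelivered.
            log.warn("Duplicate event received: {}", eventId);
        }
    }
}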

This then is the recommended approach, with the advantage of minimising duplicate actions. A simple scenario is where the event triggers a call to a third party bank rail service to send a payment. Without the flush strategy two duplicate payments would be sent (illustrated by using save() instead of saveAndFlush()); with this strategy in place just the one would be.

If the first consumer failed before committing the database transaction, then the second consumer would now acquire the lock on the messageId and be able to process it.

Figure 4: Idempotent Consumer - Failure Scenario

As a caveat, it is important to remember that no matter what patterns are in place there is still a risk of a failure scenario leading to duplicate actions, so the ramifications of this must be understood and catered for. This is covered in detail in the Deduplication Patterns article.

The Transactional Outbox Pattern

When consuming and processing a message results in an event being published to Kafka, implementing the Idempotent Consumer on its own is not sufficient to ensure that duplicate events are not emitted.

While Kafka offers the option of Kafka Transactions, messages published to Kafka in a Kafka transaction are not atomic with resources written using database transactions. A failure occurring between the database transaction committing and the Kafka transaction committing will therefore leave resources in an inconsistent state. Attempting to tie the commit of the two transactions together using a two-phase commit should also be avoided: it can still fail under certain scenarios leaving resources in an inconsistent state, adds complexity, and impacts performance by increasing the latency of every transaction.

This can be addressed by effectively tying the publishing of an event to the database transaction using the Transactional Outbox pattern. A new outbox table is created, and any event that is to be published is inserted into this table.

Figure 5: Idempotent Consumer & Transactional Outbox

The write of the outbound event to the outbox table happens as part of the same database transaction that includes the write of the consumed message Id by the Idempotent Consumer. This means that the consumption of the message, any resulting outbound event publishing, and any other database writes are all atomic. They either all succeed or all fail. This provides the strongest guarantee that no duplicate processing will occur.
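
A minimal sketch of such a transactional service method follows, picking up the process() call referenced from the listener sketch earlier, and assuming Spring JPA repositories. The Payment and OutboxEvent types, the repositories, and the toJson() helper are illustrative assumptions rather than classes from the article's project.

// rollbackFor ensures the transaction is rolled back when the checked DuplicateEventException is thrown.
@Transactional(rollbackFor = DuplicateEventException.class)
public void process(UUID eventId, Payment payment) throws DuplicateEventException {
    // 1. Idempotency check: inserts and flushes the eventId, throwing if it is a duplicate.
    deduplicate(eventId);

    // 2. Business logic: entity writes join the same database transaction.
    paymentRepository.save(payment);

    // 3. Transactional outbox: write the outbound event to the outbox table rather than
    //    publishing directly to Kafka. CDC reads it from the commit log and publishes it.
    outboxEventRepository.save(new OutboxEvent(UUID.randomUUID(), "payment-completed", toJson(payment)));
}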

CDC and Kafka Connect

Change Data Capture (CDC) with a Kafka Connect connector can then be used to write the event to the outbound Kafka topic. The outbox event is written to the database commit log, and the connector reads this, transforms the data, and writes it to the Kafka topic.

Kafka Connect is a Kafka component providing simple integration between a datasource (in this case a database) and Kafka. An off the shelf connector such as Debezium could be employed here, connecting a Postgres or MySQL database to Kafka. This should only require configuration to use, covering the connection to the source and sink datasources, simple message transforms, and error handling. The option to code and plug in more complex transforms is available, as is the option to code a new connector if necessary. However, connectors are available for many databases, so writing your own is usually not required and would increase the application complexity.

There are other options for implementing the Transactional Outbox pattern besides using CDC. For example, a simple poller could be implemented to poll the outbox for new events and write them to Kafka, as sketched below. Kafka Connect, however, comes with all the advantages that Kafka brings to the table, such as resilience, fault tolerance, error handling, and scalability.
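
For example, a scheduled poller might be sketched as follows, assuming Spring scheduling, a KafkaTemplate, and an outbox repository with a published flag and a derived query (all illustrative assumptions). Note that this simplistic version only provides at-least-once delivery, so downstream consumers still need to deduplicate.

@Scheduled(fixedDelay = 5000)
@Transactional
public void publishOutboxEvents() {
    // Read any outbox events that have not yet been published, oldest first.
    List<OutboxEvent> events = outboxEventRepository.findByPublishedFalseOrderByCreatedAtAsc();
    for (OutboxEvent event : events) {
        // Publish to Kafka, then flag the row as published in the same database transaction.
        // If the transaction subsequently rolls back the event may be re-published, hence at-least-once.
        kafkaTemplate.send("demo-outbound-topic", event.getId().toString(), event.getPayload());
        event.setPublished(true);
    }
}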

Data Consistency In Failure Scenarios

As the consumer offsets are only updated after the database transaction commits, if the consumer dies after writing the outbox event but before the database transaction commits, then no outbound message is sent and the message will be redelivered.

If the consumer dies after the database transaction commits but before the consumer offsets are written, then the message is redelivered, but deduplicated by the processed messageIds check, so no duplication occurs. In this case the consumer offsets must be updated by the second consumer to ensure the message is not continually redelivered.

Revisiting the original diagram (Figure 1) showing the duplicate message delivery, this time with the Idempotent Consumer and Transactional Outbox patterns in place:

Figure 6: Idempotent Consumer & Transactional Outbox - Duplicate Delivery

Conclusion

Coupling the Idempotent Consumer using the data flush strategy with the Transactional Outbox and CDC provides the gold standard for managing duplicate messages in applications using Kafka for distributed messaging. Guaranteeing deduplication with these patterns comes at the cost of extra complexity and more components in the architecture, so it is a trade-off to determine whether the advantages outweigh the disadvantages for each use case.

Source Code

The source code is available here: https://github.com/lydtechconsulting/kafka-idempotent-consumer

The project demonstrates event deduplication with the Idempotent Consumer pattern, and emitting events via the Transactional Outbox pattern. It uses Debezium as the Kafka Connect component for Change Data Capture (CDC) in order to publish outbound events.

It contains Spring Boot integration tests, using an embedded Kafka broker and an H2 database, that demonstrate event deduplication when duplicate events are consumed by the application.

It also contains component tests that use Testcontainers to manage spinning up a dockerised Kafka broker, a dockerised Kafka Connect using Debezium for CDC, a dockerised Postgres database, and a dockerised Wiremock representing a third party service. Calls to the third party service simulate transient errors that succeed on retry. The delay caused by retrying can cause duplicate message delivery, enabling demonstration of event deduplication.
