Microservices: Consistent State Propagation with Debezium Engine

This article outlines the main challenges of state propagation across different (micro)services together with architectural patterns for facing those challenges, providing an actual implementation using Debezium Engine.

But first of all, let's define some of the basic concepts that will be used throughout the article.

  • (Micro)service: I personally do not like this term; I prefer talking about bounded contexts and their internal domains / sub-domains. Running a bounded context and/or its domains as different runtimes (aka (micro)services) is a deployment decision, meaning that the domain and sub-domains within a bounded context must be decoupled regardless of how they are deployed (microservices vs. modular monolith). For the sake of simplicity, in this article I will use the word microservice to represent isolated runtimes that need to react to changes in the state maintained by other microservices.
  • State: Within a microservice, its bounded context, domain and sub-domains define an internal state in the form of aggregates (entities and value objects). In this article, changes in one microservice's state shall be propagated to another runtime.
  • State Propagation: In a scenario where the application runtimes (i.e. microservices) are decoupled, regardless of how they are deployed (pod in Kubernetes, Docker container, OS service / process), state propagation is the mechanism for ensuring that state mutations that happen in a bounded context are communicated to the downstream bounded contexts.

Setting the Stage

As a playground for this article, we will use two bounded contexts defined as part of an e-learning platform:

  • Course Management: Managing courses, including operations for course authoring.
  • Rating System: Enable platform users to rate courses.

Between both contexts there is an asynchronous, event-driven relationship: whenever a course is created in the Course Management bounded context, an event is published and eventually received by the Rating System, which adapts the inbound event to its internal domain model using an anti-corruption layer (ACL) and creates an empty rating for that Course. The next image outlines this (simplified) context mapping:

Bounded Context Mapping of the two services

The Challenge

The scenario is apparently pretty simple: we can just publish a message (CourseCreated) to a message broker (e.g. RabbitMQ) whenever a Course is created. With the Rating System subscribed to that event, it will eventually be received and processed by the downstream service. However, it is not so simple, and we have several "what-if" situations, like:

  1. What if the Course creation transaction is eventually rolled back (e.g. the database does not accept the operation) but the message has already been published to the message broker?
  2. What if the Course is created and persisted, but the message broker does not accept the message (or is not available)?

In a nutshell, the main challenge here is how to ensure, in a distributed solution, that both the domain state mutation and the corresponding event publication happen as one consistent, atomic operation, so that either both happen or neither does.
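To make the second "what-if" concrete, here is a minimal sketch of the naive "save, then publish" approach using in-memory stand-ins. All class names (InMemoryCourseTable, FlakyBroker, NaiveCourseService) are hypothetical and exist only for this illustration; they are not part of the article's repository.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the course table of the database.
class InMemoryCourseTable {
    final List<String> rows = new ArrayList<>();
}

// Hypothetical stand-in for a message broker that may be unavailable.
class FlakyBroker {
    final List<String> published = new ArrayList<>();
    boolean available = true;

    void publish(String message) {
        if (!available) {
            throw new IllegalStateException("broker unavailable");
        }
        published.add(message);
    }
}

class NaiveCourseService {
    private final InMemoryCourseTable table;
    private final FlakyBroker broker;

    NaiveCourseService(InMemoryCourseTable table, FlakyBroker broker) {
        this.table = table;
        this.broker = broker;
    }

    // Dual write: two independent systems are updated with no shared transaction.
    void createCourse(String courseId) {
        table.rows.add(courseId);                    // write 1: already persisted
        broker.publish("CourseCreated:" + courseId); // write 2: may fail afterwards
    }
}
```

If the broker is down when createCourse is called, the course row stays in the table while no event is ever published, so the downstream service never learns about the course.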

There are certainly solutions that can be implemented at the message producer or message consumer side to try to solve these situations, like retries, publishing compensation events, or manually reverting the write operation in the database.

However, most of them require software engineers to keep too many scenarios in mind, which is error-prone and reduces codebase maintainability.

Another alternative is implementing a 2-phase-commit solution at the infrastructure level, which makes the deployment and operation of the underlying infrastructure more complex and most likely forces the adoption of expensive commercial solutions.

During the rest of the article we will focus on a solution based on the combination of two important patterns in distributed systems: Transactional Outbox and Change Data Capture, providing a reference implementation that will allow software engineers to focus on what really matters: providing domain value.

Patterns Can Help

As described above, we need to ensure that the state mutation and the publication of a domain event happen as one atomic operation. This can be achieved by combining two patterns that are nicely explained by Chris Richardson in his must-read Microservices Patterns book; therefore, I will not explain them in detail here.

Transactional Outbox Pattern

Transactional Outbox focuses on persisting both state mutation and corresponding event(s) in an atomic database operation. In our case, we will leverage ACID capabilities of a relational database, with one transaction that includes two write operations, one in the domain specific table, and another that persists the events in an outbox (supporting) table.

This ensures that we will achieve a consistent state of both the domain and the corresponding events. This is shown in the next figure:

Transactional Outbox Pattern
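The atomic "both or none" behaviour of the pattern can be sketched without a real database. The example below is only an in-memory simulation under stated assumptions: InMemoryDatabase, TransactionalWork and CourseWriter are hypothetical names, and the staged-copy mechanism merely imitates what an ACID transaction in a relational database would provide.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical unit of work that writes to both tables.
interface TransactionalWork {
    void execute(List<String> courses, List<String> outbox);
}

// Hypothetical stand-in for a relational database with a domain table and an outbox table.
class InMemoryDatabase {
    final List<String> courseTable = new ArrayList<>();
    final List<String> outboxTable = new ArrayList<>();

    // Runs the work against staged copies and applies both copies together, or neither.
    void inTransaction(TransactionalWork work) {
        List<String> stagedCourses = new ArrayList<>(courseTable);
        List<String> stagedOutbox = new ArrayList<>(outboxTable);
        work.execute(stagedCourses, stagedOutbox); // an exception here discards the staged state
        courseTable.clear();
        courseTable.addAll(stagedCourses);
        outboxTable.clear();
        outboxTable.addAll(stagedOutbox);
    }
}

class CourseWriter {
    private final InMemoryDatabase db;

    CourseWriter(InMemoryDatabase db) {
        this.db = db;
    }

    void createCourse(String courseId) {
        db.inTransaction((courses, outbox) -> {
            courses.add(courseId); // write 1: domain table
            if (courses.indexOf(courseId) != courses.lastIndexOf(courseId)) {
                // Simulates the database rejecting the operation (e.g. a unique constraint).
                throw new IllegalStateException("duplicate course id: " + courseId);
            }
            outbox.add("CourseDefinitionCreated:" + courseId); // write 2: outbox table
        });
    }
}
```

Creating the same course twice makes the second attempt fail, and neither the course table nor the outbox table changes as a result of it.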

Change Data Capture with Transaction Log Tailing

Once the events are available in the outbox table, we need a mechanism to detect new events stored in the outbox (Change Data Capture - CDC) and publish them to external consumers (Transaction Log Tailing Message Relay).

The Message Relay is responsible for detecting (i.e. CDC) new events available in the outbox table and publishing those events for external consumers via a message broker (e.g. RabbitMQ, SNS + SQS) or an event stream (e.g. Kafka, Kinesis).

There are different Change Data Capture (CDC) techniques for the Message Relay to detect new events available in the outbox. In this article we will use the Log Scanners approach, named Transaction Log Tailing by Chris Richardson, where the Message Relay tails the database transaction log to detect the new messages that have been appended to the outbox table. I personally prefer this approach since it reduces the amount of manual work, but it might not be available for all databases.

The next image illustrates how the Message Relay integrates with the Transactional Outbox:

Transactional Outbox in combination with Transaction Log Tailing CDC

One of the main goals of this solution is ensuring that the software engineers of the two bounded contexts only need to focus on the elements colored orange in the diagram above. The grey components are just infrastructure elements that shall be transparent for the developers.

So, how do we implement the Transaction Log Tailing Message Relay? Debezium Engine is the answer.

Debezium

Debezium is a Log Scanner type change data capture solution that provides connectors for several databases, creating a stream of messages out of the changes detected in the database's transaction log. Debezium comes in two flavors:

  • Debezium Server: This is a full-featured version of Debezium which leverages Apache Kafka and Kafka Connectors to stream data from the source database to the target system.
  • Debezium Embedded / Engine: Simplified version that can be embedded as a library in your product. It does not require an Apache Kafka service, but still makes use of Kafka Connectors to detect changes in the data sources.

In this example we will use Debezium Embedded due to its simplicity (i.e. no Kafka instance is needed), while still being robust enough to provide a suitable solution.

The first time a Debezium instance starts to track a database, it takes a snapshot of the current data to be used as a baseline. Once the snapshot is completed, only the delta of changes from the latest stored offset will be processed.

Debezium is highly configurable, making it possible to shape its behaviour to meet different expectations. For instance, it allows defining:

  • The database operations to be tracked (updates, inserts, deletions, schema modifications)
  • The database tables to be tracked.
  • Offset backing store solution (in-memory, file-based, Kafka-based)
  • Offset flush interval.

Some of these properties will be analyzed later in the article.

All Pieces Together - Deployment Architecture

All the components will be deployed as Docker containers in Docker Compose. Those components requiring durable storage will use a Docker volume.

  • Services: In this example, we will use Spring Boot for building the Course Management and Rating System bounded contexts. Each bounded context will be deployed as a separate runtime. Both services use PostgreSQL for persistence, each with a dedicated schema.
  • Persistence: PostgreSQL is also deployed as a Docker container.
  • Message Broker: For the message broker we will use RabbitMQ also running as a Docker container.
  • Message Relay: Leveraging Spring Boot and Debezium Embedded, it provides the Change Data Capture (CDC) solution for detecting new records added to the outbox table of the Course Management service.

The overall (simplified) deployment architecture is shown below:

Deployment Architecture of the solution proposed in the article

Show Me The Code

All the code described in this article can be found in my personal GitHub repository.

Overall Project Structure

The code provided to implement this example is structured as a multi-module Java Maven project, leveraging Spring Boot and following a hexagonal-architecture-like structure.

There are three main package groups:

Toolkit Supporting Context

Modules providing shared components and infrastructure related elements used by the functional bounded contexts (in this example Course Management and Rating System). For instance, the transactional outbox and the Debezium-based change data capture are shared concerns and therefore their code belongs to these modules.

Modules of the Shared Context (toolkit)

Where:

  • toolkit-core: Core classes and interfaces, to be used by the functional contexts.
  • toolkit-outbox-jpa-postgres: Implementation of the transactional outbox using JPA for Postgres.
  • toolkit-cdc-debezium-postgres: Implementation of the message relay as CDC with Debezium embedded for Postgres.
  • toolkit-message-publisher-rabbitmq: Message publishers implementation for RabbitMQ.
  • toolkit-tx-spring: Provides programmatic transaction management with Spring.
  • toolkit-state-propagation-debezium-runtime: Runtime (service) responsible for the CDC and message publication to the RabbitMQ instance.

Course Management Bounded Context

These modules make up the Course Management bounded context. The modules adhere to hexagonal architecture principles, similar to the structure already used in my previous article about repository testing.

Modules of the Course Management Bounded Context

Where:

  • course-management-domain: Module with the Course Management domain definition, including entities, value objects, domain events, ports, etc. This module has no dependencies on frameworks, being as pure Java as possible.
  • course-management-application: Following hexagonal architecture, this module orchestrates invocations to the domain model using commands and command handlers.
  • course-management-repository-test: Contains the repository test definitions, with no dependencies on frameworks. It only verifies the expectations of the repository ports defined in the course-management-domain module.
  • course-management-repository-jpa: JPA implementation of the repository interface CourseDefinitionRepository defined in the course-management-domain module. Leverages Spring Boot JPA.
  • course-management-repository-jpa-postgres: Specialization of the repository JPA implementation of the previous module, adding postgres specific concerns (e.g. postgres database migration scripts)
  • course-management-rest: Inbound web-based adapter exposing HTTP endpoints for course creation.
  • course-management-runtime: Runtime (service) of the Course Management context.

Rating System Bounded Context

For the sake of simplicity, this bounded context is only partially implemented, with just an inbound AMQP-based adapter that receives the messages generated by the Course Management service when a new course is created and published by the CDC service (toolkit-state-propagation-debezium-runtime).

Modules of the Rating System Bounded Context

Where:

  • rating-system-amqp-listener: AMQP listener leveraging Spring Boot AMQP, subscribes to messages of the Course Management context.
  • rating-system-domain: No domain has been defined for the Rating System context.
  • rating-system-application: No application layer has been defined for the Rating System context.
  • rating-system-runtime: Runtime (service) of the Rating System context, starting the AMQP listener defined in rating-system-amqp-listener.

Request Flow

This section outlines the flow of a request for creating a course, starting when the user requests the creation of a course through the backend API and ending with the (consistent) publication of the corresponding event in the message broker.

This flow has been split into three phases.

Phase 1: State Mutation and Domain Events Creation

The flow starts with the request for creating a new Course Definition. The HTTP POST request is mapped to a domain command and processed by its corresponding command handler, defined in course-management-application. Command handlers are automatically injected into the provided CommandBus implementation, in this example the CommandBusInProcess defined in the toolkit-core module:
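The actual implementation lives in the repository. As a rough illustration of the idea only, an in-process command bus could look like the following sketch. The type names (Command, CommandHandler, InProcessCommandBus, CreateCourseDefinition) and the explicit register method are assumptions made for this example; the article's CommandBusInProcess receives its handlers through injection and may be shaped differently.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical marker interface for commands.
interface Command {}

// Hypothetical handler contract: one handler per command type.
interface CommandHandler<C extends Command> {
    void handle(C command);
}

class InProcessCommandBus {
    private final Map<Class<?>, CommandHandler<?>> handlers = new HashMap<>();

    // Handlers are registered by hand here; a framework could inject them instead.
    <C extends Command> void register(Class<C> commandType, CommandHandler<C> handler) {
        handlers.put(commandType, handler);
    }

    // Dispatches the command to the handler registered for its concrete type.
    @SuppressWarnings("unchecked")
    <C extends Command> void push(C command) {
        CommandHandler<C> handler = (CommandHandler<C>) handlers.get(command.getClass());
        if (handler == null) {
            throw new IllegalArgumentException("No handler for " + command.getClass().getName());
        }
        handler.handle(command);
    }
}

// Hypothetical command carrying the data of the HTTP request.
record CreateCourseDefinition(String title) implements Command {}
```

The inbound REST adapter would then only need to build the command and push it to the bus, without knowing which handler processes it.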


The command handler creates an instance of the CourseDefinition entity. The business logic and invariants (if any) of creating a Course Definition are encapsulated within the domain entity. The creation of a new instance of the domain entity also comes with the corresponding CourseDefinitionCreated domain event:

The event is "recorded" into the created course definition instance. This method is defined in the abstract class Entity of the toolkit-core module:
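A minimal sketch of this "record the event inside the entity" idea is shown next. It is an assumption-laden illustration: EventRecordingEntity, recordEvent, pullEvents, CourseDefinitionSketch and CourseDefinitionCreatedEvent are hypothetical names, not the classes of the toolkit-core module.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical marker interface for domain events.
interface DomainEvent {}

// Hypothetical base class: entities keep the events produced by their own state changes.
abstract class EventRecordingEntity {
    private final List<DomainEvent> recordedEvents = new ArrayList<>();

    protected void recordEvent(DomainEvent event) {
        recordedEvents.add(event);
    }

    // Hands the recorded events over once and clears the internal list.
    List<DomainEvent> pullEvents() {
        List<DomainEvent> events = List.copyOf(recordedEvents);
        recordedEvents.clear();
        return events;
    }
}

record CourseDefinitionCreatedEvent(String courseId, String title) implements DomainEvent {}

class CourseDefinitionSketch extends EventRecordingEntity {
    private final String id;
    private final String title;

    private CourseDefinitionSketch(String id, String title) {
        this.id = id;
        this.title = title;
    }

    // Factory method: enforces an (assumed) invariant and records the creation event.
    static CourseDefinitionSketch create(String title) {
        if (title == null || title.isBlank()) {
            throw new IllegalArgumentException("title is required");
        }
        CourseDefinitionSketch course = new CourseDefinitionSketch(UUID.randomUUID().toString(), title);
        course.recordEvent(new CourseDefinitionCreatedEvent(course.id, course.title));
        return course;
    }
}
```

Keeping the events inside the entity means the repository can later collect them at save time, without the command handler having to pass them around.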

Once the course definition instance is in place, the command handler persists the instance in the course definition repository, starting the second phase of the processing flow.

Phase 2: Persisting the State: Course Definition and Events in the Outbox

Whenever a domain entity is saved in the repository, the domain events associated with the domain state mutation (in this example, the creation of a CourseDefinition entity) are temporarily appended to an in-memory, ThreadLocal buffer. This buffer resides in the DomainEventAppender of the toolkit-core module.

Events are placed in this buffer from an aspect executed around methods annotated with AppendEvents. The pointcut and aspect (both in toolkit-core) look like:

The command handlers are automatically decorated before being "injected" into the command bus. One of the decorators ensures that the command handler is transactional, and another ensures that, when the transaction completes, the events in the in-memory thread-local buffer are published to the outbox table consistently with the ongoing transaction. The next sequence diagram shows the decorators applied to the domain-specific command handler.

High Level Processing Flow
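The interplay between the thread-local buffer and the two decorators can be sketched in plain Java. This is a simplified illustration under several assumptions: EventBuffer, Handler, OutboxFlushingHandler, TransactionalHandler and TransactionRunner are hypothetical names, events are plain strings, and the aspect-based wiring described above is replaced by explicit calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical thread-local buffer holding the events produced while handling a command.
class EventBuffer {
    private static final ThreadLocal<List<String>> EVENTS = ThreadLocal.withInitial(ArrayList::new);

    static void append(String event) {
        EVENTS.get().add(event);
    }

    // Returns the buffered events and resets the buffer for the current thread.
    static List<String> drain() {
        List<String> drained = List.copyOf(EVENTS.get());
        EVENTS.remove();
        return drained;
    }
}

interface Handler<C> {
    void handle(C command);
}

interface TransactionRunner {
    void run(Runnable work);
}

// Decorator: after the delegate completes, moves the buffered events to the outbox.
class OutboxFlushingHandler<C> implements Handler<C> {
    private final Handler<C> delegate;
    private final Consumer<String> outbox;

    OutboxFlushingHandler(Handler<C> delegate, Consumer<String> outbox) {
        this.delegate = delegate;
        this.outbox = outbox;
    }

    @Override
    public void handle(C command) {
        try {
            delegate.handle(command);
            EventBuffer.drain().forEach(outbox);
        } catch (RuntimeException e) {
            EventBuffer.drain(); // discard events of a failed command
            throw e;
        }
    }
}

// Decorator: runs the delegate (including the outbox flush) inside one transaction boundary.
class TransactionalHandler<C> implements Handler<C> {
    private final Handler<C> delegate;
    private final TransactionRunner transactions;

    TransactionalHandler(Handler<C> delegate, TransactionRunner transactions) {
        this.delegate = delegate;
        this.transactions = transactions;
    }

    @Override
    public void handle(C command) {
        transactions.run(() -> delegate.handle(command));
    }
}
```

The ordering of the decorators is the important design choice in this sketch: the transactional decorator wraps the outbox-flushing one, so the outbox write happens before the commit and belongs to the same transaction as the domain write.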

The outbox is an append-only buffer (a PostgreSQL table in this example) where a new entry is added for each event. The outbox entry has the following structure:

Where the actual event payload is serialized as a JSON string in the payload property. The outbox interface is straightforward:
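As an illustration only, an outbox entry and its append-only interface might be sketched as follows. Apart from the payload property mentioned above, the field names (id, eventType, createdAt) and the type names (OutboxEntrySketch, OutboxSketch, InMemoryOutbox) are assumptions, and the in-memory implementation simply stands in for a database-backed one.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical outbox entry; only "payload" (a JSON string) is taken from the article.
record OutboxEntrySketch(UUID id, String eventType, String payload, Instant createdAt) {}

// Hypothetical append-only contract: entries are added, never updated.
interface OutboxSketch {
    void append(OutboxEntrySketch entry);
}

// In-memory stand-in for a database-backed implementation.
class InMemoryOutbox implements OutboxSketch {
    private final List<OutboxEntrySketch> entries = new ArrayList<>();

    @Override
    public void append(OutboxEntrySketch entry) {
        entries.add(entry);
    }

    List<OutboxEntrySketch> entries() {
        return List.copyOf(entries);
    }
}
```

Keeping the interface this small is what allows different persistence technologies to provide their own implementation without affecting the domain code.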

The PostgreSQL implementation of the Outbox interface is placed in the toolkit-outbox-jpa-postgres module:

Phase 2 is now complete: both the domain state and the corresponding event are consistently persisted, under the same transaction, in our PostgreSQL database.

Phase 3: Events Publication

In order to make the domain events generated in the previous phase available to external consumers, the message relay implementation based on Debezium Embedded is monitoring the outbox table, so that whenever a new record is added to the outbox, the message relay creates a Cloud Event and publishes it to the RabbitMQ instance following the Cloud Event AMQP binding specification.

The following code snippet shows the Message Relay specification as defined in the toolkit-core module:
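The real specification is available in the repository. To give an idea of the shape such a contract could take, here is a hedged sketch in plain Java with no Debezium dependency. MessageRelaySketch, CdcRecordSketch and CallbackRelaySketch are hypothetical names, and the change event is modelled as a plain map rather than Debezium's Struct. The "op" and "after" keys and the "c" / "r" operation codes follow the Debezium change event envelope.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical relay contract; the article's MessageRelay interface may differ.
interface MessageRelaySketch {
    void start();
    void stop();
}

// Hypothetical, simplified view of a captured row change.
record CdcRecordSketch(String operation, Map<String, Object> row) {}

class CallbackRelaySketch implements MessageRelaySketch {
    // "c" = insert, "r" = row read during the initial snapshot.
    private static final Set<String> FORWARDED_OPERATIONS = Set.of("c", "r");

    private final Consumer<CdcRecordSketch> recordConsumer;
    private volatile boolean running;

    CallbackRelaySketch(Consumer<CdcRecordSketch> recordConsumer) {
        this.recordConsumer = recordConsumer;
    }

    @Override
    public void start() {
        running = true;
    }

    @Override
    public void stop() {
        running = false;
    }

    // Stand-in for the engine callback: receives a change event envelope as a plain map.
    void handleChangeEvent(Map<String, Object> envelope) {
        if (!running) return;
        if (!(envelope.get("op") instanceof String operation)) return;
        if (!FORWARDED_OPERATIONS.contains(operation)) return;
        if (!(envelope.get("after") instanceof Map<?, ?> after)) return;
        Map<String, Object> row = new LinkedHashMap<>();
        after.forEach((column, value) -> row.put(String.valueOf(column), value));
        recordConsumer.accept(new CdcRecordSketch(operation, Collections.unmodifiableMap(row)));
    }
}
```

The consumer passed to the constructor is where a publisher would be plugged in, which keeps the relay unaware of the target broker.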

And the Debezium Embedded based implementation:

As can be seen in the code snippet above, the DebeziumEngine is configured to notify a private method, handleChangeEvent, when a change in the database is detected. In this method, a Consumer of CdcRecord is used as a wrapper of the internal Debezium model represented by the Struct class. Initial configuration must be provided to the Debezium Engine; in the example this is done with the DebeziumConfigurationProvider class:

The most relevant properties are outlined below:

  • connector.class: The name of the connector to use, usually this is related to the database technology being tracked for changes. In this example we are using io.debezium.connector.postgresql.PostgresConnector.
  • offset.storage: Type of storage for maintaining the offset. In this example we are using org.apache.kafka.connect.storage.MemoryOffsetBackingStore, so that offsets are lost after restarting the service. See below.
  • offset.flush.interval.ms: Interval, in milliseconds, at which the offsets are persisted in the offset store.
  • database.*: These properties refer to the database being tracked (CDC) for changes by Debezium.
  • skipped.operations: If not specified, all the operations will be tracked. For our example, since we only want to detect newly created events, all the operations but (i)nserts are skipped.
  • table.include.list: List of the tables to include for the CDC. In this example only the table where events are stored during Phase 2 is relevant (i.e. outbox_schema.outbox).
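Put together, the properties listed above could be assembled as in the following sketch. It only builds a java.util.Properties object and does not start an engine. The class and method names, the connector name, the flush interval value, the topic.prefix value and the plugin.name choice are assumptions for illustration; the property keys themselves are Debezium configuration keys (topic.prefix applies to Debezium 2.x, older versions use database.server.name instead).

```java
import java.util.Properties;

// Hypothetical helper, not the article's DebeziumConfigurationProvider.
class DebeziumPropertiesSketch {

    static Properties outboxConnectorProperties(String host, int port, String user,
                                                String password, String database) {
        Properties props = new Properties();
        props.setProperty("name", "course-management-outbox-connector"); // assumed name
        props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        // In-memory offsets: lost on restart, so the snapshot runs again every time.
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore");
        props.setProperty("offset.flush.interval.ms", "5000"); // assumed value
        props.setProperty("database.hostname", host);
        props.setProperty("database.port", String.valueOf(port));
        props.setProperty("database.user", user);
        props.setProperty("database.password", password);
        props.setProperty("database.dbname", database);
        props.setProperty("topic.prefix", "course-management"); // assumed value
        props.setProperty("plugin.name", "pgoutput"); // assumption: built-in logical decoding plugin
        // Skip updates, deletes and truncates; only inserts into the outbox are relevant.
        props.setProperty("skipped.operations", "u,d,t");
        props.setProperty("table.include.list", "outbox_schema.outbox");
        return props;
    }
}
```

Externalizing the connection values as method parameters mirrors the idea of reading them from the service configuration instead of hard-coding them.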

The first thing Debezium does after starting is take a snapshot of the current data and generate the corresponding events. After that, the offset is updated to the latest record and the deltas (newly added, updated or deleted records) are processed, updating the offset accordingly. Since the provided example uses an in-memory offset store, the snapshot is always performed after starting the service. Therefore, this is not a production-ready implementation yet. There are two options to address this:

  • Use a durable offset store (file-based and Kafka-based stores are supported by Debezium Embedded).
  • Delete the outbox entries after the events have been processed, ensuring the delete operations are skipped by Debezium in its configuration.
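For the first option, switching to a file-based offset store is mostly a configuration change. The sketch below assumes an existing set of engine properties and a hypothetical helper named DurableOffsetsSketch; the two property keys and the FileOffsetBackingStore class name come from Debezium and Kafka Connect, while the file location is up to the deployment (for instance a path on a Docker volume).

```java
import java.util.Properties;

// Hypothetical helper that derives a configuration with durable, file-based offsets.
class DurableOffsetsSketch {

    static Properties withFileOffsets(Properties base, String offsetFile) {
        Properties copy = new Properties();
        copy.putAll(base); // leave the original configuration untouched
        copy.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        copy.setProperty("offset.storage.file.filename", offsetFile);
        return copy;
    }
}
```

With the offsets stored in a file that survives restarts, the engine can resume from the last flushed position instead of taking a new snapshot.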

The Message Relay is configured and initialized in the toolkit-state-propagation-debezium-runtime module. The values of the configuration properties needed by Debezium Embedded are defined in the Spring Boot properties.yaml file:

The engine is started using the Spring Boot lifecycle events:

The change data capture records (CdcRecord) are processed by the CloudEventRecordChangeConsumer which creates the Cloud Event representation of the CDC record and publishes it through the MessagePublisher.

The provided MessagePublisher is a simple RabbitMQ outbound adapter converting the Cloud Event to the corresponding AMQP message as per the Cloud Event AMQP protocol binding.

After Phase 3, the events are published to the message relay RabbitMQ exchange of the producer (Course Management service), named following the convention __MR_<APP_NAME> (in our example, __MR_course-management). Messages are routed to the right exchange based on custom Cloud Event extensions, as shown in the previous code snippet.
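As a small, hedged illustration of the two conventions involved, the sketch below derives the relay exchange name and builds message headers for the core Cloud Event attributes. CloudEventAmqpSketch and its methods are hypothetical; the "cloudEvents:" header prefix reflects my reading of the binary content mode of the Cloud Event AMQP binding and should be checked against the specification version in use. The custom routing extensions mentioned above are not reproduced here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not the article's MessagePublisher.
class CloudEventAmqpSketch {

    // Exchange naming convention described in the article: __MR_<APP_NAME>.
    static String relayExchange(String appName) {
        return "__MR_" + appName;
    }

    // Maps the required Cloud Event context attributes to prefixed message headers.
    static Map<String, Object> binaryModeHeaders(String id, String source, String type) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("cloudEvents:specversion", "1.0");
        headers.put("cloudEvents:id", id);
        headers.put("cloudEvents:source", source);
        headers.put("cloudEvents:type", type);
        return headers;
    }
}
```

In binary content mode the event data travels as the message body, so only the context attributes need to be carried as headers.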

Alternative Solutions

This example makes use of Debezium Embedded for providing a change data capture and message relay solution. This works fine for technologies supported by Debezium through its connectors.

For non-supported providers, alternative approaches can be applied:

  • DynamoDB Streams: Suitable for DynamoDB databases, in combination with Kinesis, it can be used for subscribing to changes in a DynamoDB (outbox) table.
  • Custom database polling: Could be implemented for supporting databases with no connectors for Debezium.

Adding any of those alternatives to this example would just require providing specific implementations of the MessageRelay interface, without additional changes in any of the other services.
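For the custom polling alternative, a relay could periodically read the outbox for entries it has not published yet. The following is a minimal sketch assuming each outbox entry carries a monotonically increasing sequence number. PollableOutbox, SequencedEntry and PollingRelaySketch are hypothetical names, and scheduling as well as durable storage of the last published position are left out.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical outbox entry with a monotonically increasing sequence number.
record SequencedEntry(long sequence, String payload) {}

// Hypothetical read side of the outbox, e.g. backed by a SQL query ordered by sequence.
interface PollableOutbox {
    List<SequencedEntry> readAfter(long lastSequence, int limit);
}

class PollingRelaySketch {
    private final PollableOutbox outbox;
    private final Consumer<String> publisher;
    private long lastPublished = 0L;

    PollingRelaySketch(PollableOutbox outbox, Consumer<String> publisher) {
        this.outbox = outbox;
        this.publisher = publisher;
    }

    // One polling cycle; a real relay would run this on a schedule.
    int pollOnce(int batchSize) {
        List<SequencedEntry> batch = outbox.readAfter(lastPublished, batchSize);
        for (SequencedEntry entry : batch) {
            publisher.accept(entry.payload());
            // Advance only after a successful publish, which gives at-least-once delivery.
            lastPublished = entry.sequence();
        }
        return batch.size();
    }
}
```

Compared to log tailing, this approach adds query load on the database and some latency between the commit and the publication, which is the trade-off for not depending on a connector.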

Conclusion

Ensuring consistency in the state propagation and data exchange between services is key for providing a reliable distributed solution. Usually this is not carefully considered when designing distributed, event-driven software, leading to undesired states and situations, especially when those systems are already in production.

By combining the transactional outbox pattern and a message relay, we have seen how this consistency can be enforced. By using a hexagonal architecture style, the additional complexity of implementing those patterns can be easily hidden and reused across bounded context implementations.

The code of this article is not yet production-ready: concerns like observability, retries, scalability (e.g. with partitioning), proper container orchestration, etc. are still pending. Subsequent articles will go through those concerns using the provided code as a basis.

