Implementing CDC in Spring Boot using PostgreSQL and Debezium
Introduction
In microservices-based architectures, it is common for changes in service data to be published as change events to trigger actions in other services. Transaction log tailing, also known as Change Data Capture (CDC), is an orchestration of processes that read database transaction logs and publish data change events to a message broker. As you may have guessed, mining database transaction logs is not an easy feat. Luckily, there are several well-known CDC tools that solve this problem. In this article, I will walk you through a proof-of-concept CDC implementation using Spring Boot, PostgreSQL, and Debezium. You will gain exposure to CDC, PostgreSQL logical replication, Debezium, and Apache Kafka Connect. I hope this article helps you in your software development journey.
Background and Context
While the use of CDC within microservices is a relatively new concept, some of the underlying mechanisms that support CDC processes have long been in place. Database vendors solved high availability, disaster recovery, and performance problems by replicating data to multiple instances. Development in the data replication space started with snapshot replication, where a snapshot of the data was simply copied over from one database to another. This was the go-to method for data warehousing, disaster recovery, and point-in-time backups. The emergence of distributed systems and cloud computing required real-time (or near-real-time) data synchronization to support high availability across multiple regions. This is when the industry came up with transaction-log-based streaming replication models and, later, CDC. In CDC, changes are monitored at the source database and applied to destination databases in real time. This matters here because Debezium makes use of these replication mechanisms to monitor changes in the source database.
The Debezium PostgreSQL connector relies on PostgreSQL's logical replication architecture. Logical replication can be fine-tuned to include or exclude specific tables and databases; DDL changes are not published due to lack of logical decoding support. Instead of sending every change before and after commit, logical replication publishes only committed changes. Write-Ahead Logs (WAL) are stored in a raw format and are not human readable by default, so in order to publish events for a logical replication publication, PostgreSQL uses logical decoding output plug-ins to decode WAL contents. There are many logical decoding plug-ins available for PostgreSQL; however, Debezium is only compatible with the pgoutput and decoderbufs plug-ins. Pgoutput is the default logical decoding plug-in that ships with PostgreSQL. Decoderbufs is a Protobuf-based library maintained by the Debezium community. For the sake of simplicity, I chose the pgoutput plug-in in my implementation. Now that we have a better understanding of how Debezium utilizes PostgreSQL logical replication, let's move on to the project setup.
Components
Architecture
System requirements and dependencies
You will need a computer that is capable of running eight default-spec Docker containers along with a Spring Boot application. You will also need to install the dependencies below.
I started from a template Spring Boot application from Spring Initializr with Lombok, Spring Data JPA, Spring Web, and Flyway Migration dependencies as the base. The next item to tackle is setting up the infrastructure.
PostgreSQL configuration for CDC
Let's create a docker-compose file, add a PostgreSQL container, and configure the application for the database connection. Before we create a logical replication publication, we need to instruct our PostgreSQL server to use logical decoding with Write-Ahead Logs. This is done by setting the server's wal_level configuration parameter to 'logical'.
We also need to be cognizant of our replication slots and closely monitor them for status and replication lag, because a poorly managed replication slot can consume significant disk space: when a replication consumer is inactive, the slot holds on to WAL data and causes the server's disk to fill up. 'max_slot_wal_keep_size' limits the amount of WAL data a replication slot can retain; its default value of -1 allows an unlimited amount of WAL data to be retained if a consumer becomes unavailable indefinitely, so let's limit the retained WAL data for a replication slot to 1024MB. 'wal_keep_size' sets the minimum size of past WAL segments to keep; the default of zero means no WAL data is retained once the consumer reads from the publication. To allow for graceful recovery in cases where Debezium temporarily pauses or crashes, let's retain 256MB of past WAL data.
While it is not implemented in this POC, another important setting to mention is 'heartbeat.interval.ms'. The WAL is shared by the entire database: on a high-traffic database where Debezium tracks only a tiny fraction of the changes, WAL segments are retained until Debezium acknowledges them. To solve this, a heartbeat table and publication can be created so that Debezium periodically generates change events, allowing the database to reclaim the extra WAL segments.
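As a sketch, the three WAL settings above can be passed as command-line flags in the docker-compose service (the image tag, service name, and credentials here are illustrative):

```yaml
services:
  postgres:
    image: postgres:17
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: orderdb
    ports:
      - "5432:5432"
    # Enable logical decoding and bound WAL retention for replication slots
    command: >
      postgres
      -c wal_level=logical
      -c max_slot_wal_keep_size=1024MB
      -c wal_keep_size=256MB
```

Changing wal_level requires a server restart, so it is easiest to set it this way before the data directory sees any traffic.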
In this POC, Debezium is configured to access the database as the superuser. It is strongly encouraged that you grant only the required privileges to the Debezium user in real implementations. See the postgresql-permissions section of the Debezium documentation for more information.
By default, the Debezium connector tries to create publications for all databases and tables automatically. This is obviously not desired in a production setting, so let's simulate a real-world requirement and say that we are only interested in changes from the 'order' table. To facilitate that, we need to create a publication for the 'order' table in the database prior to configuring the Debezium connector. At this point, we don't even have a database or a table yet. Luckily, we can configure Flyway to run a migration script that creates the 'order' table and 'order_publication' when the application starts.
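A minimal Flyway migration along these lines could look as follows. The file name and column definitions are illustrative (they merely match the sample event shown later); note that 'order' must be quoted because it is a reserved word in SQL:

```sql
-- V1__create_order_table_and_publication.sql (file name illustrative)
CREATE TABLE IF NOT EXISTS "order" (
    id               UUID PRIMARY KEY,
    order_date       TIMESTAMP NOT NULL,
    customer_name    VARCHAR(255) NOT NULL,
    customer_email   VARCHAR(255) NOT NULL,
    customer_address JSONB,
    status           VARCHAR(32) NOT NULL
);

-- Publish changes from the 'order' table only
CREATE PUBLICATION order_publication FOR TABLE "order";
```

Because the publication names a single table, consumers of 'order_publication' will never see changes from any other table, regardless of connector-side filters.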
Apache Kafka Cluster Setup
Apache Kafka is an event streaming platform and is used by Debezium to publish CDC events. After a series of unsuccessful attempts to use the Debezium-provided Kafka images, I moved on to the official Kafka Docker image repository and used the multi-node isolated broker/controller cluster setup. As a requirement of our Debezium–Kafka integration, the Kafka broker instances have to be configured to allow automatic creation of topics. This is the only additional setting we need for our Kafka cluster.
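For reference, the only change this adds on top of the official multi-node compose example is the topic auto-creation setting on each broker; the service name below is illustrative, and the environment variable follows the official image's KAFKA_-prefix convention:

```yaml
  kafka-broker-1:
    image: apache/kafka:latest
    environment:
      # Allow Debezium to create its event topics on first write
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      # ...the remaining broker/controller settings from the official
      # multi-node example are unchanged...
```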
Debezium Connector Setup
Official Debezium connector images are hosted on Quay.io. Let's add a Debezium Connect container and configure it. The Debezium Connect instance MUST be able to connect to the Kafka brokers to start, because Debezium uses Kafka topics to manage connector state and distribute work across Connect workers. Three topics are involved:
CONFIG_STORAGE_TOPIC
Stores connector and task configurations, allowing Debezium to restore configurations after restarts.
OFFSET_STORAGE_TOPIC
Tracks the position in the source database's log. Debezium uses the Log Sequence Number (LSN) for PostgreSQL log tracking, which allows it to continue publishing from the last position after restarts.
STATUS_STORAGE_TOPIC
Tracks runtime status of connectors and tasks.
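A sketch of the Connect service in docker-compose, wiring up the three storage topics (the topic names, image tag, and service names here are illustrative):

```yaml
  connect:
    image: quay.io/debezium/connect:3.0
    depends_on:
      - kafka-broker-1
    ports:
      - "8083:8083"   # Kafka Connect REST API
    environment:
      BOOTSTRAP_SERVERS: kafka-broker-1:9092
      GROUP_ID: "1"
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
```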
Adding the Debezium Connect instance is the final item of our infrastructure setup. We can run docker-compose up and confirm everything is up and running. After starting the application, Flyway will run and create the order table and publication. At this point, the Debezium Connect instance needs to be configured to read changes from the source database. Let's break down our Debezium connector configuration.
{
  "name": "order-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1", // PostgreSQL connector always uses a single task
    // DB connection settings
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "orderdb",
    // Replication settings
    "database.server.name": "orderdb_server", // logical name for the database server
    "plugin.name": "pgoutput", // PostgreSQL logical decoding plug-in
    "slot.name": "debezium_slot", // replication slot name
    "publication.name": "order_publication", // use the publication created by Flyway
    // Filtering publications
    "publication.autocreate.mode": "filtered", // auto-create a publication only for the filtered tables, and only if one is missing
    "schema.include.list": "orderdb", // include only these schemas
    "table.include.list": "orderdb.order", // include only these tables
    "topic.prefix": "cdc" // Kafka topic prefix for generated events
  }
}
After sending this configuration to the Debezium Connect instance via its REST API, we have a fully operational CDC pipeline.
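For completeness, registering the connector is a single POST to Connect's REST API, assuming it is exposed on port 8083 (the image default) and the JSON above is saved to a file with the inline comments removed, since JSON does not allow comments:

```shell
# Register the connector; Connect responds with 201 Created on success
curl -i -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @order-connector.json

# Verify the connector and its task are RUNNING
curl -s http://localhost:8083/connectors/order-connector/status
```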
Spring Boot Application Structure
The purpose of this POC is to display the CDC events that result from changes to the 'order' table of the PostgreSQL database. OrderService provides CRUD endpoints to manipulate order data. OrderEventListener logs CDC events received from Kafka; in a real implementation, this is where your application would react to a data change event. OrderRepository is the CRUD repository for the Order entity.
Error Handling and Recovery Strategies
Debezium is impressively fault tolerant against common failure scenarios and provides at-least-once delivery guarantees.
Apache Kafka
When Kafka brokers become unavailable, the Debezium connector repeatedly tries to reconnect, and once a connection becomes available again, it continues publishing from the last offset stored in the topic configured by OFFSET_STORAGE_TOPIC. If the connector shuts down gracefully, offsets are flushed properly; after a crash, however, it may resume from an offset that was stored prior to the crash, which can cause consumers to see duplicate events. Debezium recommends that consumers be idempotent and use the LSN in PostgreSQL CDC messages as the idempotency key.
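To make the idempotency recommendation concrete, here is a minimal, self-contained sketch of a consumer that deduplicates by LSN. The class name and the naive string-based LSN extraction are illustrative only; a real consumer would parse the event with a JSON library and keep processed LSNs in a durable store rather than an in-memory set.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch, not part of Debezium's API.
public class IdempotentOrderEventHandler {

    // LSNs already processed; in production this would be a durable store.
    private final Set<Long> processedLsns = new HashSet<>();

    /** Returns true if the event was processed, false if it was a duplicate. */
    public boolean handle(String cdcEventJson) {
        long lsn = extractLsn(cdcEventJson);
        if (!processedLsns.add(lsn)) {
            return false; // duplicate delivery (at-least-once), skip it
        }
        // ... react to the change event here ...
        return true;
    }

    // Naive extraction of the numeric "lsn" field, for illustration only.
    private long extractLsn(String json) {
        int i = json.indexOf("\"lsn\":");
        if (i < 0) throw new IllegalArgumentException("no lsn field");
        int start = i + "\"lsn\":".length();
        int end = start;
        while (end < json.length()
                && (Character.isDigit(json.charAt(end)) || json.charAt(end) == ' ')) {
            end++;
        }
        return Long.parseLong(json.substring(start, end).trim());
    }

    public static void main(String[] args) {
        IdempotentOrderEventHandler handler = new IdempotentOrderEventHandler();
        String event = "{\"payload\":{\"source\":{\"lsn\": 26474864},\"op\":\"c\"}}";
        System.out.println(handler.handle(event)); // first delivery: true
        System.out.println(handler.handle(event)); // redelivery: false
    }
}
```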
PostgreSQL
This POC uses a standalone PostgreSQL instance. When working with a PostgreSQL cluster, handling replication slot failures varies by PostgreSQL version. Prior to version 16, replication slots are only available on primary servers, and a new primary has to be promoted in case of a failure. In version 16 and newer, replication slots can be created on replicas as well, but they have to be kept in sync manually between the primary and the replica. Since PostgreSQL 17, replication slots can be configured to fail over automatically from a primary server to a replica.
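As a sketch of the PostgreSQL 17 mechanism, a logical slot can be created with failover enabled on the primary; the slot name below matches this POC's configuration, but consult the PostgreSQL documentation for the additional standby-side settings before relying on this in production:

```sql
-- On the primary: create a logical slot marked for failover (PostgreSQL 17+)
SELECT pg_create_logical_replication_slot(
    'debezium_slot',   -- slot name
    'pgoutput',        -- output plug-in
    false,             -- temporary
    false,             -- two-phase
    true               -- failover: synchronize this slot to standbys
);

-- On the standby, slot synchronization must also be enabled, e.g. in
-- postgresql.conf: sync_replication_slots = on
```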
Demonstration of CDC Event
Sending an OrderRequest to 'https://localhost:8081/order-api/orders' results in the following change event being logged by the OrderEventListener. Follow the instructions in the README to test the other endpoints and change events.
{
  "schema": { … }, // contains the CDC event data definition
  "payload": {
    "before": null, // data before the change
    "after": { // data after the change
      "id": "975dd3d4-399a-4c0b-ac2d-5dd7bf8af097",
      "order_date": 1741544097702314,
      "customer_name": "Omer Kocaoglu",
      "customer_email": "[email protected]",
      "customer_address": "{\"zip\": \"62701\", \"city\": \"Springfield\", \"state\": \"IL\", \"street\": \"1234 Elm St\"}",
      "status": "PENDING"
    },
    "source": { // information about the event source
      "version": "3.0.7.Final",
      "connector": "postgresql",
      "name": "cdc",
      "ts_ms": 1741558497703, // time the change was recorded in the database
      "snapshot": "false",
      "db": "orderdb",
      "sequence": "[\"26474864\",\"26474864\"]",
      "ts_us": 1741558497703185,
      "ts_ns": 1741558497703185000,
      "schema": "orderdb",
      "table": "order",
      "txId": 748,
      "lsn": 26474864, // PostgreSQL WAL LSN
      "xmin": null
    },
    "transaction": null, // transaction information if this change happened within a transaction
    "op": "c", // type of operation, 'c' stands for create
    "ts_ms": 1741558497856, // time Debezium processed the change event
    "ts_us": 1741558497856396,
    "ts_ns": 1741558497856396671
  }
}
Update and delete events can be observed the same way by sending requests to OrderService. The 'op' field is the operation type: 'c' for create, 'u' for update, 'd' for delete, and 'r' for read (snapshot).
'source.ts_ms' is the timestamp of the change recorded in PostgreSQL, and 'ts_ms' is the timestamp of Debezium processing the change event. In the example above, we can see a 153 millisecond lag between PostgreSQL and Debezium. This is most likely because we did not set up 'heartbeat.interval.ms'. Another important thing to note here is 'source.lsn': each change event has a unique LSN on PostgreSQL, and consumers should use the LSN for idempotency in real-world implementations.
Operational Considerations
The PostgreSQL upgrade process removes replication slots; follow the documented procedure to prevent data loss and other undesired outcomes. While Debezium is convenient, developers must make sure that the configurations of all services are correct and that proper monitoring is in place. This POC only demonstrates the health and status endpoints of the Debezium Connect container. See the PostgreSQL monitoring documentation to learn about monitoring production environments.
Summary and Conclusion
Debezium allows developers to implement CDC for their microservices relatively easily using existing infrastructure (plus, perhaps, a Kafka cluster). However, the trade-off is that running Debezium in production requires us to be well versed in PostgreSQL WAL configuration, logical replication, and Apache Kafka. I hope this article was helpful to you. Let me know in the comments!