Change Data Capture (CDC)

Change data capture uses the SQL Server Agent to record the insert, update, and delete activity that occurs in a table, and makes those changes available in an easily consumed relational format. See the SQL Server documentation for more details.

Configure for CDC

Note: This walkthrough uses SQL Server as the database for all CDC activity, with a login that has sysadmin permission.

Pre-requisite

Change data capture is only available in the Enterprise, Developer, Enterprise Evaluation, and Standard editions.

Create or find the database in which you want to enable CDC and execute the script below. This is easiest in Microsoft SQL Server Management Studio; if you don't have it, install it from https://aka.ms/ssmsfullsetup. Make sure SQL Server Agent is running, which you can check in Windows Services (type services.msc in the Run window, [Win + R]).

Note: All future references in this document include this database and tables.

USE cdcexample 
GO
EXEC sys.sp_cdc_enable_db
GO        

Note: Here cdcexample is the name of the database. Please replace it with your database name.

If you wish to disable CDC on the database, you can use the script below:

USE cdcexample
GO
EXEC sys.sp_cdc_disable_db
GO        

Now that CDC is enabled on your database, identify the table whose activity (insert, update, and delete DML operations) you want to capture, and use the script below to enable CDC on it:

USE cdcexample
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name   = N'Employee', --table name
@role_name     = N'MyRole', -- role that controls access to the change table; pass NULL if you don't want to gate access with a role
@supports_net_changes = 1 -- when set to 1, a net changes function is also generated for the capture instance; it returns one change per distinct row changed in the requested interval (the table needs a primary key, or a unique index passed via @index_name)
GO        

Disabling CDC

If you want to disable CDC on the table, use the script below:

USE cdcexample
GO
EXEC sys.sp_cdc_disable_table
@source_schema = N'dbo',
@source_name   = N'Employee',
@capture_instance = N'dbo_Employee' -- capture instance name; you can find it in the cdc.change_tables table under the System Tables folder of your database in the tree view
GO        


Change Data Capture Setup

After enabling CDC on both the database and the table, you should see at least the following tables under System Tables, most of them in the cdc schema:

1. cdc.captured_columns

2. cdc.change_tables

3. cdc.dbo_Employee_CT

4. cdc.ddl_history

5. cdc.index_columns

6. cdc.lsn_time_mapping

7. dbo.systransschemas

The Employee table used in this example has the following column definitions:

Table: Employee
[Id] [int] IDENTITY(1,1) NOT NULL,
[FirstName] [nvarchar](50) NOT NULL,
[MiddleName] [nvarchar](50) NULL,
[LastName] [nvarchar](50) NOT NULL,
[DesignationId] [int] NOT NULL,        

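Now insert two rows into the Employee table:

INSERT INTO dbo.Employee (FirstName, MiddleName, LastName, DesignationId) VALUES ('John', 'N.', 'Doe', 1);
INSERT INTO dbo.Employee (FirstName, MiddleName, LastName, DesignationId) VALUES ('Sally', 'J.', 'Smith', 1);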

Output: each inserted row appears in the change table cdc.dbo_Employee_CT with __$operation = 2.

Now let's update one:

Update [dbo].[Employee] SET
FirstName = 'Johnn',
LastName = 'Doee'
WHERE ID = 1        

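Output: the update appears in cdc.dbo_Employee_CT as two rows, the before image (__$operation = 3) and the after image (__$operation = 4).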

That's all we need to enable a basic change data capture feature on a database and its tables.

If you would like to know exactly which columns have changed, use the query below:

-- Time window to inspect: the last 24 hours
DECLARE @begin_time datetime = GETDATE() - 1;
DECLARE @end_time datetime = GETDATE();
DECLARE @from_lsn binary(10);
DECLARE @to_lsn binary(10);

-- Map the time window to a log sequence number (LSN) range
SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than', @begin_time);
SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', @end_time);

-- The range must not start before the capture instance's minimum LSN
DECLARE @min_lsn binary(10) = sys.fn_cdc_get_min_lsn('dbo_Employee');
IF @from_lsn IS NULL OR @from_lsn < @min_lsn SET @from_lsn = @min_lsn;

-- Ordinal position of each captured column within the update mask
DECLARE @Col_Id int = sys.fn_cdc_get_column_ordinal('dbo_Employee', 'Id');
DECLARE @Col_FirstName int = sys.fn_cdc_get_column_ordinal('dbo_Employee', 'FirstName');
DECLARE @Col_MiddleName int = sys.fn_cdc_get_column_ordinal('dbo_Employee', 'MiddleName');
DECLARE @Col_LastName int = sys.fn_cdc_get_column_ordinal('dbo_Employee', 'LastName');
DECLARE @Col_DesignationId int = sys.fn_cdc_get_column_ordinal('dbo_Employee', 'DesignationId');

-- One flag per column: 1 when the column is marked in the row's update mask
SELECT *
,sys.fn_cdc_is_bit_set(@Col_Id, __$update_mask) AS 'Updated_Id'
,sys.fn_cdc_is_bit_set(@Col_FirstName, __$update_mask) AS 'Updated_FirstName'
,sys.fn_cdc_is_bit_set(@Col_MiddleName, __$update_mask) AS 'Updated_MiddleName'
,sys.fn_cdc_is_bit_set(@Col_LastName, __$update_mask) AS 'Updated_LastName'
,sys.fn_cdc_is_bit_set(@Col_DesignationId, __$update_mask) AS 'Updated_DesignationId'
FROM cdc.fn_cdc_get_all_changes_dbo_Employee(@from_lsn, @to_lsn, 'all');
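Each row returned by the query above also carries an __$operation column (1 = delete, 2 = insert, 3 = update before image, 4 = update after image). As a rough sketch of how a client might turn one such row into a readable summary, the Python below works on a plain dictionary; the helper name describe_change and the sample row are hypothetical and only mirror the column aliases used in the query.

```python
# Operation codes documented for the __$operation column of CDC change rows.
CDC_OPERATIONS = {
    1: "delete",
    2: "insert",
    3: "update (before image)",
    4: "update (after image)",
}


def describe_change(row):
    """Return a short description of one CDC change row given as a dict."""
    operation = CDC_OPERATIONS.get(row["__$operation"], "unknown")
    # Columns prefixed with Updated_ come from the sys.fn_cdc_is_bit_set calls.
    changed = sorted(
        name[len("Updated_"):]
        for name, flag in row.items()
        if name.startswith("Updated_") and flag
    )
    columns = ", ".join(changed) or "none"
    return f"Id={row['Id']}: {operation}; changed columns: {columns}"


# Hypothetical row, shaped like the query result after the UPDATE shown earlier.
sample = {
    "__$operation": 4,
    "Id": 1,
    "FirstName": "Johnn",
    "LastName": "Doee",
    "Updated_Id": 0,
    "Updated_FirstName": 1,
    "Updated_MiddleName": 0,
    "Updated_LastName": 1,
    "Updated_DesignationId": 0,
}
print(describe_change(sample))
# prints: Id=1: update (after image); changed columns: FirstName, LastName
```

With the 'all' row filter used in the query, an update is returned only as its after image (operation 4); the before image (operation 3) is returned when the filter is 'all update old'.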

Data Utilization

Problem

Let's look at what we have been doing so far. Whenever we need to gather change information, we either transfer batches of data from source to destination at regular intervals, or back up the tables where the data resides and restore them wherever they are needed. What this approach lacks is real-time analytics, and in some cases it also causes problems with resource utilization and data inconsistency.

Solution

Consume Change Data

Debezium is an open-source distributed platform for capturing changes in your database. It supports many relational and non-relational databases, and its change events can be fed to downstream stores such as Azure SQL or big data systems. The advantage of change data capture is that the data is available in real time; what remains is making it feasible to consume that data in real time. For more information, see the Debezium documentation.

Debezium works with the following databases:

1. MySQL

2. SQL Server

3. Oracle DB

4. DB2

5. MongoDB

6. Cassandra

7. PostgreSQL

8. Vitess

Why Choose Debezium for CDC and Database Replication?

Debezium’s flexibility, lightweight architecture, and low latency streaming make it a popular choice for CDC. It is also fairly easy to integrate into modern data stacks. Key benefits include:

1. Support for a wide range of databases: Debezium has connectors for MongoDB, MySQL, PostgreSQL, SQL Server, Oracle, Db2, and Cassandra, with additional sources currently incubating.

2. Open source: Debezium is open source under the Apache 2.0 license and backed by a strong community.

3. Low latency: The architecture is lightweight and specifically designed for streaming data pipelines.

4. Pluggable: Debezium works with popular infrastructure tools such as Kafka and Docker.

5. Handling schema changes: Depending on the specific database connector, Debezium will typically provide some level of automation for handling schema changes. Note that this applies only at the source level and is not propagated downstream.

Debezium In Action

To bring Debezium into action you need the following services.

1. ZooKeeper: ZooKeeper is used in distributed systems for service synchronization and as a naming registry. When working with Apache Kafka, ZooKeeper is primarily used to track the status of nodes in the Kafka cluster and maintain a list of Kafka topics and their configuration.

2. Kafka: Apache Kafka is a distributed event store and stream-processing open-source platform.

ZooKeeper is not memory intensive when it works solely with Kafka, and it does not consume CPU resources heavily either. However, it is best practice to provide a dedicated CPU core for ZooKeeper to ensure there are no issues with context switching.

With the rise of Docker, spinning up such systems has become easy, and you don't have to maintain dedicated machines to run these services. Using a Docker configuration to start containers on demand is a cost-effective option. Below is the docker-compose file that runs the containers and lets them communicate with each other.

docker-compose.yaml

version: '3'
services:
  # Zookeeper, single node
  zookeeper:
    image: wurstmeister/zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  # kafka single node     
  kafka:
    image: wurstmeister/kafka:latest
    restart: "no"
    links:
      - zookeeper
    ports:
      - 9992:9992
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: INTERNAL://:29092,EXTERNAL://:9992
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9992
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
  #kafdrop for topic/msg visualization
  kafdrop:
    image: obsidiandynamics/kafdrop
    restart: "no"
    environment:
      KAFKA_BROKERCONNECT: "kafka:29092"
    ports:
      - 9010:9000
    depends_on:
      - kafka

  #Refer https://hub.docker.com/r/debezium/example-postgres/tags?page=1&name=1.9 for basic idea
  sqlserverdebezium:
    image: quay.io/debezium/connect:latest
    links:
     - kafka
    ports:
      - 8083:8083
    environment:
     - BOOTSTRAP_SERVERS=kafka:29092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses
    volumes:
     #https://debezium.io/documentation/reference/2.5/connectors/sqlserver.html
     - ./connectors/debezium-connector-sqlserver:/kafka/connect/debezium-connector-sqlserver #first part (before colon), it is the local directory relative connector path of your docker-compose        

To start the containers defined in the docker-compose.yaml file above, run docker-compose up -d from your terminal.

If all goes well, all containers should be running. If you don't have Docker Desktop for Windows, install it first.

Once all services are running, it is time to configure the connector. This example uses the Debezium connector for Microsoft SQL Server.

Configure the Connector

To configure the connector, you can use any API client tool, for example Postman or Bruno, and call the endpoint below:

POST - http://localhost:8083/connectors

Send the JSON below as the request body:

{
    "name": "cdcexample-connector",
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "database.hostname": "<Sql Server Name>",
        "database.port": "1433",
        "database.user": "<Sql Server User Name>",
        "database.password": "<Password>",
        "database.dbname": "cdcexample",
        "database.names": "cdcexample",
        "database.server.name": "<Sql Server Name>",
        "table.include.list": "<Table Name>",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
        "schema.history.internal.kafka.topic": "schemahistory.fullfillment",
        "topic.prefix": "cdcexample",
        "database.trustServerCertificate": true
    }
}

Note: JSON does not allow comments, so keep the request body free of them. To capture multiple tables, separate them with commas in table.include.list. Setting database.trustServerCertificate to true is not suggested for production environments.
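The same registration can be scripted instead of sent from an API client. The sketch below is one possible way to do it with the Python standard library; the helper names build_connector_payload and register_connector and the sample host and credentials are hypothetical, only a subset of the properties above is included, and it assumes the Connect REST endpoint is served over plain HTTP on port 8083 because the docker-compose file configures no TLS.

```python
import json
import urllib.request


def build_connector_payload(host, user, password, database, tables):
    """Build the JSON body for POST /connectors (subset of the properties above)."""
    return {
        "name": f"{database}-connector",
        "config": {
            "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
            "database.hostname": host,
            "database.port": "1433",
            "database.user": user,
            "database.password": password,
            "database.names": database,
            # Multiple tables are joined with commas.
            "table.include.list": ",".join(tables),
            "schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
            "schema.history.internal.kafka.topic": "schemahistory.fullfillment",
            "topic.prefix": database,
        },
    }


def register_connector(payload, base_url="http://localhost:8083"):
    """POST the payload to Kafka Connect; raises urllib.error.HTTPError on failure."""
    request = urllib.request.Request(
        f"{base_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())


# Hypothetical host and credentials; replace them with your own values.
payload = build_connector_payload(
    "sqlhost", "cdc_user", "secret", "cdcexample", ["dbo.Employee"]
)
print(json.dumps(payload, indent=2))
# register_connector(payload)  # uncomment once the Connect container is running
```

Building the payload as a dictionary and serializing it with json.dumps guarantees a valid JSON body, which avoids the parse errors that hand-edited request bodies tend to cause.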

After the connector is registered, you can verify it by sending a GET request to http://localhost:8083/connectors/cdcexample-connector/status.
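The status endpoint returns a JSON document with a connector section and a list of tasks, each reporting a state such as RUNNING or FAILED. As a small illustration, the Python sketch below checks such a document; the function name connector_is_healthy and the sample response (including the worker_id values) are hypothetical.

```python
def connector_is_healthy(status):
    """True when the connector and every one of its tasks report RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    # A connector with no tasks is not capturing anything yet.
    return bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)


# Hypothetical response body from GET /connectors/cdcexample-connector/status.
sample_status = {
    "name": "cdcexample-connector",
    "connector": {"state": "RUNNING", "worker_id": "172.18.0.5:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "172.18.0.5:8083"}],
    "type": "source",
}
print(connector_is_healthy(sample_status))
# prints: True
```

When a task reports FAILED, the same response usually carries a trace field with the stack trace, which is the first place to look for wrong credentials or an unreachable SQL Server.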

You can inspect the messages in kafdrop, the Kafka message explorer started as a container by the docker-compose file. If you used the same names as this example, the messages are available at the URL below.

http://localhost:9010/topic/cdcexample.cdcexample.dbo.Employee/messages?partition=0&offset=0&count=100&keyFormat=DEFAULT&format=DEFAULT
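Each message on the topic is a Debezium change event whose payload holds a before image, an after image, and an op code (c for create, u for update, d for delete, r for a snapshot read). The Python sketch below shows one way a consumer might summarize such a message; the function name summarize_event and the sample event are hypothetical, and the code accepts the payload either bare or wrapped in a schema and payload envelope.

```python
import json

# Debezium operation codes found in the "op" field of a change event.
OP_NAMES = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def summarize_event(raw):
    """Summarize one Debezium change event given as a JSON string."""
    event = json.loads(raw)
    # With schemas enabled the data sits under "payload"; otherwise it is top level.
    payload = event.get("payload", event)
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    # Columns can only be compared when both images are present (updates).
    changed = sorted(
        key for key in after if before and before.get(key) != after[key]
    )
    return {
        "operation": OP_NAMES.get(payload["op"], payload["op"]),
        "changed": changed,
        "row": after or before,
    }


# Hypothetical event for the UPDATE on dbo.Employee shown earlier.
sample_event = json.dumps({
    "payload": {
        "before": {"Id": 1, "FirstName": "John", "MiddleName": "N.",
                   "LastName": "Doe", "DesignationId": 1},
        "after": {"Id": 1, "FirstName": "Johnn", "MiddleName": "N.",
                  "LastName": "Doee", "DesignationId": 1},
        "op": "u",
    }
})
summary = summarize_event(sample_event)
print(summary["operation"], summary["changed"])
# prints: update ['FirstName', 'LastName']
```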


Mirror to Azure Event Hub

About MirrorMaker 2.0 (MM2)

Apache Kafka MirrorMaker 2.0 (MM2) is designed to make it easier to mirror or replicate topics from one Kafka cluster to another. Mirror Maker uses the Kafka Connect framework to simplify configuration and scaling. For more detailed information on Kafka MirrorMaker, see the Kafka Mirroring/MirrorMaker guide.

As Azure Event Hubs is compatible with Apache Kafka protocol, you can use Mirror Maker 2 to replicate data between an existing Kafka cluster and an Event Hubs namespace.

Mirror Maker 2 dynamically detects changes to topics and ensures source and target topic properties are synchronized, including offsets and partitions. It can be used to replicate data bi-directionally between a Kafka cluster and an Event Hubs namespace.

Create an Event Hubs namespace

An Event Hubs namespace is required to send and receive from any Event Hubs service. See Creating an event hub for instructions to create a namespace and an event hub. Make sure to copy the Event Hubs connection string for later use.

Clone the example project

Now that you have an Event Hubs connection string, clone the Azure Event Hubs for Kafka repository and navigate to the kafka\config subfolder.

Configure Kafka Mirror Maker 2

That project (Apache Kafka distribution) ships with a connect-mirror-maker script that implements a distributed Mirror Maker 2 cluster, along with the kafka-console-consumer.bat and kafka-console-producer.bat scripts used later in this section. Mirror Maker 2 manages the Connect workers internally based on a configuration file. Internally, the MirrorMaker driver creates and handles pairs of each connector: MirrorSource Connector, MirrorSink Connector, MirrorCheckpoint Connector, and MirrorHeartbeat Connector.

1. To configure Mirror Maker 2 to replicate data, you need to update Mirror Maker 2 configuration file kafka-to-eh-connect-mirror-maker.properties to define the replication topology.

2. In the kafka-to-eh-connect-mirror-maker.properties config file, define the cluster aliases that you plan to use for your Kafka cluster (source) and Event Hubs (destination).

3. Then specify the connection information for your source, which is your Kafka cluster.

source.bootstrap.servers = your-kafka-cluster-hostname:9092
#source.security.protocol=SASL_SSL
#source.sasl.mechanism=PLAIN
#source.sasl.jaas.config=<replace sasl jaas config of your Kafka cluster>;

4. Specify connection information for destination, which is the Event Hubs namespace that you created.

destination.bootstrap.servers = <your-eventhubs-namespace>.servicebus.windows.net:9093
destination.security.protocol=SASL_SSL
destination.sasl.mechanism=PLAIN
destination.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='$ConnectionString' password='<Your Event Hubs namespace connection string>';

5. Enable replication flow from source Kafka cluster to destination Event Hubs namespace.

source->destination.enabled = true
source->destination.topics = .*        

6. Update the replication factor of the remote topics and internal topics that Mirror Maker creates at the destination.

replication.factor=3
checkpoints.topic.replication.factor=3
heartbeats.topic.replication.factor=3
offset-syncs.topic.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3
config.storage.replication.factor=3

7. Copy the kafka-to-eh-connect-mirror-maker.properties configuration file to the Kafka distribution's config directory and start Mirror Maker 2 with that file. You can then read from and write to the topic in the Event Hubs namespace with the console consumer and producer commands below (cdceventhubns is the namespace used in this example).

.\kafka\bin\kafka-console-consumer.bat --bootstrap-server cdceventhubns.servicebus.windows.net:9093 --topic cdcexample.cdcexample.dbo.Employee --consumer.config .\kafka\config\kafkaToAzureEventHub.properties

.\kafka\bin\kafka-console-producer.bat --bootstrap-server cdceventhubns.servicebus.windows.net:9093 --topic cdcexample.cdcexample.dbo.Employee --producer.config .\kafka\config\kafkaToAzureEventHub.properties


8. Upon the successful execution of the script, you should see the Kafka topics and events getting replicated to your Event Hubs namespace.

9. To verify that events are making it to the Kafka-enabled Event Hubs, check out the ingress statistics in the Azure portal, or run a consumer against the Event Hubs.
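The settings from steps 2 through 6 can be collected into a single properties file. The Python sketch below assembles that text; the function name build_mm2_properties, its parameters, and the sample values are hypothetical, and the clusters line assumes the alias names source and destination that the other keys in this section already use.

```python
def build_mm2_properties(kafka_bootstrap, eventhubs_namespace, connection_string,
                         replication_factor=3):
    """Return the text of a Mirror Maker 2 properties file (hypothetical helper)."""
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f"username='$ConnectionString' password='{connection_string}';"
    )
    settings = [
        # Step 2: cluster aliases.
        ("clusters", "source, destination"),
        # Step 3: source Kafka cluster.
        ("source.bootstrap.servers", kafka_bootstrap),
        # Step 4: destination Event Hubs namespace.
        ("destination.bootstrap.servers",
         f"{eventhubs_namespace}.servicebus.windows.net:9093"),
        ("destination.security.protocol", "SASL_SSL"),
        ("destination.sasl.mechanism", "PLAIN"),
        ("destination.sasl.jaas.config", jaas),
        # Step 5: replication flow from source to destination, all topics.
        ("source->destination.enabled", "true"),
        ("source->destination.topics", ".*"),
    ]
    # Step 6: replication factor of remote and internal topics.
    factor_keys = [
        "replication.factor",
        "checkpoints.topic.replication.factor",
        "heartbeats.topic.replication.factor",
        "offset-syncs.topic.replication.factor",
        "offset.storage.replication.factor",
        "status.storage.replication.factor",
        "config.storage.replication.factor",
    ]
    settings += [(key, str(replication_factor)) for key in factor_keys]
    return "\n".join(f"{key} = {value}" for key, value in settings) + "\n"


# Hypothetical values; localhost:9992 is the external listener from the compose file.
text = build_mm2_properties("localhost:9992", "my-namespace", "<connection string>")
print(text)
```

Generating the file from one function keeps the seven replication factor keys in sync and makes it harder to leave a placeholder half-edited.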

Verify the event in Azure

1. Sign in to the Azure portal with your credentials.

2. Choose your Event Hubs namespace.

3. There should be an event hub named cdcexample.cdcexample.dbo.employee.

4. Create a logic app or Azure function and use an Event Hubs trigger to consume the payload.
