Using the Debezium PostgreSQL Connector for Change Data Capture in Kafka

This tutorial demonstrates how to use Debezium to monitor a PostgreSQL database. As the data in the database changes, you will see the resulting event streams.


Debezium Architecture


Most commonly, you deploy Debezium by means of Apache Kafka Connect. Kafka Connect is a framework and runtime for implementing and operating:

- Source connectors, such as Debezium, that send records into Kafka

- Sink connectors that propagate records from Kafka topics to other systems

The architecture of a change data capture pipeline based on Debezium looks like this: source databases feed Debezium source connectors running inside Kafka Connect, which write change event records to Kafka topics; sink connectors then propagate those records to downstream systems.


Kafka Connect operates as a separate service alongside the Kafka broker.

By default, changes from one database table are written to a Kafka topic whose name corresponds to the table name. If needed, you can adjust the destination topic name by configuring Debezium's topic routing transformation. For example, you can (see the sketch after this list):

- Route records to a topic whose name is different from the table's name

- Stream change event records for multiple tables into a single topic
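For instance, a minimal sketch of the topic routing transformation in a connector configuration might look like this (the transform class is Debezium's ByLogicalTableRouter; the alias, regex, and replacement values here are illustrative):

"transforms": "Reroute",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": "postgres-01\\.public\\.(.*)",
"transforms.Reroute.topic.replacement": "postgres-01.all_tables"

With this in place, change events for all tables under the postgres-01 prefix are streamed into the single topic postgres-01.all_tables.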

After change event records are in Apache Kafka, different connectors in the Kafka Connect ecosystem can stream the records to other systems and databases such as Elasticsearch, data warehouses and analytics systems, or caches such as Infinispan. Depending on the chosen sink connector, you might need to configure Debezium's new record state extraction transformation. This Kafka Connect SMT propagates the after structure from Debezium's change event to the sink connector, in place of the verbose change event record that is propagated by default.
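A sketch of that transformation in a connector configuration (the class is Debezium's ExtractNewRecordState SMT; the alias "unwrap" is just a conventional name):

"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"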


Start ZooKeeper and Kafka.
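If they are not already running, one way to start them in the background (a minimal sketch, assuming a Kafka installation under /opt/kafka, the path used for the Kafka commands later in this article):

#/opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties

#/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties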


Install and use the Debezium connector.


Download the Debezium connector plug-in archive (see below), extract its files into your Kafka Connect environment, and add the parent directory of the extracted plug-in(s) to Kafka Connect's plugin path.


Connector Archive

https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.3.2.Final/debezium-connector-postgres-2.3.2.Final-plugin.tar.gz
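For example, the archive can be downloaded with wget:

#wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.3.2.Final/debezium-connector-postgres-2.3.2.Final-plugin.tar.gz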


In our case, we downloaded the Debezium PostgreSQL connector archive and extracted its contents to /software/plugins/.


#tar -xzvf debezium-connector-postgres-2.3.2.Final-plugin.tar.gz -C /software/plugins/


Then specify the following in the Kafka Connect worker configuration:


plugin.path=/software/plugins/


At this point, you have started ZooKeeper and Kafka, but you still need a database server from which Debezium can capture changes. In this procedure, you will use a PostgreSQL server with an example database.


On the PostgreSQL server (version 14).


First, verify the PostgreSQL server status:


systemctl status postgresql-14


Debezium requires wal_level to be logical:


su - postgres


ALTER SYSTEM SET wal_level = logical;
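ALTER SYSTEM is executed from within psql (started as the postgres user). The new wal_level only takes effect after a server restart; with the service name used above, the restart (run as root) is:

#systemctl restart postgresql-14

After the restart, SHOW wal_level; in psql should return logical.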


Create a database, mydb:


su - postgres


createdb mydb


Connect to the database:

psql mydb

or, from within an existing psql session:

\c mydb


Create a table, leads, and insert a few records. Execute all of the following SQL commands from the psql CLI.


CREATE TABLE leads (id INTEGER PRIMARY KEY, name VARCHAR);


Insert a few records:


insert into leads values (1,'Histrik P');

insert into leads values (2,'Riz P');

insert into leads values (3,'Henvor P');

insert into leads values (4,'Tidrik P');

Verify the records:


select * from leads;



Starting the Debezium Connector – On the Kafka Node.


Start the Schema Registry:


#schema-registry-start /opt/confluent/etc/schema-registry/schema-registry.properties


Start the Kafka Connect worker – on a separate terminal.


Go to the bin folder of the Kafka installation, or specify the full path.


#/opt/kafka/bin/connect-distributed.sh /software/connect-distributed.properties
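The worker properties file referenced above contains, at minimum, something like the following (a sketch; the group.id and storage topic names are illustrative, kafka0:9092 matches the broker address used later in this article, and plugin.path is the directory set up earlier):

bootstrap.servers=kafka0:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
plugin.path=/software/plugins/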


The configuration for the connector is stored in the /software/deb-source.json file on the Kafka broker server.

Its contents are as follows (if you are running PostgreSQL in Docker, set database.hostname to the database container's IP or hostname):


{
  "name": "deb_source_connector_postgresql_01",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgresdb0",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "mydb",
    "database.server.name": "postgresdb0",
    "topic.prefix": "postgres-01",
    "plugin.name": "pgoutput",
    "slot.name": "myslot"
  }
}


Because we operate Kafka Connect in distributed mode, we register connectors by calling its REST endpoint with the configuration JSON; curl can read the payload from a file. The following command registers and starts the connector. Execute it from the directory where you stored the configuration JSON file.

On a separate terminal.

#cd /software

#curl -d @"deb-source.json" \
-H "Content-Type: application/json" \
-X POST http://localhost:8083/connectors

On the connector console, you should see log output indicating that the connector and its task have started.

After starting the connector, we can confirm that the connector's REST endpoint is accessible, and that the Debezium PostgreSQL connector is in the plugin list, by calling http://localhost:8083/connector-plugins:

# curl http://localhost:8083/connector-plugins



Get a list of active connectors:


#curl http://localhost:8083/connectors


You can get the status of the connector:


#curl http://localhost:8083/connectors/deb_source_connector_postgresql_01/status


The output should show the connector and its task in the RUNNING state.
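A healthy response has roughly this shape (a sketch; the worker_id reflects your own Connect host and port):

{
  "name": "deb_source_connector_postgresql_01",
  "connector": { "state": "RUNNING", "worker_id": "localhost:8083" },
  "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "localhost:8083" } ],
  "type": "source"
}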


We can see that the PostgreSQL table leads has been loaded into a Kafka topic. Debezium names topics <topic.prefix>.<schema>.<table>, so the topic here is postgres-01.public.leads:


#/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server kafka0:9092


Inspect the configuration of the topic:


#/opt/kafka/bin/kafka-topics.sh --describe --bootstrap-server kafka0:9092 --topic postgres-01.public.leads


Each row in the table is loaded as a message. You can verify this using the console consumer:


#/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka0:9092 --topic postgres-01.public.leads --from-beginning


The event has two parts: a schema and a payload. The schema contains a Kafka Connect schema describing what is in the payload.


The payload has an id field with a value of 2, and a name field with a value of Riz P.
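For reference, the payload portion of such a snapshot event has roughly this shape (a sketch; the source block carrying connector and database metadata is abridged, and the ts_ms value is illustrative):

{
  "before": null,
  "after": { "id": 2, "name": "Riz P" },
  "source": { "connector": "postgresql", "db": "mydb", "table": "leads" },
  "op": "r",
  "ts_ms": 1692000000000
}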


op: a required field containing a string value that describes the type of operation. r = read (snapshot); other values include c = create, u = update, and d = delete, as shown below.


Updating the database and viewing the update event.


update leads set name = 'Riz P I' where id = 2;


Verify the table data.


By changing a record in the leads table, the Debezium PostgreSQL connector generated a new event. You should see two new JSON documents: one for the event's key, and one for the new event's value.

Here are the details of the value for the update event:


You can observe that op = u, which represents an update.
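For an update, the payload carries the new row state in after; op is u (a sketch, with an illustrative ts_ms — note that with PostgreSQL's default REPLICA IDENTITY, before is null unless the table is altered to REPLICA IDENTITY FULL):

{
  "before": null,
  "after": { "id": 2, "name": "Riz P I" },
  "op": "u",
  "ts_ms": 1692000000001
}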


Insert a new record:


insert into leads values (5,'Del R');


You can observe in the consumer output console that op = c, meaning create (insert).


Now that you have seen how the Debezium connector captured the create and update events in the leads table, you will now delete one of the records and see how the connector captures it.


Delete the record.


delete from leads where id = 5;


On the topic consumer console:


op = d, meaning delete. (By default, Debezium also follows a delete event with a tombstone event — a record with a null value — so that Kafka log compaction can remove all messages that share the deleted row's key.)

Now that you have seen how the Debezium connector captures create, update, and delete events, you will now see how it can capture change events even when it is not running.


The Kafka Connect service automatically manages tasks for its registered connectors. If it goes offline, then when it restarts it starts any tasks that are not running. This means that even if Debezium is stopped for a while, it can still report the changes that occurred in the database in the meantime.


Stop the Kafka Connect worker (for example, with Ctrl+C in its terminal).


Insert two records:


insert into leads values (6,'New R 1');

insert into leads values (7,'New R 2');


The records are added to the database. However, because Kafka Connect is not running, the topic consumer does not show any new events.


select * from leads;


Restart the connector by starting the Kafka Connect worker again, using the same connect-distributed.sh command as before.


Switch to the terminal running the consumer to see events for the two new records you created when Kafka Connect was offline:


These events are create events similar to what you saw previously. As you can see, Debezium still reports all of the changes made in the database, including those made while it was not running.
