MongoDB database migration and consolidation using Apache Kafka + Debezium connectors

In this article, we use Apache Kafka and the Debezium MongoDB connector to demonstrate how to merge multiple source MongoDB databases into a single target database. For this demonstration, we set up three MongoDB replica sets: two serve as sources and the third as the destination.

Because the oplog requires all members of a replica set to share a common history of changes, a MongoDB replica set cannot simultaneously apply changes from multiple primary nodes. This proof of concept therefore explores how to combine data from two separate replica sets into one.

MongoDB is a popular NoSQL document-oriented database that stores data in JSON-like format. It is known for its scalability, flexibility, and ease of use in building modern applications.

A MongoDB replica set is a group of MongoDB servers that provides high availability and fault tolerance for the database. It achieves this by maintaining multiple copies of the same data and automatically promoting a secondary node to primary if the primary fails.
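
As a minimal sketch of what that looks like operationally (the hostnames mongo-1, mongo-2 and mongo-3 are placeholders, not the clusters used later in this article), a three-member replica set can be initiated and inspected from mongosh:

# Initiate a three-member replica set from one of the members (hostnames are placeholders)
mongosh --host mongo-1:27017 --eval '
  rs.initiate({
    _id: "rs0",
    members: [
      { _id: 0, host: "mongo-1:27017" },
      { _id: 1, host: "mongo-2:27017" },
      { _id: 2, host: "mongo-3:27017" }
    ]
  })
'
# Show which member is PRIMARY and which are SECONDARY
mongosh --host mongo-1:27017 --eval '
  rs.status().members.forEach(m => print(m.name + " -> " + m.stateStr))
'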


Although the Debezium MongoDB connector does not become part of a replica set, it uses a similar replication mechanism to obtain oplog data. The main difference is that the connector does not read the oplog directly. Instead, it delegates the capture and decoding of oplog data to the MongoDB change streams feature. With change streams, the MongoDB server exposes the changes that occur in a collection as an event stream.
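
To illustrate what the connector consumes, here is a minimal sketch of opening a change stream directly from mongosh. This is not the connector's own code; the host is a placeholder, while the database and collection names (sourcedb1, sales) match the source configuration used later in this article:

# Change streams require a replica set; run this against any member
mongosh --host localhost:27017 --eval '
  const watchCursor = db.getSiblingDB("sourcedb1").sales.watch();
  while (!watchCursor.isExhausted()) {
    if (watchCursor.hasNext()) {
      printjson(watchCursor.next());   // each event describes an insert, update, delete, etc.
    }
  }
'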

In this setup, the MongoDB instances cluster-1 and cluster-2 are hosted on GCP Compute Engine and used as sources. A third replica set, cluster-3, was created as the MongoDB target.

The architecture looks like this:

[Architecture diagram: MongoDB cluster-1 and cluster-2 (sources) → Debezium source connectors → Kafka on GKE → MongoDB sink connectors → MongoDB cluster-3 (target)]

We used the Debezium connector to pull MongoDB change data into the Kafka cluster, which is hosted on GKE.

About the Debezium connector: Debezium's MongoDB connector tracks a MongoDB replica set or a MongoDB sharded cluster for document changes in databases and collections, recording those changes as events in Kafka topics. The connector automatically handles the addition or removal of shards in a sharded cluster, changes in membership of each replica set, elections within each replica set, and awaiting the resolution of communications problems.


We will not go into detail about how to set up a Kafka cluster here; we use Strimzi Kafka to set up the pipelines.

For installation steps, follow the links below; a minimal Kafka Connect example is shown after the list.

  1. https://strimzi.io/docs/operators/in-development/full/deploying.html
  2. https://dzone.com/articles/grafana-and-prometheus-setup-with-strimzi-aka-kafk
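
Before any KafkaConnector resource can be deployed, a Kafka Connect cluster containing the Debezium MongoDB and MongoDB sink connector plugins must exist. A minimal sketch of such a resource is shown below; the name kafka-connect matches the strimzi.io/cluster label used by the connectors in this article, but the bootstrap address and image are placeholders you would replace with your own:

kubectl apply -f - <<'EOF'
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect                               # must match the strimzi.io/cluster label on the KafkaConnector resources
  annotations:
    strimzi.io/use-connector-resources: "true"      # let KafkaConnector custom resources manage the connectors
spec:
  replicas: 1
  bootstrapServers: my-kafka-kafka-bootstrap:9092   # placeholder: your Kafka bootstrap service
  image: registry.example.com/kafka-connect-mongodb:latest   # placeholder image bundling the Debezium MongoDB and MongoDB sink connector plugins
  config:
    group.id: kafka-connect-cluster
    offset.storage.topic: connect-offsets
    config.storage.topic: connect-configs
    status.storage.topic: connect-status
EOF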

Essentially, we must deploy source and sink connectors to the Kafka Connect cluster. Each source needs at least one source connector, and if more flexibility is required, each collection can have its own source connector. In the simplest case, one MongoDB source maps to one Kafka source connector.

We created the following .yaml file for the first source connector.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: source-mongodb-1
  annotations:
    strimzi.io/restart: "true"
  labels:
    strimzi.io/cluster: kafka-connect
spec:
  class: io.debezium.connector.mongodb.MongoDbConnector
  tasksMax: 1
  config:
    topic.creation.enable: true
    tasks.max: 1
    topic.prefix: mongodbserver1
    topic.creation.default.replication.factor: -1
    topic.creation.default.partitions: 10
    topic.creation.default.cleanup.policy: compact 
    topic.creation.default.compression.type: lz4
    mongodb.hosts: rs0/34.93.87.221:27017
    mongodb.user: deb_src_usr
    mongodb.password: abc1234
    database.include.list: sourcedb1
    schema.history.internal.kafka.bootstrap.servers: kafka:9092
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: true
    snapshot.mode: initial        
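
The mongodb.user above must already exist on the source replica set with permission to read the captured database. A minimal sketch of creating it from mongosh follows; the exact roles required depend on the Debezium version and deployment, so treat the role list as an assumption and check the Debezium MongoDB connector documentation:

# Run against the PRIMARY of the cluster-1 replica set (host taken from the config above)
mongosh --host 34.93.87.221:27017 --eval '
  db.getSiblingDB("admin").createUser({
    user: "deb_src_usr",
    pwd: "abc1234",
    roles: [
      { role: "read", db: "sourcedb1" }   // assumption: additional roles may be needed for change streams/snapshots
    ]
  })
'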

For the second MongoDB cluster, the configuration is the same apart from the connector name, topic prefix, MongoDB host, credentials, and database list.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: source-mongodb-2
  annotations:
    strimzi.io/restart: "true"
  labels:
    strimzi.io/cluster: kafka-connect
spec:
  class: io.debezium.connector.mongodb.MongoDbConnector
  tasksMax: 1
  config:
    topic.creation.enable: true
    tasks.max: 1
    topic.prefix: mongodbserver2
    topic.creation.default.replication.factor: -1
    topic.creation.default.partitions: 10
    topic.creation.default.cleanup.policy: compact 
    topic.creation.default.compression.type: lz4
    mongodb.hosts: rs1/34.93.197.152:27017
    mongodb.user: deb_src_usr2
    mongodb.password: abc1234
    database.include.list: sourcedb2
    schema.history.internal.kafka.bootstrap.servers: kafka:9092
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: true
    snapshot.mode: initial        

We used the following command to deploy both the source and sink connectors:

kubectl apply -f "/path/file.yaml"        
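
After applying a resource, the connector's state can be checked through the KafkaConnector custom resource itself:

# List all connectors managed by Strimzi and their readiness
kubectl get kafkaconnectors
# Show the detailed status (connector/task state, last errors) for one connector
kubectl describe kafkaconnector source-mongodb-1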

After the source connectors were set up, Kafka Connect automatically created the topics that hold the change events; we verified this as shown below before moving on to the second step, setting up the sink connectors.
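
Here is a quick way to confirm the topics from inside a broker pod. The pod name assumes a Strimzi Kafka cluster named my-kafka and a plain listener on port 9092; adjust both to your environment:

# Debezium topics follow the <topic.prefix>.<database>.<collection> naming pattern
kubectl exec -it my-kafka-kafka-0 -- \
  bin/kafka-topics.sh --bootstrap-server localhost:9092 --list | grep mongodbserver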

Basically, each collection needs its own sink connector, so if we have n MongoDB collections, we need n sink connectors.

Sink connector 1 configuration (consuming the topic that comes from the cluster-1 MongoDB source).

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: sink-mongodb-1
  annotations:
    strimzi.io/restart: "true"
  labels:
    strimzi.io/cluster: kafka-connect
spec:
  class: com.mongodb.kafka.connect.MongoSinkConnector
  tasksMax: 2
  config:
    tasks.max: 1
    connection.uri: mongodb://deb_tgt_usr:abc1234@34.100.240.170:27017/
    topics: mongodbserver1.sourcedb1.sales
    database: sinkdb1
    collection: sales
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true    
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: true        

Sink connector 2 configuration (consuming the topic that comes from the cluster-2 MongoDB source).

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: sink-mongodb-2
  annotations:
    strimzi.io/restart: "true"
  labels:
    strimzi.io/cluster: kafka-connect
spec:
  class: com.mongodb.kafka.connect.MongoSinkConnector
  tasksMax: 2
  config:
    tasks.max: 1
    connection.uri: mongodb://deb_tgt_usr:abc1234@34.100.240.170:27017/
    topics: mongodbserver2.sourcedb2.training
    database: sinkdb1
    collection: training
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true    
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: true

Both sink connectors point to the same target instance; the differences are the topic they consume and the target collection they write to.

After deploying the sink connectors, the pipeline started flowing data from the sources to the target.
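
To confirm the consolidation, we can query the target replica set directly. The host, credentials, database and collection names below follow the sink connector configuration above; depending on where the target user was created, an --authenticationDatabase flag may also be needed:

# Both source collections should now exist in the single target database
mongosh --host 34.100.240.170:27017 -u deb_tgt_usr -p abc1234 --eval '
  const target = db.getSiblingDB("sinkdb1");
  print("sales documents:    " + target.sales.countDocuments());
  print("training documents: " + target.training.countDocuments());
'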

In a nutshell, CDC allowed two different MongoDB sources to feed a single MongoDB target, something a replica set cannot do natively because it replicates from only one primary. This is the rationale behind the PoC.

By using the mechanisms described above, we successfully consolidated several MongoDB databases into one.
