Secure Change Data Capture with Docker, PostgreSQL, MongoDB, Kafka, and Debezium
Hello Everyone,
It's me, The Mad Scientist "Fidel Vetino", living under the hood and delivering real solutions, NOT theory. In the lab today I'm setting up a secure Change Data Capture (CDC) pipeline using Docker, PostgreSQL, MongoDB, Kafka, and Debezium.
I've created a basic framework of code, scripts, and database queries, keeping compliance with security standards such as FISMA, NIST, FedRAMP, GDPR, HIPAA, and SOC 2 in mind. Security measures include TLS 1.3, AES-256 encryption, Role-Based Access Control (RBAC), Transparent Data Encryption (TDE), and secure communications between services. As you can see, I went heavy on the security.
So let's jump in:
1. Set Up Docker Environment with Security Measures
Create a docker-compose.yml file to define all services with security configurations.
yaml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/kafka.server.keystore.jks
      ZOOKEEPER_SSL_KEYSTORE_PASSWORD: mypassword
      ZOOKEEPER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/kafka.server.truststore.jks
      ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD: mypassword
    volumes:
      - ./secrets:/etc/kafka/secrets
  kafka:
    image: confluentinc/cp-kafka:7.3.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,SSL://kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SSL:SSL
      KAFKA_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/kafka.server.keystore.jks
      KAFKA_SSL_KEYSTORE_PASSWORD: mypassword
      KAFKA_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/kafka.server.truststore.jks
      KAFKA_SSL_TRUSTSTORE_PASSWORD: mypassword
    volumes:
      - ./secrets:/etc/kafka/secrets
  postgres:
    image: postgres:14
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: mysecurepassword
      POSTGRES_DB: testdb
      POSTGRES_INITDB_ARGS: "--data-checksums"
    ports:
      - "5432:5432"
    # wal_level=logical is required for Debezium's pgoutput plugin (step 3)
    command: >
      postgres -c ssl=on
      -c wal_level=logical
      -c ssl_cert_file=/etc/postgresql/secrets/server.crt
      -c ssl_key_file=/etc/postgresql/secrets/server.key
    volumes:  # merged into one key: a service may only declare "volumes:" once
      - pgdata:/var/lib/postgresql/data
      - ./secrets:/etc/postgresql/secrets
  mongo:
    image: mongo:5.0
    command: mongod --auth --sslMode requireSSL --sslPEMKeyFile /etc/mongo/secrets/mongodb.pem --sslCAFile /etc/mongo/secrets/ca.pem
    ports:
      - "27017:27017"
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: mysecurepassword
    volumes:
      - ./secrets:/etc/mongo/secrets
  debezium:
    image: debezium/connect:1.9
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:9093
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: debezium_config
      OFFSET_STORAGE_TOPIC: debezium_offset
      STATUS_STORAGE_TOPIC: debezium_status
      SSL_KEYSTORE_LOCATION: /etc/debezium/secrets/debezium.keystore.jks
      SSL_KEYSTORE_PASSWORD: mypassword
      SSL_TRUSTSTORE_LOCATION: /etc/debezium/secrets/debezium.truststore.jks
      SSL_TRUSTSTORE_PASSWORD: mypassword
    volumes:
      - ./secrets:/etc/debezium/secrets
    depends_on:
      - kafka
      - postgres
      - mongo
volumes:
  pgdata:
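The compose file mounts keystores from ./secrets that don't exist yet. A minimal sketch for bootstrapping the Kafka/ZooKeeper pair with a self-signed key (keytool ships with the JDK; the alias and CN here are my own placeholder choices, and the Debezium and Mongo certificates are generated in later steps):

```shell
mkdir -p ./secrets

# Self-signed key pair in the keystore the broker and ZooKeeper both mount
keytool -genkeypair -alias kafka -keyalg RSA -validity 365 \
  -keystore ./secrets/kafka.server.keystore.jks \
  -storepass mypassword -keypass mypassword \
  -dname "CN=kafka"

# Export the certificate and trust it in the truststore
keytool -exportcert -alias kafka \
  -keystore ./secrets/kafka.server.keystore.jks \
  -storepass mypassword -file ./secrets/kafka.crt
keytool -importcert -alias CARoot -file ./secrets/kafka.crt \
  -keystore ./secrets/kafka.server.truststore.jks \
  -storepass mypassword -noprompt

# Bring the whole stack up
docker compose up -d
```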
2. Configure PostgreSQL with TDE and RBAC
Log into the PostgreSQL container and set up TDE and RBAC.
Enable TDE in PostgreSQL
Since PostgreSQL does not natively support TDE, use pgcrypto for encrypting specific columns:
sh
docker exec -it <postgres-container-id> bash
psql -U postgres
Create a sample table with encrypted columns using pgcrypto:
sql
CREATE EXTENSION pgcrypto;

CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name BYTEA,
    email BYTEA
);

INSERT INTO users (name, email) VALUES (
    pgp_sym_encrypt('John Doe', 'mysecretkey'),
    pgp_sym_encrypt('[email protected]', 'mysecretkey')
);
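To confirm the round trip, decrypt with the same passphrase; pgp_sym_decrypt is the pgcrypto counterpart of the encrypt call above:

```sql
SELECT id,
       pgp_sym_decrypt(name, 'mysecretkey')  AS name,
       pgp_sym_decrypt(email, 'mysecretkey') AS email
  FROM users;
```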
Implement RBAC
Create roles and assign permissions:
sql
CREATE ROLE read_access;
CREATE ROLE write_access;
GRANT SELECT ON TABLE users TO read_access;
GRANT INSERT, UPDATE, DELETE ON TABLE users TO write_access;
CREATE USER reader WITH PASSWORD 'readerpassword';
CREATE USER writer WITH PASSWORD 'writerpassword';
GRANT read_access TO reader;
GRANT write_access TO writer;
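A quick way to prove the grants bite, still inside psql: SET ROLE switches the permission context, so even a superuser session is then checked against the role's privileges.

```sql
SET ROLE reader;
SELECT count(*) FROM users;                                      -- allowed by read_access
INSERT INTO users (name, email) VALUES (''::bytea, ''::bytea);   -- fails: permission denied
RESET ROLE;
```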
3. Configure Debezium Connector with Security Measures
Create the Debezium PostgreSQL connector, ensuring secure communication with TLS and encryption settings.
First, prepare the Debezium configuration with TLS settings:
sh
mkdir -p ./secrets/debezium
# Generate certificates and keystores if not already done
# Example using OpenSSL to generate a self-signed certificate
openssl req -new -x509 -keyout ./secrets/debezium/server.key -out ./secrets/debezium/server.crt -days 365 -nodes
openssl pkcs12 -export -in ./secrets/debezium/server.crt -inkey ./secrets/debezium/server.key -out ./secrets/debezium/server.p12 -name debezium -passout pass:mypassword
keytool -importkeystore -deststorepass mypassword -destkeypass mypassword -destkeystore ./secrets/debezium/debezium.keystore.jks -srckeystore ./secrets/debezium/server.p12 -srcstoretype PKCS12 -srcstorepass mypassword -alias debezium
# Create truststore
keytool -keystore ./secrets/debezium/debezium.truststore.jks -alias CARoot -import -file ./secrets/debezium/server.crt -storepass mypassword -noprompt
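Before wiring the connector, it's worth sanity-checking that both stores open with the password and contain the expected entry:

```shell
# Keystore should list the "debezium" alias; truststore should list "caroot"
keytool -list -keystore ./secrets/debezium/debezium.keystore.jks -storepass mypassword
keytool -list -keystore ./secrets/debezium/debezium.truststore.jks -storepass mypassword
```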
Then, create the Debezium connector:
sh
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "mysecurepassword",
    "database.dbname": "testdb",
    "database.server.name": "dbserver1",
    "table.include.list": "public.users",
    "plugin.name": "pgoutput",
    "database.sslmode": "require",
    "database.sslrootcert": "/etc/debezium/secrets/ca.pem",
    "database.sslcert": "/etc/debezium/secrets/postgresql.crt",
    "database.sslkey": "/etc/debezium/secrets/postgresql.key"
  }
}' http://localhost:8083/connectors
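Kafka Connect creates the connector asynchronously, so a successful POST doesn't guarantee a running task. A quick status check against the Connect REST port mapped in the compose file:

```shell
# Expect "state": "RUNNING" for both the connector and its single task
curl -s http://localhost:8083/connectors/postgres-connector/status
```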
4. Verify Kafka Messages with Encryption and Security
Use a Kafka console consumer with TLS to verify messages.
First, create the file 'consumer.config':
sh
mkdir -p ./secrets/kafka
cat <<EOF > ./secrets/kafka/consumer.config
security.protocol=SSL
ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
ssl.truststore.password=mypassword
ssl.keystore.location=/etc/kafka/secrets/kafka.client.keystore.jks
ssl.keystore.password=mypassword
ssl.key.password=mypassword
EOF
Use the Kafka console consumer:
sh
docker exec -it <kafka-container-id> bash
kafka-console-consumer --bootstrap-server kafka:9093 --topic dbserver1.public.users --from-beginning --consumer.config /etc/kafka/secrets/consumer.config
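Each message the consumer prints is a Debezium change-event envelope in JSON. A minimal sketch of the shape to expect and how to pick it apart (the sample payload below is illustrative, not captured output; real events also carry a "schema" section, and with pgcrypto the name/email values arrive as ciphertext, not plaintext):

```python
import json

# Illustrative (hand-written) Debezium envelope for an INSERT on public.users
raw = """
{
  "payload": {
    "before": null,
    "after": {"id": 1, "name": "wx4EBw...", "email": "wx4EBw..."},
    "source": {"db": "testdb", "table": "users"},
    "op": "c",
    "ts_ms": 1700000000000
  }
}
"""
payload = json.loads(raw)["payload"]

# "op" encodes the change type: c = create, u = update, d = delete, r = snapshot read
assert payload["op"] == "c"
assert payload["before"] is None   # inserts have no prior row image
print(payload["after"]["id"])      # -> 1, the new row's primary key
```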
5. Configure MongoDB Sink Connector with Security Measures
Create the MongoDB sink connector, ensuring secure communication with TLS and encryption.
First, ensure MongoDB is set up with the appropriate certificates:
sh
mkdir -p ./secrets/mongo
# Generate MongoDB certificates
openssl req -new -x509 -keyout ./secrets/mongo/mongodb.key -out ./secrets/mongo/mongodb.crt -days 365 -nodes
# Combine cert + key into the single PEM mongod expects
# (the original redirected cat's output onto one of its own inputs, which truncates it)
cat ./secrets/mongo/mongodb.crt ./secrets/mongo/mongodb.key > ./secrets/mongo/mongodb.pem
# The self-signed certificate doubles as the CA cert referenced elsewhere
cp ./secrets/mongo/mongodb.crt ./secrets/mongo/ca.pem
Create the MongoDB sink connector:
sh
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "mongo-sink-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "topics": "dbserver1.public.users",
    "connection.uri": "mongodb://root:mysecurepassword@mongo:27017/?ssl=true&sslCAFile=/etc/mongo/secrets/ca.pem&sslPEMKeyFile=/etc/mongo/secrets/mongodb.pem",
    "database": "cdcdb",
    "collection": "users",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}' http://localhost:8083/connectors
6. Verify MongoDB Data with Encryption
Log into the MongoDB container and check the data, ensuring the connection uses SSL.
sh
docker exec -it <mongo-container-id> bash
mongo --ssl --sslCAFile /etc/mongo/secrets/ca.pem --sslPEMKeyFile /etc/mongo/secrets/mongodb.pem --host localhost -u root -p mysecurepassword
Check the cdcdb database:
js
use cdcdb
db.users.find().pretty()
I'm very big on security, and this setup reflects it: Debezium captures changes from the PostgreSQL database and streams them into Kafka, and the MongoDB sink connector then delivers those changes into MongoDB. Security measures include TLS on every hop, RBAC inside PostgreSQL, and column-level encryption via pgcrypto.
Note: replace <postgres-container-id>, <kafka-container-id>, and <mongo-container-id> with the actual container IDs from your Docker environment (docker ps will list them).
My overall point: you can establish a robust and secure CDC system that ensures data integrity and compliance with various security standards. The integration of TLS, RBAC, and encryption protocols across Docker, PostgreSQL, MongoDB, Kafka, and Debezium provides a highly secure environment for real-time data replication and processing. This setup not only enhances security but also maintains high performance and reliability for your data infrastructure.
Thank you so much for taking the time to review my project.
Fidel Vetino (the Mad Scientist)
Technical Advisor || Solution Architect || Product Developer | Security * AI * Systems * Cloud * Software
Space * Technology * Energy * Manufacturing
Fidel V. - Technology Innovator & Visionary
#AI / #AI_mindmap / #AI_ecosystem / #ai_model / #Automation / #analytics / #automotive / #aviation / #LinkedIn / #genai / #gen_ai / #LLM / #ML / #SecuringAI / #python / #machine_learning / #machinelearning / #deeplearning / #artificialintelligence / #businessintelligence / #cloud / #Mobileapplications / #SEO / #Website / #Education / #engineering / #management / #security / #blockchain / #marketingdigital / #entrepreneur / #linkedin / #lockdown / #energy / #startup / #retail / #fintech / #tecnologia / #programing / #future / #technology / #creativity / #innovation / #data / #bigdata / #datamining / #strategies /