Building a Real-Time Data Pipeline with Apache Kafka, ClickHouseDB, and AWS S3 for Data Integration and Normalization
Juliano Souza
Director of Information Technology | Technology Mentor for Startups in the EMEA Region.
For businesses dealing with complex data from various databases, implementing an effective real-time data pipeline is critical. This guide details how to set up a robust, scalable architecture using Apache Kafka for data streaming, ClickHouseDB for data processing and storage, and AWS S3 as a long-term data lake for backup, disaster recovery, and archival purposes. We’ll cover how these components work together to provide real-time data availability, seamless integration, and optimized performance for analytical queries, with a deep dive into exporting data from ClickHouseDB to S3.
Real-Time Data Pipeline Architecture Overview
This architecture integrates Apache Kafka, ClickHouseDB, and AWS S3 to manage data ingestion, processing, and storage from various source databases. Below is a breakdown of each component and how it fits into the pipeline:
1. Source Databases
2. Apache Kafka: Real-Time Streaming Platform
3. ClickHouseDB: Real-Time Processing and Storage
4. AWS S3: Long-Term Data Storage
5. BI Dashboard (Tableau)
Workflow Summary:
Benefits of This Architecture:
This architecture provides a powerful and efficient solution for real-time data integration, processing, and reporting, leveraging modern tools and best practices for optimal performance and scalability.
1. Overview of Architecture
This architecture uses three main components: Apache Kafka, ClickHouseDB, and AWS S3. Kafka serves as the real-time streaming platform, ingesting data from various databases such as SQL Server, Oracle, PostgreSQL (PGSQL), and MySQL. ClickHouseDB processes and stores the data for immediate querying, while AWS S3 acts as a centralized data lake for long-term storage, backup, and disaster recovery.
Key Components:
Benefits of This Approach:
2. Benefits of Apache Kafka and ClickHouseDB
Apache Kafka: The Backbone of Real-Time Data Streaming
A. Scalability and Fault Tolerance
B. High Throughput and Low Latency
C. Versatile Integration
ClickHouseDB: The Powerhouse for Real-Time Data Processing
A. Superior Query Performance
B. High-Speed Ingestion and Processing
C. Scalability and Distributed Processing
D. Real-Time Data Integration with Kafka
3. Step-by-Step Implementation
Step 1: Setting Up Apache Kafka for Streaming
1. Create a Docker network:
docker network create data-stream-network
2. Run Zookeeper:
docker run -d --name zookeeper --network data-stream-network -p 2181:2181 zookeeper
3. Deploy Kafka:
docker run -d --name kafka --network data-stream-network -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka
4. Create Kafka topics for data ingestion:
docker exec -it kafka kafka-topics --create --topic source-database-1 --bootstrap-server kafka:9092 --partitions 3 --replication-factor 1
docker exec -it kafka kafka-topics --create --topic source-database-2 --bootstrap-server kafka:9092 --partitions 3 --replication-factor 1
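To confirm the topics exist before wiring up any producers, you can list them as a quick sanity check:
docker exec -it kafka kafka-topics --list --bootstrap-server kafka:9092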
Step 2: Configuring Kafka Connect for Database Integration
Kafka Connect bridges various databases and Kafka, enabling data flow with minimal coding.
Example Configuration for PostgreSQL (postgres-source-config.json):
{
  "name": "postgres-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://your-db-url:5432/your-db",
    "connection.user": "your-db-user",
    "connection.password": "your-db-password",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "source-database-1-"
  }
}
Example Configuration for SQLServer (sqlserver-source-config.json):
{
  "name": "sqlserver-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:sqlserver://your-db-url:1433;databaseName=your-db",
    "connection.user": "your-db-user",
    "connection.password": "your-db-password",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "last_updated",
    "incrementing.column.name": "id",
    "topic.prefix": "source-database-2-"
  }
}
Example Configuration for Oracle (oracle-source-config.json):
{
  "name": "oracle-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:oracle:thin:@your-db-url:1521/your-db",
    "connection.user": "your-db-user",
    "connection.password": "your-db-password",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "source-database-3-"
  }
}
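These configurations are submitted to a Kafka Connect worker over its REST API, which is assumed to be running on port 8083. If you do not already have one, a minimal single-node worker can be started in the same Docker network. The command below is a sketch: it assumes the Confluent JDBC source connector plugin and the vendor JDBC drivers have been added to the image (for example via confluent-hub), and it disables converter schemas so that records land as plain JSON, which the JSONEachRow format used later in ClickHouse can read.
# Minimal single-node Kafka Connect worker (sketch); JDBC connector plugin and DB drivers must be present in the image
docker run -d --name kafka-connect --network data-stream-network -p 8083:8083 \
  -e CONNECT_BOOTSTRAP_SERVERS=kafka:9092 \
  -e CONNECT_REST_ADVERTISED_HOST_NAME=kafka-connect \
  -e CONNECT_GROUP_ID=connect-cluster \
  -e CONNECT_CONFIG_STORAGE_TOPIC=connect-configs \
  -e CONNECT_OFFSET_STORAGE_TOPIC=connect-offsets \
  -e CONNECT_STATUS_STORAGE_TOPIC=connect-status \
  -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
  -e CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
  -e CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false \
  -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
  -e CONNECT_PLUGIN_PATH=/usr/share/java,/usr/share/confluent-hub-components \
  confluentinc/cp-kafka-connect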
Deploy the Kafka connectors:
curl -X POST -H "Content-Type: application/json" --data @postgres-source-config.json http://localhost:8083/connectors
curl -X POST -H "Content-Type: application/json" --data @sqlserver-source-config.json http://localhost:8083/connectors
curl -X POST -H "Content-Type: application/json" --data @oracle-source-config.json http://localhost:8083/connectors
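After submitting a connector, its health can be checked through the same REST API; for example, for the PostgreSQL connector:
curl http://localhost:8083/connectors/postgres-source-connector/status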
Step 3: Deploying ClickHouseDB for Data Ingestion and Processing
1. Run the ClickHouseDB server container:
docker run -d --name clickhouse --network data-stream-network -p 8123:8123 -p 9000:9000 clickhouse/clickhouse-server
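Before creating tables, it is worth confirming the server is up; a quick check with the bundled client:
docker exec -it clickhouse clickhouse-client --query "SELECT version()"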
2. Create a Kafka-engine table in ClickHouseDB. Note that the JDBC source connector publishes to topics named <topic.prefix><table_name> (for example, source-database-1-orders for a table called orders), so point kafka_topic_list at the actual per-table topic name:
CREATE TABLE kafka_stream (
    order_id UInt64,
    customer_id UInt64,
    product_id UInt64,
    quantity UInt32,
    unit_price Float32,
    total_price Float32,
    currency String,
    order_date DateTime,
    billing_address String,
    shipping_address String,
    status String
) ENGINE = Kafka()
SETTINGS kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'source-database-1',
    kafka_group_name = 'clickhouse-group',
    kafka_format = 'JSONEachRow';
3. Create a MergeTree target table and a materialized view for data normalization. The materialized view reads each block from the Kafka-engine table, recomputes total_price and stamps created_at, and writes the result into the target table:
CREATE TABLE normalized_data (
    order_id UInt64,
    customer_id UInt64,
    product_id UInt64,
    quantity UInt32,
    unit_price Float32,
    total_price Float32,
    currency String,
    order_date DateTime,
    billing_address String,
    shipping_address String,
    status String,
    created_at DateTime
) ENGINE = MergeTree()
ORDER BY (order_date, order_id);

CREATE MATERIALIZED VIEW normalized_data_mv TO normalized_data AS
SELECT
    order_id,
    customer_id,
    product_id,
    quantity,
    unit_price,
    (quantity * unit_price) AS total_price,
    currency,
    order_date,
    billing_address,
    shipping_address,
    status,
    now() AS created_at
FROM kafka_stream;
Deep Dive into Data Ingestion from Kafka to ClickHouseDB:
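In this setup the Kafka-engine table acts as a consumer in the clickhouse-group consumer group: it reads JSONEachRow messages from the topic, and the materialized view persists each consumed block into the normalized_data table. One way to exercise this path end to end is to publish a test message and then query the target table. The sketch below assumes the containers, topic, and tables defined above; the sample values are purely illustrative.
# Publish one test order to the topic the Kafka-engine table is consuming
echo '{"order_id":1,"customer_id":42,"product_id":7,"quantity":2,"unit_price":19.90,"total_price":39.80,"currency":"USD","order_date":"2024-01-15 10:00:00","billing_address":"10 Example Street","shipping_address":"10 Example Street","status":"NEW"}' | \
  docker exec -i kafka kafka-console-producer --bootstrap-server kafka:9092 --topic source-database-1

# ClickHouse flushes consumed Kafka blocks periodically, so allow a short delay before querying
docker exec -it clickhouse clickhouse-client --query "SELECT order_id, total_price, created_at FROM normalized_data WHERE order_id = 1"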
4. Exporting Data from ClickHouse to AWS S3
Exporting data from ClickHouseDB to AWS S3 is essential for long-term storage, backup, and disaster recovery. The s3 table function in ClickHouse makes it easy to export data directly to an S3 bucket in formats such as JSON or CSV.
How the Export Works:
Example of Exporting Data to S3:
INSERT INTO FUNCTION s3(
    'https://your-bucket-name.s3.amazonaws.com/path/to/output-file.json',
    'AWS_ACCESS_KEY_ID',
    'AWS_SECRET_ACCESS_KEY',
    'JSONEachRow'
)
SELECT
    order_id,
    customer_id,
    product_id,
    quantity,
    unit_price,
    total_price,
    currency,
    order_date,
    billing_address,
    shipping_address,
    status,
    created_at
FROM normalized_data;
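For long-term archival, a compressed columnar format is usually cheaper to store and faster to re-read than row-oriented JSON. The same s3 table function accepts other output formats; the sketch below writes the last seven days of orders as Parquet (the object path and the time window are illustrative):
INSERT INTO FUNCTION s3(
    'https://your-bucket-name.s3.amazonaws.com/path/to/orders-archive.parquet',
    'AWS_ACCESS_KEY_ID',
    'AWS_SECRET_ACCESS_KEY',
    'Parquet'
)
SELECT *
FROM normalized_data
WHERE order_date >= now() - INTERVAL 7 DAY;
In production, prefer supplying credentials through ClickHouse named collections or environment/instance credentials rather than embedding keys directly in SQL.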
Security and Best Practices:
Use Cases:
Conclusion
Implementing a data pipeline using Apache Kafka, ClickHouseDB, and AWS S3 provides a powerful, scalable solution for real-time data ingestion, processing, and storage. Kafka ensures efficient streaming from various databases, ClickHouseDB processes data for immediate analysis, and S3 offers a reliable and cost-effective solution for long-term data retention. This architecture enables businesses to leverage real-time insights, maintain data backups, and scale their data operations confidently.