Implementing Modern Real-Time ETL
Amit Priyadarshi
Director @ Synechron | Data Practice | Data Engineering | Big data & Analytics | DWH, ETL, Data Modelling | Cloud, DevOps
1. Context :
Modern data architecture has come a long way from the early days of data warehouses and data integration. If we call the initial era of data architecture Data 1.0, we are now at Data 3.0. Data 1.0 was focused on building MIS reports and basic analytics to show what was happening in the organisation, by creating a smaller Data Mart (DM), a larger Data Warehouse (DWH) or an Enterprise Data Warehouse (EDW). Data transformation and processing were the job of advanced ETL tools such as Informatica PowerCenter, DataStage and Ab Initio, which led this space for more than 15 years.
Data 2.0 started when Hadoop announced its arrival, and with that everything started shifting towards building a Data Lake (DL) on Hadoop / Cloudera / Hortonworks, where processing was handled by MapReduce / Pig Latin / Hive in the beginning and later by Apache Spark. HDFS became the undisputed leader in distributed data storage for any data platform.
Now, with Data 3.0, we are talking about ESG, the Environmental, Social and Governance perspective of an increasingly complex data architecture. We are also talking about Data Fabric and Data Mesh, which are about improving the data consumption experience and making the investment in a data strategy a valuable asset. I will cover these aspects in another blog, but the one thing I want to address here is how a major component of the complete data journey is being tackled by modern data platforms: the speed of data movement and processing. This is probably the most important factor in realising value from a data platform today, because we are now dealing with serious data volume and variety, and modern analytical use cases need processed data very fast.
Traditionally, a data platform talked only about throughput, not latency. Throughput is the number of records processed per second, which can be measured over hours or even days. This means that a record generated now, but made available to end users only after 24 hours, can still count towards good throughput if we measure it over a day. Latency is a different concept: it is the end-to-end time taken by a single record. If a record is made available in one second, its latency is excellent compared to a 24-hour load. However, good latency is not always better than good throughput, because for traditional analytics you do not care much about one record; you care about the mass of the data, i.e. the batch, and you want to see all of your data together at a certain frequency, which typically was once a day.
However, with modern use cases, which are mostly around fraud analytics, churn analytics and customer behaviour, a new expectation of the data platform has arisen: provide the data as fast as possible, i.e. with low latency. We still need batch data with good throughput, but a platform should also support low-latency data movement and processing. Certain architectures specialise in providing both low-latency data and high-throughput batch data, one example being the Lambda architecture.
In this article, I am going to show how low data latency can be achieved with a few of the modern tech stacks at our disposal. The problem I am going to address covers not only data transfer but also data transformation in real time.
2. Problem Statements :
The following are a few of the data architecture problems that this blog is trying to solve.
3. Usage :
The following are a few implementation scenarios for the technical steps shown in this article.
3.1 Low Latency On-Premise to Cloud Migration
3.2 Real time data availability
3.3 Microservices Architecture
3.4 Analytical Use cases
4. Architecture & Technologies :
4.1 Logical Architecture
The following is the logical architecture of a simple modern data flow. Data is read from a source, which is normally operational data stored on some on-premise platform. Data gets generated and is then pulled in real time by the ingestion framework. The same dataset is processed across multiple layers, such as a raw zone and a trusted zone, and changed as per the business / technical requirements. The data is then loaded to the target system, which could be a cloud-based solution or even on-premise. The difference from a traditional architecture is the introduction of data transformation on the fly.
The following are the major components of this logical layer
4.1.1 Source Layer - This is the left-most block of the data flow and is where source data is generated and stored. This can be any RDBMS like Oracle, MySQL or SQL Server, files like CSV or Excel, or any other data store such as Teradata, Neo4J or ELK.
4.1.2 Target Layer - This is the right-most block of the data flow and is where the transformed data has to be stored in real time. It could be a cloud-based service, a database, files or any other specific storage.
4.1.3 ETL Layer - The central part of the architecture, where ingestion and processing happen all the time. I am representing two inner layers here, called the Raw zone and the Trusted zone. Raw carries raw, unchanged data from the source. Trusted holds the modified data that can be trusted by the business / users / downstream applications. In actual implementations there could be a few more layers, depending on how we plan to segregate data, consumers and access levels, but the raw and trusted zones can be found in almost all new-generation platforms.
The ingestion part is sometimes called the CDC layer, which stands for change data capture. The job of this layer is to keep watching for source data changes; the moment a change happens, it pulls the changed records and passes them to the next layer. Depending on the tools and configuration, it can pull both the Before Image and the After Image of a change. Obviously, an INSERT record has no Before Image, and a DELETE record has no After Image. The CDC data is passed to the ETL engine, which applies transformations that change the data. It can also apply governance rules such as Data Quality or data reconciliation rules. Once ingestion and processing are done, the records are moved to the target layer.
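To make the Before Image / After Image idea concrete, the sketch below shows the shape of a change event emitted by a log-based CDC tool for an UPDATE (Debezium is one well-known example; it is not the connector used in the PoC later, and the field values here are illustrative only). "op" marks the operation (c = insert, u = update, d = delete), and the before / after blocks carry the row image on either side of the change.
{
  "op": "u",
  "ts_ms": 1700000000000,
  "before": { "id": 1, "name": "Widget", "sku": "SKU-001", "details": "old description" },
  "after":  { "id": 1, "name": "Widget", "sku": "SKU-001", "details": "new description" }
}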
4.2 Physical Architecture
The following is one physical implementation of the logical flow mentioned above. The tools selected could differ depending on the vendors chosen and other functional and non-functional considerations. The platform can be built on Ubuntu / RHEL machines from a public cloud provider such as AWS or Azure. We can install the software components directly on Ubuntu, or use Docker to manage the containers more easily. The ETL engine can be created using Confluent Kafka. Apache Kafka is also a leading option for implementing similar solutions; however, Confluent provides some really good facilities for creating data schemas and also provides a management portal called Control Center.
4.2.1 CDC layer - Apache / Confluent Kafka has a very robust API for system integration called Kafka Connect. It comes out of the box with connectors for most common systems, including Oracle, MySQL, ELK, Neo4J, Teradata, etc. A few of these are regular connectors that work in micro-batches, while others are true CDC connectors. For example, the Kafka connector for Oracle now comes in both flavours: a regular micro-batch-based connector and one in CDC mode. These connectors pick up data according to the defined configuration and load it to a Kafka topic as individual records.
4.2.2 Target Layer - Kafka Connect can also be used for data loading, from a Kafka topic to a target of your choice. The target could be an RDBMS, files, Redshift or most other technologies you will have heard of.
4.2.3 ETL Layer - This is where the real action happens. A Kafka topic is the central component that stores the data. Once a record goes into a Kafka topic, it normally has no structure: it is just a character array whose meaning depends entirely on how you read it. It is like a delimited file without a header: you can read it in any number of ways and try to derive a meaning, which could be correct or a disaster. This is where Kafka differs from similar tools, because you can actually attach a structure to the data in a Kafka topic. If we are using Kafka purely as pub-sub, where we need to push data to the target without deriving any meaning on the fly, we can use it without any structure. However, when we really want to decode the meaning of the attributes, we need to put a structure in place. The relevant component is the Schema Registry, where you can register a schema for the Kafka topic data and then use it for downstream evaluation. We can create a definition of Avro type, JSON type, or even the newer Protobuf (protocol buffers) type. It is worth understanding all these types to get the best benefits in terms of ease of development, support, performance, etc.
When Kafka Connect loads data to a Kafka topic, it registers a schema as well, in the configured format. We can also attach a schema to a Kafka topic programmatically.
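As a quick sanity check (a sketch only, assuming the Schema Registry port 8081 is mapped to the host as in the standard Confluent Docker setup), the auto-registered schema can be inspected through the Schema Registry REST API. Here mysql_product is the raw topic created by the source connector in the PoC later in this article, and the subject name follows the default <topic>-value naming strategy.
# Fetch the latest Avro schema registered for the raw topic's value
curl -s http://localhost:8081/subjects/mysql_product-value/versions/latest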
Kafka Streams - Data in a Kafka topic cannot be changed in place. If we need to transform topic data, we cannot do it on the same Kafka topic; it has to be done on the fly and loaded to another Kafka topic. Kafka provides another API called Kafka Streams, which is an abstraction layer on top of Kafka topics. This is what gives you the capability to change data and use it in another topic as a transformed dataset. We can use KStream or KSQL (KSQL is provided by Confluent only) for this. KStream is a rich set of APIs through which we can programmatically control the stream data (internally, Kafka topic data) and manipulate it using enterprise programming languages like Java, Spring Boot or .NET. It may seem a little overwhelming for a beginner, but it actually is not for a good Java / .NET developer, as the APIs are very rich at the class and object level. Confluent also came up with the ksqlDB engine, which allows you to write standard SQL and use simple DML to change the dataset. So the way we work here is: we create a Kafka stream on top of the source Kafka topic, then use KStream / KSQL to convert the data into another Kafka stream (created on top of the target Kafka topic). This way we have two Kafka streams, one that is a replica of the source data and another that holds the converted data. The second Kafka topic is then used by Kafka Connect to load to the target systems.
5. POC :
The above concepts are demonstrated using a sample project built with the following technologies
a) AWS EC2 - AWS EC2 box with Ubuntu installation
b) Docker - Docker is installed to manage all the services in its containers
c) Confluent - Confluent Kafka is installed using the Docker Compose file taken from the Confluent documentation (a setup sketch follows this list)
d) MySQL - MySQL is used as the source for this PoC and the MySQL services are installed using its Docker Compose file
e) Redshift - Redshift instance is created to be used as a Target
f) MySQL Workbench - Used for connecting to the MySQL DB
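As a reference, here is a minimal setup sketch. It assumes the cp-all-in-one Docker Compose project that Confluent publishes on GitHub and a stock MySQL image; the repository layout, image versions and passwords are assumptions, and the MySQL host port 6603 simply matches the JDBC URL used in the source connector below.
# Bring up the Confluent stack (broker, Connect, Schema Registry, ksqlDB, Control Center)
git clone https://github.com/confluentinc/cp-all-in-one.git
cd cp-all-in-one/cp-all-in-one
docker compose up -d
# Run MySQL as a separate container, exposed on host port 6603
docker run -d --name mysql -p 6603:3306 -e MYSQL_ROOT_PASSWORD=<password> mysql:8.0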
5.1 Source setup - A simple table called product is set up in MySQL. Some sample records are created and inserted, keeping id unique.
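A hedged sketch of the source table and one sample row; the column names mirror the KSQL stream definitions later in the article, while the exact data types and values are assumptions for illustration.
-- Source table in MySQL; id acts as the incrementing key tracked by the connector
CREATE TABLE product (
  id      BIGINT NOT NULL UNIQUE,
  name    VARCHAR(100),
  sku     VARCHAR(50),
  details VARCHAR(255)
);
-- One sample row
INSERT INTO product (id, name, sku, details) VALUES (1, 'Widget', 'SKU-001', 'Sample product');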
5.2 Target setup - A Redshift instance is created as the target.
5.3 Data Ingestion - The following code is used to set up the Kafka source connector for the MySQL table product.
This uses the Kafka Connect library JdbcSourceConnector, which has to be installed as a prerequisite before using the following connector configuration. Refer to my YouTube channel (https://www.youtube.com/@datapower2022) for more details on how to install it.
"name": "mysql_product",
"config": {
"name": "mysql_product",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://<ec2_name>.compute.amazonaws.com:6603/<database>",
"connection.user": "<user>",
"connection.password": "<password>",
"connection.attempts": "1",
"table.whitelist": "product",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "mysql_"
}
}
The above is the minimum configuration that has to be used. There are many more settings that may need to be configured in your environment, depending on the schema, the number of tables to be pulled, etc. One thing to notice here is incrementing.column.name, which is set to id. This means Kafka Connect keeps track of the last id extracted and pulls only those records whose id is greater than the last extracted id. If a source record arrives with an id lower than one already extracted, that record will not be pulled.
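Assuming the Kafka Connect REST interface is exposed on its default port 8083 and the JSON above is saved as mysql_product.json, the connector can be registered and verified like this:
# Register the source connector with the Connect REST API
curl -s -X POST -H "Content-Type: application/json" --data @mysql_product.json http://localhost:8083/connectors
# Confirm the connector and its task are RUNNING
curl -s http://localhost:8083/connectors/mysql_product/status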
5.4 Data Load
We use the Kafka Connect library RedshiftSinkConnector for loading data from a Kafka topic into Redshift tables.
{
"name": "redshift-product",
"config": {
"connector.class": "io.confluent.connect.aws.redshift.RedshiftSinkConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"topics": "trusted_product",
"aws.redshift.domain": "<redshift_instance>.redshift.amazonaws.com",
"aws.redshift.port": "5439",
"aws.redshift.database": "dev",
"aws.redshift.user": "<user>",
"aws.redshift.password": "<password>",
"insert.mode": "insert",
"pk.mode": "none",
"confluent.license": "",
"auto.create": "true",
"confluent.topic.bootstrap.servers": "broker:29092",
"confluent.topic.replication.factor": "1",
"key.ignore": "true",
"schema.ignore": "true",
"value.converter.schema.registry.url": "https://schema-registry:8081",
"schema.registry.schema.version": "1",
"value.converter.schemas.enable": "true",
"key.converter.schema.registry.url": "https://schema-registry:8081"
}
}
There are a few extra configuration items here compared with the MySQL source connector. The important ones relate to the schema of the data: the Kafka topic holds Avro data, so proper deserialization has to happen before the data can be loaded.
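The sink connector is registered the same way (again assuming port 8083 and the JSON above saved as redshift_product.json); the status call is a quick way to confirm the task is RUNNING and writing to Redshift.
# Register the Redshift sink connector
curl -s -X POST -H "Content-Type: application/json" --data @redshift_product.json http://localhost:8083/connectors
# Check its status
curl -s http://localhost:8083/connectors/redshift-product/status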
5.5 ETL Engine
The following code can be used to create the Kafka stream on top of the raw Kafka topic.
*******************Create raw stream***************
CREATE STREAM stream_mysql_product (
id BIGINT,
name VARCHAR,
sku VARCHAR,
details VARCHAR
) WITH (
KAFKA_TOPIC = 'mysql_product',
VALUE_FORMAT = 'AVRO', PARTITIONS = '1', REPLICAS = '1' );
The following code can be used to create the Kafka stream on top of the trusted Kafka topic.
*******************Create trusted stream***************
CREATE STREAM stream_trusted_product (
id BIGINT,
name VARCHAR,
sku VARCHAR,
details VARCHAR,
country VARCHAR,
dq_check_name VARCHAR
) WITH (
KAFKA_TOPIC = 'trusted_product',
VALUE_FORMAT = 'AVRO', PARTITIONS = '1', REPLICAS = '1' );
The final part, converting the raw data stream to the trusted data stream, can be achieved using the following KSQL code.
*******************Apply ETL Rules***************
INSERT INTO stream_trusted_product
SELECT *, 'USA' as country,
CASE WHEN name = 'X' THEN 'N'
ELSE 'Y'
END as dq_check_name
FROM stream_mysql_product;
This is a very simple SQL statement that reads data from the first stream using the FROM clause. It then transforms the data by adding an extra column called country. It also derives another column by checking the quality of the data, thus representing DQ capability. This is a simple SQL for the purpose of the PoC, but it shows how complex SQL with multiple functions and JOINs could be used to transform the data and load it into another Kafka stream. Data written to a Kafka stream actually lands in the underlying Kafka topic.
5.6 Results :
5.6.1 Source Record - A new source record is inserted into the MySQL product table.
5.6.2 Raw Zone data - Querying the raw stream from the KSQL server command prompt shows the record as it lands in the Kafka topic and Kafka stream in the Raw layer.
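A push query of the following shape (ksqlDB syntax; older KSQL versions do not use EMIT CHANGES) prints raw records as they arrive:
SELECT * FROM stream_mysql_product EMIT CHANGES;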
5.6.3 Trusted Zone data - Querying the trusted stream from the KSQL server command prompt shows the transformed, Data Quality-checked record as it lands in the Kafka topic and Kafka stream in the Trusted layer.
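The same kind of push query against the trusted stream shows the added country and dq_check_name columns:
SELECT * FROM stream_trusted_product EMIT CHANGES;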
5.6.4 Final target data - The transformed record finally appears in the target system in real time, which in this case is a table in the Redshift instance.
5.7 Conclusion :
I have shown here how data records can be pulled, processed and DQ-checked, and then loaded to a target system, all in real time. We used MySQL as the source, Redshift as the target, and Confluent Kafka as the complete ETL engine. The steps shown here can be used to build enterprise-scale components that are scalable and distributed by nature. We have used some core concepts of Kafka and its ecosystem, such as topics, connectors, Schema Registry, KSQL and Kafka Streams.
**************************************************************************
About the Author
I am Amit Priyadarshi, a B.Tech from the Indian Institute of Technology (IIT) Roorkee. I have been in IT services for the last 21 years, mostly working with big IT services companies and serving some top-tier customers across the globe. The first half of my career was focused on traditional data areas like data warehousing, dimensional data modelling, ETL and Informatica, and I spent many years in the USA and UK as a developer, data modeller and designer. The second half of my career has been spent around Big Data, Cloud, Analytics and DevOps; I am settled in Bangalore and work in multiple architect roles, e.g. data architect and solution architect.
In my current role, I am working as Head of the Information Practice at Attra Infotech (now Synechron - Payments after the acquisition), and I have handled a few complex data implementations related to real-time processing and migration in the capacity of delivery architect for the solutions.
I manage a YouTube channel where my focus is to showcase complex data concepts and their implementations to a wider audience who find it challenging to get hands-on exposure to the data world. Please feel free to like, subscribe and comment. You can reach out to me for any technical discussions and mentorship.