Building a Transactional Apache Hudi Data Lake with Streaming ETL from Multiple Kinesis Streams, Joined Using Apache Flink | Hands-on Lab
Project Overview:
We are performing ETL on streaming data in this project. The system follows the database-per-service design pattern: each service has its own database. Assume you have two services, Orders and Customers. These services keep their own local databases, but the business wants access to stitched data for insights, so we use the SAGA pattern, in which each microservice broadcasts events onto its own stream. We will cleanse the data, join it with Apache Flink, and insert the curated data into the next stream, where a Glue streaming job can pick up the curated data and execute UPSERTs on the Apache Hudi data lake. In this way we perform streaming ETL, and the data is available for use within < 5 minutes.
Architecture:
Explanation:
Database-per-service is a software architecture pattern that creates a separate database for each service instead of one shared database. This approach is usually used in microservices architectures to ensure that each service is decoupled from the others and that each service is free to choose its own database technology and schema. It also makes the system easier to scale and maintain, as each service can be managed and updated independently. To capture data changes from such a system you can use Debezium, AWS DMS, or the database's native streams feature if available. In the given scenario, both teams operating these microservices have opted for DynamoDB as their preferred database.
We will leverage DynamoDB Streams to capture the changes happening in each database, process those change records with Lambda functions, and, once preprocessing is done, insert the data into its respective Kinesis data stream, as sketched below. We then use Apache Flink to stitch (JOIN) the data and output the curated records into the next stream.
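For illustration, here is a minimal sketch of what such a Lambda function could look like. It assumes the function is attached to the Orders table's DynamoDB stream and forwards new images as JSON to the order_streams Kinesis data stream; the attribute names mirror the tbl_orders table defined later in Step 3, while the DynamoDB attribute types are assumptions about the table schema (the actual functions are deployed by the stack in Step 1).

import json
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

def handler(event, context):
    # Each record is a DynamoDB Streams change event (INSERT / MODIFY / REMOVE)
    for record in event["Records"]:
        if record["eventName"] not in ("INSERT", "MODIFY"):
            continue
        image = record["dynamodb"]["NewImage"]
        # Flatten the DynamoDB attribute-value format into plain JSON
        payload = {
            "orderid": image["orderid"]["S"],
            "customer_id": image["customer_id"]["S"],
            "order_value": float(image["order_value"]["N"]),
            "priority": image["priority"]["S"],
            "ts": image["ts"]["S"],
        }
        kinesis.put_record(
            StreamName="order_streams",
            Data=json.dumps(payload).encode("utf-8"),
            PartitionKey=payload["customer_id"],
        )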
A Glue streaming job can then consume the messages from the curated stream and perform UPSERTs into Hudi tables. This gives us curated data with no duplicates, which is what makes this a transactional data lake. Users can run ad hoc queries against the Hudi tables, build BI dashboards with QuickSight, and create views in Athena if needed.
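The Glue job itself is the subject of Part 2; as a preview, its core logic would look roughly like the sketch below, which upserts each micro-batch of curated records into a Hudi table. The table name, record key, precombine field, Glue database, and S3 path here are illustrative assumptions rather than the exact Part 2 settings, and the streaming read from the curated stream is omitted.

# Illustrative sketch only: option values below are assumptions.
hudi_options = {
    "hoodie.table.name": "orders_customer_stitched",
    "hoodie.datasource.write.recordkey.field": "orderid",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.database": "hudi_db",
    "hoodie.datasource.hive_sync.table": "orders_customer_stitched",
}

def upsert_to_hudi(batch_df, batch_id):
    # UPSERT each micro-batch so re-delivered messages do not create duplicates
    (batch_df.write.format("hudi")
        .options(**hudi_options)
        .mode("append")
        .save("s3://your-bucket/hudi/orders_customer_stitched/"))

# stitched_df would be the streaming DataFrame read from the curated Kinesis stream:
# stitched_df.writeStream.foreachBatch(upsert_to_hudi).start()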
There are several window options available in Apache Flink:
Tumbling window:
When a windowed query processes each window in a non-overlapping manner, the window is referred to as a tumbling window. In this case, each record on an in-application stream belongs to a specific window. It is processed only once (when the query processes the window to which the record belongs).
Sliding Window:
In a sliding window, tuples are grouped within a window that slides across the data stream according to a specified interval. A time-based sliding window with a length of ten seconds and a sliding interval of five seconds contains tuples that arrive within a ten-second window.
Session Window:
Session windows group events that arrive at similar times, filtering out periods of time where there is no data. Session window function has three main parameters: timeout, maximum duration, and partitioning key (optional).
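In this lab the streaming SQL runs directly in %flink.ssql notebook cells; purely as an illustration of the three window types, here is a minimal PyFlink sketch. It assumes the tbl_orders table from Step 3 has been registered in the environment, and the window sizes are arbitrary examples.

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Tumbling window: non-overlapping 1-minute buckets per customer
tumbling = """
    SELECT customer_id,
           TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start,
           SUM(order_value) AS total_order_value
    FROM tbl_orders
    GROUP BY customer_id, TUMBLE(ts, INTERVAL '1' MINUTE)
"""

# Sliding (hopping) window: 10-second windows that advance every 5 seconds
sliding = """
    SELECT customer_id,
           HOP_START(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND) AS window_start,
           COUNT(*) AS orders_in_window
    FROM tbl_orders
    GROUP BY customer_id, HOP(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND)
"""

# Session window: groups orders separated by less than a 30-second gap
session = """
    SELECT customer_id,
           SESSION_START(ts, INTERVAL '30' SECOND) AS session_start,
           COUNT(*) AS orders_in_session
    FROM tbl_orders
    GROUP BY customer_id, SESSION(ts, INTERVAL '30' SECOND)
"""

for query in (tumbling, sliding, session):
    t_env.execute_sql(query)  # each call returns a TableResult you can collect or print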
JOINS (Important)
Flink SQL supports different types of joins, such as inner join, outer join, and interval join. In a streaming job you want to keep resource utilization from growing indefinitely while still running joins effectively.
For that reason, in our example we join the tables using an interval join. An interval join requires one equi-join predicate and a join condition that bounds the time on both sides.
In this example, we join the two Kinesis Data Streams tables on customer_id, which is a field common to both stream datasets. The filter condition in the query is a time constraint, which keeps the join state (and hence resource utilization) from growing; the concrete query appears in Step 5 below.
Read More on Interval Join:
Video Guide:
Hands on guide step by step
Why Apache Hudi?
Apache Hudi (pronounced “hoodie”) is the next generation streaming data lake platform. Apache Hudi brings core warehouse and database functionality directly to a data lake. Hudi provides tables, transactions, efficient upserts/deletes, advanced indexes, streaming ingestion services, data clustering/compaction optimizations, and concurrency, all while keeping your data in open source file formats.
Not only is Apache Hudi great for streaming workloads, but it also allows you to create efficient incremental batch pipelines. Read the docs for more use case descriptions and check out who's using Hudi to see how some of the largest data lakes in the world, including Uber, Amazon, ByteDance, Robinhood, and more, are transforming their production data lakes with Hudi.
Apache Hudi can easily be used on any cloud storage platform. Hudi's advanced performance optimizations make analytical workloads faster with any of the popular query engines, including Apache Spark, Flink, Presto, Trino, and Hive.
Steps:
Step 1: Change the access keys and secret keys in ENV and deploy the stack
git clone https://github.com/soumilshah1995/dynamodb-streaming-etl-kinesis-.git
cd PART1
npx sls deploy --region=us-east-1
Step 2: Create Glue Database
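You can create the database from the Glue console; equivalently, a one-off boto3 call like the sketch below works. The database name hudi_db is an assumption, so use whatever name your Hudi/Athena tables should live under.

import boto3

# Creates the Glue Data Catalog database that the Hudi tables will be registered in
glue = boto3.client("glue", region_name="us-east-1")
glue.create_database(DatabaseInput={"Name": "hudi_db"})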
Step 3: Create Kinesis Analytics Application with Flink
Execute the cells and code given to you in the notebook.
%flink.ssql
DROP TABLE if exists tbl_orders;
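-- Source table: order events read from the order_streams Kinesis data stream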
CREATE TABLE tbl_orders (
orderid VARCHAR,
customer_id VARCHAR,
ts TIMESTAMP(3),
order_value DOUBLE,
priority VARCHAR,
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
)
WITH (
'connector' = 'kinesis',
'stream' = 'order_streams',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);
DROP TABLE if exists tbl_customers;
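-- Source table: customer events read from the customers_streams Kinesis data stream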
CREATE TABLE tbl_customers (
customer_id VARCHAR,
name VARCHAR,
state VARCHAR,
city VARCHAR,
email VARCHAR,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
)
WITH (
'connector' = 'kinesis',
'stream' = 'customers_streams',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);
DROP TABLE if exists orders_customer_stitched;
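-- Sink table: the joined (curated) records are written out to the orders_customer_stitched Kinesis data stream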
CREATE TABLE orders_customer_stitched (
orderid VARCHAR,
ts TIMESTAMP(3),
order_value DOUBLE,
priority VARCHAR,
name VARCHAR,
state VARCHAR,
city VARCHAR,
email VARCHAR
)
WITH (
'connector' = 'kinesis',
'stream' = 'orders_customer_stitched',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'LATEST',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
);
Step 5: Observe the streaming data in the console to make sure the pipeline is working fine
Execute the following query
%flink.ssql(type=update)
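-- Interval join on customer_id: a pair matches only when the order's ts falls within the 1 minute preceding the customer record's ts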
SELECT
    tbl_orders.orderid,
    tbl_customers.ts,
    tbl_orders.order_value,
    tbl_orders.priority,
    tbl_customers.name,
    tbl_customers.state,
    tbl_customers.city,
    tbl_customers.email
FROM tbl_orders
INNER JOIN tbl_customers
    ON tbl_orders.customer_id = tbl_customers.customer_id
WHERE tbl_orders.ts BETWEEN tbl_customers.ts - INTERVAL '1' MINUTE AND tbl_customers.ts;
Run the Python file:
python dynamo-db-insert.py
Step 6: Insert into the output stitched stream
%flink.ssql(type=update)
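-- Continuously write the joined records into the orders_customer_stitched sink table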
INSERT INTO orders_customer_stitched
SELECT
    tbl_orders.orderid,
    tbl_customers.ts,
    tbl_orders.order_value,
    tbl_orders.priority,
    tbl_customers.name,
    tbl_customers.state,
    tbl_customers.city,
    tbl_customers.email
FROM tbl_orders
INNER JOIN tbl_customers
    ON tbl_orders.customer_id = tbl_customers.customer_id
WHERE tbl_orders.ts BETWEEN tbl_customers.ts - INTERVAL '1' MINUTE AND tbl_customers.ts;
Go to the Kinesis console and check whether the data arrived in the output stream.
In Part 2 we will integrate this with a Glue streaming job to perform UPSERTs into the Hudi data lake.