Building a Transactional Apache Hudi Data Lake with Streaming ETL from Multiple Kinesis Streams, Joined Using Apache Flink | Hands-on Lab

Project Overview:

In this project we execute ETL on streaming data. The system uses the database-per-service design pattern: each service has its own database. Assume you have two services, Orders and Customers. These services keep their own local databases, but the business wants access to stitched data for insights, so we use the SAGA pattern, in which each microservice broadcasts events on its own stream. We will cleanse the data, join it with Apache Flink, and insert the curated data into the next stream, where the Glue streaming job can pick up the curated data and execute UPSERTs on the Apache Hudi data lake. This way we perform streaming ETL, and the data is available for use in under 5 minutes.

Architecture:

(Architecture diagram)

Explanation:

Database-per-service is a software architecture pattern that gives each service its own database instead of one shared database. It is usually used in microservices architectures to keep services decoupled and to let each service choose its own database technology and schema. It also makes the system easier to scale and maintain, since each service can be managed and updated independently. To bring data out of such a system you can use Debezium, AWS DMS, or a native streams option if one is available. In the given scenario, both teams operating these microservices have chosen DynamoDB as their database.

We will leverage DynamoDB Streams to capture changes happening in each database, process those change records with Lambda functions, and, once preprocessing is done, insert the data into the respective Kinesis streams. We will then use Apache Flink to stitch (JOIN) the data and output the curated records into the next stream. A minimal sketch of such a Lambda handler is shown below.
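Here is a minimal sketch of what a DynamoDB Streams-triggered Lambda for the orders service might look like, using Python and boto3. The field names and the stream name order_streams come from the Flink table definitions later in this lab; the exact DynamoDB attribute types (S/N) are assumptions, and the actual handlers in the repository may differ.

# Minimal sketch of a DynamoDB Streams -> Kinesis Lambda for the orders service.
import json
import boto3

kinesis = boto3.client("kinesis")
STREAM_NAME = "order_streams"  # "customers_streams" for the customers service


def handler(event, context):
    for record in event.get("Records", []):
        # Only forward inserts/updates; the stream view type must include NewImage.
        new_image = record.get("dynamodb", {}).get("NewImage")
        if not new_image:
            continue

        # DynamoDB Streams delivers typed attributes ({"S": "..."}); flatten them.
        payload = {
            "orderid": new_image["orderid"]["S"],
            "customer_id": new_image["customer_id"]["S"],
            "ts": new_image["ts"]["S"],
            "order_value": float(new_image["order_value"]["N"]),
            "priority": new_image["priority"]["S"],
        }

        kinesis.put_record(
            StreamName=STREAM_NAME,
            Data=json.dumps(payload),
            PartitionKey=payload["customer_id"],
        )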

A Glue streaming job can then consume the messages from the curated stream and perform UPSERTs into Hudi tables. This gives us curated data with no duplicates, and hence our transactional data lake. Users can run ad hoc queries and build BI dashboards with QuickSight, and create views in Athena from the Hudi tables if needed.
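Part 2 walks through the Glue streaming job in detail, but the core of the UPSERT looks roughly like the sketch below. The Hudi options shown are standard Spark datasource options; the table name, record key, and S3 path are illustrative placeholders, not values taken from the repository.

# Rough sketch of the Hudi UPSERT performed per micro-batch in the Glue streaming
# job (covered in Part 2). Table name, record key, and S3 path are illustrative.
hudi_options = {
    "hoodie.table.name": "orders_customer_stitched",
    "hoodie.datasource.write.recordkey.field": "orderid",  # de-duplication key
    "hoodie.datasource.write.precombine.field": "ts",      # latest ts wins on conflict
    "hoodie.datasource.write.operation": "upsert",
}


def upsert_to_hudi(micro_batch_df, batch_id):
    """Write one micro-batch from the curated Kinesis stream into the Hudi table.

    This is the kind of function you would hand to foreachBatch in the streaming
    job; Glue/Hive catalog sync options would typically be added so Athena can
    query the resulting table.
    """
    (
        micro_batch_df.write.format("hudi")
        .options(**hudi_options)
        .mode("append")
        .save("s3://<your-bucket>/hudi/orders_customer_stitched/")
    )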

There are several window options available in Apache Flink; a combined example follows the three descriptions below.

Tumbling window:

When a windowed query processes each window in a non-overlapping manner, the window is referred to as a tumbling window. In this case, each record on an in-application stream belongs to a specific window. It is processed only once (when the query processes the window to which the record belongs).

Sliding Window:

In a sliding window, tuples are grouped within a window that slides across the data stream according to a specified interval. A time-based sliding window with a length of ten seconds and a sliding interval of five seconds contains tuples that arrive within a ten-second window.

Session Window:

Session windows group events that arrive at similar times, filtering out periods of time where there is no data. The session window function has three main parameters: timeout, maximum duration, and an optional partitioning key.
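For illustration, the sketch below expresses all three window types as Flink SQL over the tbl_orders table defined in Step 3, wrapped in PyFlink so the snippet is self-contained; in this lab you would instead paste each statement into a %flink.ssql cell. The window sizes and aggregations are arbitrary examples.

# Illustrative Flink SQL for the three window types, using the tbl_orders schema
# from Step 3. Requires `pip install apache-flink` if run locally.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Tumbling window: non-overlapping one-minute buckets of order value per customer.
TUMBLING_SQL = """
SELECT customer_id,
       TUMBLE_END(ts, INTERVAL '1' MINUTE) AS window_end,
       SUM(order_value) AS total_value
FROM tbl_orders
GROUP BY customer_id, TUMBLE(ts, INTERVAL '1' MINUTE)
"""

# Sliding (hopping) window: ten-second windows that advance every five seconds.
SLIDING_SQL = """
SELECT customer_id,
       HOP_END(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND) AS window_end,
       COUNT(*) AS orders_in_window
FROM tbl_orders
GROUP BY customer_id, HOP(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND)
"""

# Session window: closes a session after a thirty-second gap with no new orders.
SESSION_SQL = """
SELECT customer_id,
       SESSION_END(ts, INTERVAL '30' SECOND) AS session_end,
       COUNT(*) AS orders_in_session
FROM tbl_orders
GROUP BY customer_id, SESSION(ts, INTERVAL '30' SECOND)
"""

# t_env.execute_sql(TUMBLING_SQL) would run a query once tbl_orders is registered
# (Step 3 creates it against the order_streams Kinesis stream).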

JOINS (Important)

Flink SQL APIs support different types of join conditions, like inner join, outer join, and interval join. To run joins effectively, you want to keep resource utilization from growing indefinitely.

For that reason, our example joins the two tables using an interval join. An interval join requires one equi-join predicate and a join condition that bounds the time on both sides.

In this example, we join the two Kinesis Data Streams tables on customer_id, which is a common field between the two stream datasets. The filter condition in the query is a time constraint, which keeps state and resource utilization from growing unbounded; the query in Step 5 below uses exactly this pattern.

Read more on interval joins:


Video Guide:

Hands-on guide, step by step

Why Apache Hudi?

Apache Hudi (pronounced “hoodie”) is the next generation streaming data lake platform. Apache Hudi brings core warehouse and database functionality directly to a data lake. Hudi provides tables, transactions, efficient upserts/deletes, advanced indexes, streaming ingestion services, data clustering/compaction optimizations, and concurrency, all while keeping your data in open source file formats.

Not only is Apache Hudi great for streaming workloads, but it also allows you to create efficient incremental batch pipelines. Read the docs for more use case descriptions and check out who's using Hudi to see how some of the largest data lakes in the world, including Uber, Amazon, ByteDance, Robinhood, and more, are transforming their production data lakes with Hudi.

Apache Hudi can easily be used on any cloud storage platform. Hudi's advanced performance optimizations make analytical workloads faster with any of the popular query engines, including Apache Spark, Flink, Presto, Trino, Hive, and more.

Steps:

Step 1: Change the access keys and secret keys in ENV and deploy the stack

git clone https://github.com/soumilshah1995/dynamodb-streaming-etl-kinesis-.git

cd PART1

npx sls deploy --region=us-east-1

Step 2: Create Glue Database


Step 3: Create Kinesis Analytics Application with Flink


Step 4: Execute the cells and code given to you in the notebook


%flink.ssql

DROP TABLE if exists tbl_orders;
CREATE TABLE tbl_orders (
    orderid VARCHAR,
    customer_id VARCHAR,
    ts TIMESTAMP(3),
    order_value DOUBLE,
    priority VARCHAR,
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND

)
WITH (
    'connector' = 'kinesis',
    'stream' = 'order_streams',
    'aws.region' = 'us-east-1',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
    );

DROP TABLE if exists tbl_customers;
CREATE TABLE tbl_customers (
    customer_id VARCHAR,
    name  VARCHAR,
    state  VARCHAR,
    city  VARCHAR,
    email  VARCHAR,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND

)
WITH (
    'connector' = 'kinesis',
    'stream' = 'customers_streams',
    'aws.region' = 'us-east-1',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
    );

DROP TABLE if exists orders_customer_stitched;
CREATE TABLE orders_customer_stitched (
        orderid VARCHAR,
        ts TIMESTAMP(3),
        order_value DOUBLE,
        priority VARCHAR,
        name  VARCHAR,
        state  VARCHAR,
        city  VARCHAR,
        email  VARCHAR
)
WITH (
    'connector' = 'kinesis',
    'stream' = 'orders_customer_stitched',
    'aws.region' = 'us-east-1',
    'scan.stream.initpos' = 'LATEST',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601'
);
        

Step 5: Observe the streaming data in the console to make sure the pipeline is working fine

Execute the following query


%flink.ssql(type=update)

SELECT
    tbl_orders.orderid,
    tbl_customers.ts,
    tbl_orders.order_value,
    tbl_orders.priority,
    tbl_customers.name,
    tbl_customers.state,
    tbl_customers.city,
    tbl_customers.email
FROM tbl_orders
INNER JOIN tbl_customers
    ON tbl_orders.customer_id = tbl_customers.customer_id
WHERE tbl_orders.ts BETWEEN tbl_customers.ts - INTERVAL '1' MINUTE AND tbl_customers.ts;

Run the Python file to insert sample records into DynamoDB:


python dynamo-db-insert.py        
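For context, here is a hypothetical sketch of what a script like dynamo-db-insert.py could do: write one customer and one matching order into DynamoDB so the streams-to-Flink pipeline has data to join. The DynamoDB table names and value ranges are assumptions; the actual script in the repository may differ.

# Hypothetical data generator: inserts one customer and one matching order into
# DynamoDB. Field names follow the Flink table schemas defined in Step 3.
import random
import uuid
from datetime import datetime, timezone
from decimal import Decimal

import boto3

dynamodb = boto3.resource("dynamodb")
customers_table = dynamodb.Table("customers")  # assumed table name
orders_table = dynamodb.Table("orders")        # assumed table name

customer_id = str(uuid.uuid4())
# ISO-8601 timestamp, matching json.timestamp-format.standard in the Flink DDL.
ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]

customers_table.put_item(Item={
    "customer_id": customer_id,
    "name": "John Doe",
    "state": "NJ",
    "city": "Jersey City",
    "email": "john@example.com",
    "ts": ts,
})

orders_table.put_item(Item={
    "orderid": str(uuid.uuid4()),
    "customer_id": customer_id,
    "ts": ts,
    "order_value": Decimal(str(round(random.uniform(10, 500), 2))),  # DynamoDB numbers must be Decimal
    "priority": random.choice(["LOW", "MEDIUM", "HIGH"]),
})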

Step 6: Insert into the output stitched stream


%flink.ssql(type=update)

INSERT INTO orders_customer_stitched
SELECT
    tbl_orders.orderid,
    tbl_customers.ts,
    tbl_orders.order_value,
    tbl_orders.priority,
    tbl_customers.name,
    tbl_customers.state,
    tbl_customers.city,
    tbl_customers.email
FROM tbl_orders
INNER JOIN tbl_customers
    ON tbl_orders.customer_id = tbl_customers.customer_id
WHERE tbl_orders.ts BETWEEN tbl_customers.ts - INTERVAL '1' MINUTE AND tbl_customers.ts;

Go to Kinesis and check whether the data has arrived in the output stream. A quick SDK-based way to peek at the stream is sketched below.
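If you prefer the SDK over the console, a small boto3 sketch like the one below can read a few records from the orders_customer_stitched stream; the shard handling here is simplified to the first shard only, which is fine for a single-shard demo stream.

# Quick check: read a handful of records from the curated output stream with boto3.
import json
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")
STREAM_NAME = "orders_customer_stitched"

shard_id = kinesis.list_shards(StreamName=STREAM_NAME)["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName=STREAM_NAME,
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",  # read from the oldest available record
)["ShardIterator"]

records = kinesis.get_records(ShardIterator=iterator, Limit=10)["Records"]
for record in records:
    print(json.loads(record["Data"]))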


In Part 2, we will integrate this with a Glue streaming job to perform UPSERTs into the Hudi data lake.
