Power a Downstream Relational Database (Aurora Postgres) from an Apache Hudi Transactional Data Lake with CDC | Step-by-Step Guide
Author:
Soumil Nitin Shah
I earned a Bachelor of Science in Electronic Engineering and a double master's in Electrical and Computer Engineering. I have extensive expertise in developing scalable, high-performance software applications in Python. I run a YouTube channel where I teach Data Science, Machine Learning, Elasticsearch, and AWS. I work as a Lead Data Engineer, where I spend most of my time developing ingestion frameworks and building microservices and scalable architectures on AWS. I have worked with massive amounts of data, including building data lakes (1.2T) and optimizing data lake queries through partitioning, the right file formats, and compression. I have also built streaming applications that ingest real-time data via Kinesis and Firehose into Elasticsearch.
Introduction
Apache Hudi (short for Hadoop Upserts, Deletes, and Incrementals) is an open-source data management framework designed to simplify the process of ingesting and managing large volumes of data in distributed computing environments. Hudi provides an efficient way of managing large datasets that are frequently updated with incremental changes. It allows for efficient upserts, deletes, and incremental data processing with support for ACID transactions.
Hudi is built on top of Apache Hadoop, and it provides support for a variety of data storage systems such as the Hadoop Distributed File System (HDFS), cloud object stores like Amazon S3 and Microsoft Azure Blob Storage, and distributed columnar data stores such as Apache Cassandra and Apache HBase. Hudi also supports different data processing frameworks such as Apache Spark, Apache Flink, and Presto, making it easy to integrate with existing big data workflows. Overall, Apache Hudi makes it easier for organizations to process, manage, and analyze large volumes of data with high velocity and accuracy.
Section II
Architecture
Let's explore how to build an incremental processing pipeline that powers downstream applications and systems from a transactional data lake. We will use incremental batch processing to feed an Aurora Postgres database from Hudi: the CDC events are loaded from the transactional data lake into an Aurora landing table, then deduplicated, cleaned up, and moved into a staging area. This way you only bring in the data you need for operational purposes and save on cost, since the petabyte-scale lake stays on S3, which also gives you high availability. On top of that, you can power your analytical workloads by querying the petabyte-scale data lake on S3 with Athena or Redshift Spectrum.
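Before diving in, here is a minimal sketch of what one run of the pipeline does end to end. It is an illustrative outline only: the helper class, tables, and SQL it refers to are built step by step below, and psycopg2 is assumed here purely so the SQL steps can be called from Python.
import os
import psycopg2  # assumption: used here only to call the SQL steps from Python

def run_pipeline(helper):
    """One incremental batch: Hudi CDC -> landing -> dedup -> stage (illustrative outline)."""
    # 1. Pull only the CDC events committed since the last bookmark
    df = helper.read()
    if df.rdd.isEmpty():
        return  # nothing new in the data lake

    # 2. Append the CDC events to the Aurora Postgres landing table over JDBC
    #    (the full template below first flattens the before/after payload)
    (df.write.mode("append").format("jdbc")
       .option("url", os.getenv("JDBC"))
       .option("driver", "org.postgresql.Driver")
       .option("dbtable", "landing.sample")
       .option("user", os.getenv("AURORA_USERNAME"))
       .option("password", os.getenv("AURORA_PASSWORD"))
       .save())

    # 3. Dedup the landing rows and upsert the clean records into the stage table
    with psycopg2.connect(host="localhost", dbname="postgres",
                          user=os.getenv("AURORA_USERNAME"),
                          password=os.getenv("AURORA_PASSWORD")) as conn:
        with conn.cursor() as cur:
            cur.execute("CALL landing.sample_dedup();")
            # ...followed by the INSERT ... ON CONFLICT upsert into stage.sample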
Small Demo Video:
https://www.youtube.com/watch?v=JQmPbHP8cMQ
Steps:
Step 1: Create the transactional data lake
Define Imports
try:
    import os
    import sys
    import uuid
    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.functions import col, asc, desc
    from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from datetime import datetime
    from functools import reduce
    from faker import Faker
except Exception as e:
    pass
Creating Spark Session
SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession.builder \
.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
.config('className', 'org.apache.hudi') \
.config('spark.sql.hive.convertMetastoreParquet', 'false') \
.getOrCreate()
spark
Define Hudi Settings
db_name = "hudidb"
table_name = "hudi_cdc_table"
recordkey = 'uuid'
path = f"file:///C:/tmp/{db_name}/{table_name}"
precombine = "date"
method = 'upsert'
table_type = "COPY_ON_WRITE" # COPY_ON_WRITE | MERGE_ON_READ
hudi_options = {
'hoodie.table.name': table_name,
'hoodie.datasource.write.recordkey.field': recordkey,
'hoodie.datasource.write.table.name': table_name,
'hoodie.datasource.write.operation': method,
'hoodie.datasource.write.precombine.field': precombine,
'hoodie.table.cdc.enabled':'true',
'hoodie.table.cdc.supplemental.logging.mode': 'data_before_after',
}
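These two CDC options tell Hudi to write a change log with before and after images for every commit. For reference, the read-side counterpart (used later in the template) looks roughly like this; begin_commit_time is a placeholder, and the full template computes it from a bookmark stored on S3:
# Read-side counterpart of the CDC write options above (sketch only)
cdc_read_options = {
    'hoodie.datasource.query.type': 'incremental',
    'hoodie.datasource.query.incremental.format': 'cdc',
    'hoodie.datasource.read.begin.instanttime': begin_commit_time,  # placeholder
}
cdc_df = spark.read.format("hudi").options(**cdc_read_options).load(path)
cdc_df.show(truncate=False)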
Inserting Data Into Hudi
spark_df = spark.createDataFrame(
    data=[
        (1, "insert 1", 111, "2020-01-06 12:12:12"),
        (2, "insert 2", 22, "2020-01-06 12:12:12"),
    ],
    schema=["uuid", "message", "precomb", "date"])

spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(path)
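To verify the write, you can do a quick snapshot read of the table you just created (optional; it reuses the spark session and path defined above):
# Optional sanity check: snapshot-read the Hudi table we just wrote to
snapshot_df = spark.read.format("hudi").load(path)
snapshot_df.select("uuid", "message", "precomb", "date").show(truncate=False)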
Let's create the landing tables in Aurora Postgres. For learning purposes, you can spin up Postgres locally with Docker:
version: "3.7"
services:
  postgres:
    image: debezium/postgres:13
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres
Then issue the command docker-compose up --build to start the container.
Next, we need to create the landing and stage tables in Aurora Postgres. Connect to Postgres using pgAdmin and create these tables:
CREATE SCHEMA landing;
CREATE SCHEMA stage;

CREATE TABLE IF NOT EXISTS landing.sample
(
    sample_key bigint NOT NULL GENERATED ALWAYS AS IDENTITY ( INCREMENT 1 START 1 MINVALUE 1 MAXVALUE 9223372036854775807 CACHE 1 ),
    op character varying(256),
    ts_ms character varying(256),
    date character varying(256),
    message character varying(256),
    precomb character varying(256),
    uuid character varying(256),
    CONSTRAINT sample_key PRIMARY KEY (sample_key)
);

CREATE TABLE IF NOT EXISTS stage.sample
(
    uuid character varying(256),
    date character varying(256),
    message character varying(256),
    precomb character varying(256),
    CONSTRAINT stage_sample_uuid_unique UNIQUE (uuid) -- required for the ON CONFLICT (uuid) upsert used later
);
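Optionally, you can confirm the landing table is reachable from Spark over JDBC before wiring up the template. This sketch assumes the Postgres JDBC driver is on the classpath (the template below adds it via --packages org.postgresql:postgresql:42.5.4) and uses the same local credentials as the docker-compose file:
# Optional connectivity check: read the (still empty) landing table over JDBC
landing_df = (spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5432/postgres")
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", "landing.sample")
    .option("user", "postgres")
    .option("password", "postgres")
    .load())
landing_df.printSchema()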
Next, I have a Python utility that does all the heavy lifting: it performs an incremental query against Hudi and inserts the data into the Aurora Postgres landing table.
If you are performing this lab, you need to change the following items in the .env file:
AWS_ACCESS_KEY="XXX"
AWS_SECRET_KEY='XXXXXX'
AWS_REGION='us-east-1'
DESTINATION_DATABASE="postgres"
DESTINATION_TABLE_NAME="landing.sample"
AURORA_USERNAME="postgres"
AURORA_PASSWORD="postgres"
JDBC="jdbc:postgresql://localhost:5432/postgres"
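The template loads these values with python-dotenv, for example:
# How the template consumes the .env file (the same pattern appears in main() below)
from dotenv import load_dotenv
import os

load_dotenv(".env")
jdbc_url = os.getenv("JDBC")                         # jdbc:postgresql://localhost:5432/postgres
target_table = os.getenv("DESTINATION_TABLE_NAME")   # landing.sample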
Let's run the template:
"""
SOUMIL SHAH
"""
try:
import sys
import ast
import datetime
from ast import literal_eval
import re
import boto3
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
import os
import json
from dataclasses import dataclass
from pyspark.sql.functions import from_json, col
import pyspark.sql.functions as F
except Exception as e:
pass
class AWSS3(object):
"""Helper class to which add functionality on top of boto3 """
def __init__(self, bucket):
self.BucketName = bucket
self.client = boto3.client("s3",
aws_access_key_id=os.getenv("AWS_ACCESS_KEY"),
aws_secret_access_key=os.getenv("AWS_SECRET_KEY"),
region_name=os.getenv("AWS_REGION"))
def put_files(self, Response=None, Key=None):
"""
Put the File on S3
:return: Bool
"""
try:
response = self.client.put_object(
Body=Response, Bucket=self.BucketName, Key=Key
)
return "ok"
except Exception as e:
raise Exception("Error : {} ".format(e))
def item_exists(self, Key):
"""Given key check if the items exists on AWS S3 """
try:
response_new = self.client.get_object(Bucket=self.BucketName, Key=str(Key))
return True
except Exception as e:
return False
def get_item(self, Key):
"""Gets the Bytes Data from AWS S3 """
try:
response_new = self.client.get_object(Bucket=self.BucketName, Key=str(Key))
return response_new["Body"].read()
except Exception as e:
print("Error :{}".format(e))
return False
def find_one_update(self, data=None, key=None):
"""
This checks if Key is on S3 if it is return the data from s3
else store on s3 and return it
"""
flag = self.item_exists(Key=key)
if flag:
data = self.get_item(Key=key)
return data
else:
self.put_files(Key=key, Response=data)
return data
def delete_object(self, Key):
response = self.client.delete_object(Bucket=self.BucketName, Key=Key, )
return response
def get_all_keys(self, Prefix=""):
"""
:param Prefix: Prefix string
:return: Keys List
"""
try:
paginator = self.client.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket=self.BucketName, Prefix=Prefix)
tmp = []
for page in pages:
for obj in page["Contents"]:
tmp.append(obj["Key"])
return tmp
except Exception as e:
return []
def print_tree(self):
keys = self.get_all_keys()
for key in keys:
print(key)
return None
def find_one_similar_key(self, searchTerm=""):
keys = self.get_all_keys()
return [key for key in keys if re.search(searchTerm, key)]
def __repr__(self):
return "AWS S3 Helper class "
@dataclass
class HUDISettings:
"""Class for keeping track of an item in inventory."""
table_name: str
path: str
class HUDIIncrementalReader(AWSS3):
def __init__(self, bucket, hudi_settings, spark_session):
AWSS3.__init__(self, bucket=bucket)
if type(hudi_settings).__name__ != "HUDISettings": raise Exception("please pass correct settings ")
self.hudi_settings = hudi_settings
self.spark = spark_session
def __check_meta_data_file(self):
"""
check if metadata for table exists
:return: Bool
"""
file_name = f"metadata/{self.hudi_settings.table_name}.json"
return self.item_exists(Key=file_name)
def __read_meta_data(self):
file_name = f"metadata/{self.hudi_settings.table_name}.json"
return ast.literal_eval(self.get_item(Key=file_name).decode("utf-8"))
def __push_meta_data(self, json_data):
file_name = f"metadata/{self.hudi_settings.table_name}.json"
self.put_files(
Key=file_name, Response=json.dumps(json_data)
)
def __get_begin_commit(self):
self.spark.read.format("hudi").load(self.hudi_settings.path).createOrReplaceTempView("hudi_snapshot")
commits = list(map(lambda row: row[0], self.spark.sql(
"select distinct(_hoodie_commit_time) as commitTime from hudi_snapshot order by commitTime asc").limit(
50).collect()))
"""begin from start """
begin_time = int(commits[0]) - 1
return begin_time
def __read_inc_data(self, commit_time):
incremental_read_options = {
'hoodie.datasource.query.type': 'incremental',
'hoodie.datasource.query.incremental.format': 'cdc',
'hoodie.datasource.read.begin.instanttime': commit_time,
}
print("***")
incremental_df = self.spark.read.format("hudi").options(**incremental_read_options).load(
self.hudi_settings.path).createOrReplaceTempView("hudi_incremental")
df = self.spark.sql("select * from hudi_incremental")
return df
def __get_last_commit(self):
commits = list(map(lambda row: row[0], self.spark.sql(
"select distinct(ts_ms) from hudi_incremental order by ts_ms asc").limit(
50).collect()))
last_commit = commits[len(commits) - 1]
return last_commit
def reset_bookmark(self):
file_name = f"metadata/{self.hudi_settings.table_name}.json"
self.delete_object(Key=file_name)
def __run(self):
"""Check the metadata file"""
flag = self.__check_meta_data_file()
"""if metadata files exists load the last commit and start inc loading from that commit """
if flag:
meta_data = json.loads(self.__read_meta_data())
print(f"""
******************LOGS******************
meta_data {meta_data}
last_processed_commit : {meta_data.get("last_processed_commit")}
***************************************
""")
read_commit = str(meta_data.get("last_processed_commit"))
df = self.__read_inc_data(commit_time=read_commit)
"""if there is no INC data then it return Empty DF """
if not df.rdd.isEmpty():
last_commit = self.__get_last_commit()
self.__push_meta_data(json_data=json.dumps({
"last_processed_commit": last_commit,
"table_name": self.hudi_settings.table_name,
"path": self.hudi_settings.path,
"inserted_time": datetime.datetime.now().__str__(),
}))
return df
else:
return df
else:
"""Metadata files does not exists meaning we need to create metadata file on S3 and start reading from begining commit"""
read_commit = self.__get_begin_commit()
df = self.__read_inc_data(commit_time=read_commit)
last_commit = self.__get_last_commit()
self.__push_meta_data(json_data=json.dumps({
"last_processed_commit": last_commit,
"table_name": self.hudi_settings.table_name,
"path": self.hudi_settings.path,
"inserted_time": datetime.datetime.now().__str__(),
}))
return df
def read(self):
"""
reads INC data and return Spark Df
:return:
"""
return self.__run()
def flatten_df(nested_df):
flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
flat_df = nested_df.select(flat_cols +
[F.col(nc + '.' + c).alias(c)
for nc in nested_cols
for c in nested_df.select(nc + '.*').columns])
return flat_df
def main():
from dotenv import load_dotenv
load_dotenv(".env")
bucket = 'hudi-demos-emr-serverless-project-soumil'
SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0,org.postgresql:postgresql:42.5.4 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
db_name = "hudidb"
table_name = "hudi_cdc_table"
path = f"file:///C:/tmp/{db_name}/{table_name}"
spark = SparkSession.builder \
.config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
.config('className', 'org.apache.hudi') \
.config('spark.sql.hive.convertMetastoreParquet', 'false') \
.getOrCreate()
helper = HUDIIncrementalReader(bucket=bucket,
hudi_settings=HUDISettings(table_name='hudi_inc_table', path=path),
spark_session=spark
)
# helper.reset_bookmark()
df = helper.read()
if not df.rdd.isEmpty():
"""Post Processing"""
json_schema = spark.read.json(df.rdd.map(lambda row: row.after)).schema
df = df.withColumn('json', from_json(col('after'), json_schema))
df = flatten_df(df)
columns_drop = ["before", "after"]
for column in df.columns:
if "hoodie" in column: columns_drop.append(column)
df = df.drop(*columns_drop)
print(df.show())
df.write \
.mode('append') \
.format("jdbc") \
.option("url", os.getenv("JDBC")) \
.option("driver", "org.postgresql.Driver") \
.option("database", os.getenv("DESTINATION_DATABASE")) \
.option("dbtable", os.getenv("DESTINATION_TABLE_NAME")) \
.option("user", os.getenv("AURORA_USERNAME")) \
.option("password", os.getenv("AURORA_PASSWORD")) \
.save()
main()
Explanation of code:
When you run the template for the first time, it determines the begin commit time, performs an incremental query, and fetches the CDC events from Hudi. Hudi returns the data in a Debezium-like format, with a before and an after image for each record.
Read more in RFC-51: https://github.com/apache/hudi/blob/master/rfc/rfc-51/rfc-51.md
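For reference, a single CDC row returned by the incremental query has roughly this shape; the field names follow RFC-51, and the values below are made up purely for illustration:
# Illustrative shape of one Hudi CDC row (values invented for illustration)
cdc_row = {
    "op": "u",                           # i = insert, u = update, d = delete
    "ts_ms": "20230301123045123",        # commit timestamp; the template bookmarks the latest one
    "before": '{"uuid": 1, "message": "insert 1", "precomb": 111, "date": "2020-01-06 12:12:12"}',
    "after":  '{"uuid": 1, "message": "update 1", "precomb": 111, "date": "2020-01-06 12:12:12"}',
}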
After running the incremental query and loading the data into a Spark DataFrame, I store the last processed commit on S3, so the next run of the code only processes new incremental data.
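The bookmark itself is a small JSON file written to metadata/<table_name>.json in the S3 bucket. Its keys match what the template writes; the values below are illustrative:
# Illustrative contents of the bookmark file metadata/<table_name>.json
bookmark = {
    "last_processed_commit": "20230301123045123",   # latest ts_ms seen in the last batch
    "table_name": "hudi_inc_table",
    "path": "file:///C:/tmp/hudidb/hudi_cdc_table",
    "inserted_time": "2023-03-01 12:31:02.123456",
}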
Running the template gives the following output:
As you can see, we got records 1 and 2, which were inserted into Hudi; those records can also be seen in the landing table.
Let's update record 1 and insert a new record 3 into the data lake:
spark_df = spark.createDataFrame(
    data=[
        (1, "update 1", 111, "2020-02-07 12:12:44"),
        (3, "insert 3", 33, "2020-02-07 12:12:32"),
    ],
    schema=["uuid", "message", "precomb", "date"])

spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(path)
Running the Template Again
You should see record 1 and record 3 on the console and in the Postgres landing table.
Great job so far!
Let's take a look at the landing table.
Look at that
Now we will dedup the data using a stored procedure. As you can see, record 1 was inserted and later updated, so let's run the stored procedure that dedups the data; after that we will upsert it into the stage area.
-- PROCEDURE: landing.sample_dedup()
-- DROP PROCEDURE IF EXISTS landing.sample_dedup();
CREATE OR REPLACE PROCEDURE landing.sample_dedup()
LANGUAGE 'plpgsql'
AS $BODY$
begin
DELETE FROM landing.sample where op = 'r';
WITH CTE AS
(
SELECT
sample_key,
row_number() over(partition by op,uuid order by ts_ms desc, uuid desc) as rn
FROM
landing.sample
)
DELETE FROM
landing.sample as landing
using
cte
WHERE
landing.sample_key = CTE.sample_key and
cte.rn > 1;
DELETE FROM
landing.sample as landing
using
(select uuid from landing.sample where op='u') as upd
where
landing.uuid = upd.uuid and
landing.op='i';
end
$BODY$;
Call the stored procedure:
call landing.sample_dedup();
Great work!
Now let's move the data into the stage area:
INSERT INTO stage.sample (
date,
message,
precomb,
uuid
)
select
l.date,
l.message,
l.precomb,
l.uuid
FROM
landing.sample as l
LEFT JOIN
stage.sample as s ON s.uuid = l.uuid
ON CONFLICT (uuid) DO UPDATE SET date=EXCLUDED.date, message=EXCLUDED.message, precomb=EXCLUDED.precomb;
Your cleansed data has now been moved into the stage area, and we need to prune the landing table. When new data arrives, you can repeat the process.
truncate landing.sample
In this manner, operational data can now be transferred to Postgres, and historical data can be stored in a data lake to support ongoing business decisions.
Thank you for reading!
If you prefer a video guide, it can be found here:
https://www.youtube.com/watch?v=sUuvpQ49Y00
Code: https://github.com/soumilshah1995/Incremental-Processing-Pipeline-to-power-Aurora-Postgres-SQL-from-Hudi-Transcational-Datalake-