Power Downstream Relational Database Aurora Postgres from Apache Hudi Transactional Data Lake with CDC | Step-by-Step Guide

Author:

Soumil Nitin Shah

I earned a Bachelor of Science in Electronic Engineering and a double master's in Electrical and Computer Engineering. I have extensive experience developing scalable, high-performance software applications in Python. I run a YouTube channel where I teach Data Science, Machine Learning, Elasticsearch, and AWS. I work as a Lead Data Engineer, where I spend most of my time developing ingestion frameworks and building microservices and scalable architectures on AWS. I have worked with large volumes of data, including building data lakes (1.2T) and optimizing data lake queries through partitioning and the right choice of file format and compression. I have also developed streaming applications that ingest real-time data via Kinesis and Firehose into Elasticsearch.

YouTube: https://www.youtube.com/channel/UC_eOodxvwS_H7x2uLQa-svw

Introduction

Apache Hudi (short for Hadoop Upserts, Deletes, and Incrementals) is an open-source data management framework designed to simplify the process of ingesting and managing large volumes of data in distributed computing environments. Hudi provides an efficient way of managing large datasets that are frequently updated with incremental changes. It allows for efficient upserts, deletes, and incremental data processing with support for ACID transactions.

Hudi builds on the Hadoop file system APIs and works with storage systems such as the Hadoop Distributed File System (HDFS) and cloud object stores like Amazon S3 and Microsoft Azure Blob Storage. Hudi also integrates with data processing and query engines such as Apache Spark, Apache Flink, and Presto, making it easy to fit into existing big data workflows. Overall, Apache Hudi makes it easier for organizations to process, manage, and analyze large volumes of data with high velocity and accuracy.


Section II

Architecture

[Image: pipeline architecture diagram]

Let's explore how to build an incremental processing pipeline that powers downstream applications and systems from a transactional data lake. This guide demonstrates how to use incremental batch processing to feed an Aurora PostgreSQL database from Hudi. Using incremental batch processing, we load the CDC events from the transactional data lake into a landing area in Aurora. From there, we deduplicate the data, clean it up, and move it into the staging area. This way you bring in only the data you need for operational purposes, and you save money because the petabyte-scale lake stays on S3, which also gives you high availability. You can further power analytical workloads by querying the petabyte-scale data lake on S3 with either Athena or Redshift Spectrum.

Short demo video:

https://www.youtube.com/watch?v=JQmPbHP8cMQ

Steps:

Step 1: Create the transactional data lake

Define Imports

try:
    import os
    import sys
    import uuid

    import pyspark
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.functions import col, asc, desc
    from pyspark.sql.functions import col, to_timestamp, monotonically_increasing_id, to_date, when
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    from datetime import datetime
    from functools import reduce
    from faker import Faker

except Exception as e:
    # Report the missing module instead of failing silently
    print("Import error: {}".format(e))

Creating Spark Session


SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

spark        

Define Hudi Settings


db_name = "hudidb"
table_name = "hudi_cdc_table"

recordkey = 'uuid'
path = f"file:///C:/tmp/{db_name}/{table_name}"
precombine = "date"
method = 'upsert'
table_type = "COPY_ON_WRITE"  # COPY_ON_WRITE | MERGE_ON_READ

hudi_options = {
    
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.precombine.field': precombine,
    
    'hoodie.table.cdc.enabled':'true',
    'hoodie.table.cdc.supplemental.logging.mode': 'data_before_after',
    
}        
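To make the roles of the record key and the precombine field more concrete, here is a minimal plain-Python sketch of the idea: when several incoming records share a key, the one with the largest precombine value is kept. This only illustrates the concept and is not Hudi's implementation; the function name `precombine_records` and the sample rows are assumptions made for the example.

```python
def precombine_records(records, key_field, precombine_field):
    """Keep one record per key: the one with the largest precombine value."""
    latest = {}
    for record in records:
        key = record[key_field]
        current = latest.get(key)
        # A later record wins ties, so the newest arrival is kept
        if current is None or record[precombine_field] >= current[precombine_field]:
            latest[key] = record
    return list(latest.values())


# Hypothetical batch: uuid 1 appears twice with different "date" values
batch = [
    {"uuid": 1, "message": "insert 1", "date": "2020-01-06 12:12:12"},
    {"uuid": 1, "message": "update 1", "date": "2020-02-07 12:12:44"},
    {"uuid": 2, "message": "insert 2", "date": "2020-01-06 12:12:12"},
]

result = precombine_records(batch, key_field="uuid", precombine_field="date")
for record in result:
    print(record)
```

With `recordkey = 'uuid'` and `precombine = "date"` as configured above, the record for uuid 1 with the later date is the one that survives in this sketch.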

Inserting Data Into Hudi


spark_df = spark.createDataFrame(
    data=[
        (1, "insert 1", 111, "2020-01-06 12:12:12"),
        (2, "insert 2", 22, "2020-01-06 12:12:12"),
    ],
    schema=["uuid", "message", "precomb", "date"])

spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(path)

Let's create the landing tables in Aurora PostgreSQL. For learning purposes, you can spin up Postgres locally with Docker:


version: "3.7"

services:
  postgres:
    image: debezium/postgres:13
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres

Run the command docker-compose up --build


Next, we need to create the landing and stage tables in Aurora Postgres. Connect to Postgres using pgAdmin and create these tables:


CREATE SCHEMA landing;
CREATE SCHEMA stage;

CREATE TABLE IF NOT EXISTS landing.sample
(
    sample_key bigint NOT NULL GENERATED ALWAYS AS IDENTITY ( INCREMENT 1 START 1 MINVALUE 1 MAXVALUE 9223372036854775807 CACHE 1 ),
    op character varying(256),
    ts_ms character varying(256),
    date character varying(256),
    message character varying(256),
    precomb character varying(256),
    uuid character varying(256),
    CONSTRAINT sample_key PRIMARY KEY (sample_key)
);

CREATE TABLE IF NOT EXISTS stage.sample
(
    -- uuid must be unique so that the later ON CONFLICT (uuid) upsert can work
    uuid character varying(256) PRIMARY KEY,
    date character varying(256),
    message character varying(256),
    precomb character varying(256)
);

Next, I have a Python utility that does the heavy lifting: it performs an incremental query against Hudi and inserts the results into the Aurora Postgres landing table.

If you are following along with this lab, change the following items in the .env file:


AWS_ACCESS_KEY="XXX"
AWS_SECRET_KEY='XXXXXX'
AWS_REGION='us-east-1'
DESTINATION_DATABASE="postgres"
DESTINATION_TABLE_NAME="landing.sample"
AURORA_USERNAME="postgres"
AURORA_PASSWORD="postgres"
JDBC="jdbc:postgresql://localhost:5432/postgres"
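Because the template reads every one of these values with os.getenv, a missing entry only surfaces later as a confusing connection error. As a small optional sketch (the helper `missing_settings` and the sample dictionary are assumptions, not part of the original template), you could check the settings before starting Spark:

```python
import os

# Names taken from the .env file shown above
REQUIRED_SETTINGS = [
    "AWS_ACCESS_KEY",
    "AWS_SECRET_KEY",
    "AWS_REGION",
    "DESTINATION_DATABASE",
    "DESTINATION_TABLE_NAME",
    "AURORA_USERNAME",
    "AURORA_PASSWORD",
    "JDBC",
]


def missing_settings(env):
    """Return the required setting names that are absent or empty in env."""
    return [name for name in REQUIRED_SETTINGS if not env.get(name)]


# Hypothetical environment with one missing and one empty value
example_env = {
    "AWS_ACCESS_KEY": "XXX",
    "AWS_REGION": "us-east-1",
    "DESTINATION_DATABASE": "postgres",
    "DESTINATION_TABLE_NAME": "landing.sample",
    "AURORA_USERNAME": "postgres",
    "AURORA_PASSWORD": "",
    "JDBC": "jdbc:postgresql://localhost:5432/postgres",
}

print("Missing in example:", missing_settings(example_env))
print("Missing in current process:", missing_settings(os.environ))
```

In the real script, the call on os.environ would go right after load_dotenv(".env").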

Let's run the template


"""

SOUMIL SHAH

"""

try:
    import sys
    import ast
    import datetime
    from ast import literal_eval
    import re
    import boto3
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    import os
    import json
    from dataclasses import dataclass
    from pyspark.sql.functions import from_json, col
    import pyspark.sql.functions as F
except Exception as e:
    pass


class AWSS3(object):
    """Helper class to which add functionality on top of boto3 """

    def __init__(self, bucket):

        self.BucketName = bucket
        self.client = boto3.client("s3",
                                   aws_access_key_id=os.getenv("AWS_ACCESS_KEY"),
                                   aws_secret_access_key=os.getenv("AWS_SECRET_KEY"),
                                   region_name=os.getenv("AWS_REGION"))

    def put_files(self, Response=None, Key=None):
        """
        Put the File on S3
        :return: Bool
        """
        try:
            response = self.client.put_object(
                Body=Response, Bucket=self.BucketName, Key=Key
            )
            return "ok"
        except Exception as e:
            raise Exception("Error : {} ".format(e))

    def item_exists(self, Key):
        """Given key check if the items exists on AWS S3 """
        try:
            response_new = self.client.get_object(Bucket=self.BucketName, Key=str(Key))
            return True
        except Exception as e:
            return False

    def get_item(self, Key):

        """Gets the Bytes Data from AWS S3 """

        try:
            response_new = self.client.get_object(Bucket=self.BucketName, Key=str(Key))
            return response_new["Body"].read()

        except Exception as e:
            print("Error :{}".format(e))
            return False

    def find_one_update(self, data=None, key=None):

        """
        This checks if Key is on S3 if it is return the data from s3
        else store on s3 and return it
        """

        flag = self.item_exists(Key=key)

        if flag:
            data = self.get_item(Key=key)
            return data

        else:
            self.put_files(Key=key, Response=data)
            return data

    def delete_object(self, Key):

        response = self.client.delete_object(Bucket=self.BucketName, Key=Key, )
        return response

    def get_all_keys(self, Prefix=""):

        """
        :param Prefix: Prefix string
        :return: Keys List
        """
        try:
            paginator = self.client.get_paginator("list_objects_v2")
            pages = paginator.paginate(Bucket=self.BucketName, Prefix=Prefix)

            tmp = []

            for page in pages:
                for obj in page["Contents"]:
                    tmp.append(obj["Key"])

            return tmp
        except Exception as e:
            return []

    def print_tree(self):
        keys = self.get_all_keys()
        for key in keys:
            print(key)
        return None

    def find_one_similar_key(self, searchTerm=""):
        keys = self.get_all_keys()
        return [key for key in keys if re.search(searchTerm, key)]

    def __repr__(self):
        return "AWS S3 Helper class "


@dataclass
class HUDISettings:
    """Class for keeping track of an item in inventory."""

    table_name: str
    path: str


class HUDIIncrementalReader(AWSS3):
    def __init__(self, bucket, hudi_settings, spark_session):
        AWSS3.__init__(self, bucket=bucket)
        if type(hudi_settings).__name__ != "HUDISettings": raise Exception("please pass correct settings ")
        self.hudi_settings = hudi_settings
        self.spark = spark_session

    def __check_meta_data_file(self):
        """
        check if metadata for table exists
        :return: Bool
        """
        file_name = f"metadata/{self.hudi_settings.table_name}.json"
        return self.item_exists(Key=file_name)

    def __read_meta_data(self):
        file_name = f"metadata/{self.hudi_settings.table_name}.json"

        return ast.literal_eval(self.get_item(Key=file_name).decode("utf-8"))

    def __push_meta_data(self, json_data):
        file_name = f"metadata/{self.hudi_settings.table_name}.json"
        self.put_files(
            Key=file_name, Response=json.dumps(json_data)
        )

    def __get_begin_commit(self):
        self.spark.read.format("hudi").load(self.hudi_settings.path).createOrReplaceTempView("hudi_snapshot")
        commits = list(map(lambda row: row[0], self.spark.sql(
            "select distinct(_hoodie_commit_time) as commitTime from  hudi_snapshot order by commitTime asc").limit(
            50).collect()))

        """begin from start """
        begin_time = int(commits[0]) - 1
        return begin_time

    def __read_inc_data(self, commit_time):
        incremental_read_options = {
            'hoodie.datasource.query.type': 'incremental',
            'hoodie.datasource.query.incremental.format': 'cdc',
            'hoodie.datasource.read.begin.instanttime': commit_time,
        }

        print("***")

        incremental_df = self.spark.read.format("hudi").options(**incremental_read_options).load(
            self.hudi_settings.path).createOrReplaceTempView("hudi_incremental")

        df = self.spark.sql("select * from  hudi_incremental")

        return df

    def __get_last_commit(self):
        # Take the newest commit directly, so the bookmark stays correct
        # even when one batch spans more than 50 commits.
        rows = self.spark.sql(
            "select max(ts_ms) from hudi_incremental").collect()

        last_commit = rows[0][0]

        return last_commit

    def reset_bookmark(self):
        file_name = f"metadata/{self.hudi_settings.table_name}.json"
        self.delete_object(Key=file_name)

    def __run(self):
        """Check the metadata file"""
        flag = self.__check_meta_data_file()

        """if metadata files exists load the last commit and start inc loading from that commit """
        if flag:
            meta_data = json.loads(self.__read_meta_data())
            print(f"""
            ******************LOGS******************
            meta_data {meta_data}
            last_processed_commit : {meta_data.get("last_processed_commit")}
            ***************************************
            """)

            read_commit = str(meta_data.get("last_processed_commit"))
            df = self.__read_inc_data(commit_time=read_commit)

            """if there is no INC data then it return Empty DF """
            if not df.rdd.isEmpty():
                last_commit = self.__get_last_commit()
                self.__push_meta_data(json_data=json.dumps({
                    "last_processed_commit": last_commit,
                    "table_name": self.hudi_settings.table_name,
                    "path": self.hudi_settings.path,
                    "inserted_time": datetime.datetime.now().__str__(),

                }))
                return df
            else:
                return df

        else:

            """Metadata files does not exists meaning we need to create  metadata file on S3 and start reading from begining commit"""

            read_commit = self.__get_begin_commit()

            df = self.__read_inc_data(commit_time=read_commit)
            last_commit = self.__get_last_commit()

            self.__push_meta_data(json_data=json.dumps({
                "last_processed_commit": last_commit,
                "table_name": self.hudi_settings.table_name,
                "path": self.hudi_settings.path,
                "inserted_time": datetime.datetime.now().__str__(),

            }))

            return df

    def read(self):
        """
        reads INC data and return Spark Df
        :return:
        """

        return self.__run()


def flatten_df(nested_df):
    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']

    flat_df = nested_df.select(flat_cols +
                               [F.col(nc + '.' + c).alias(c)
                                for nc in nested_cols
                                for c in nested_df.select(nc + '.*').columns])
    return flat_df


def main():
    from dotenv import load_dotenv
    load_dotenv(".env")

    bucket = 'hudi-demos-emr-serverless-project-soumil'
    SUBMIT_ARGS = "--packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.0,org.postgresql:postgresql:42.5.4 pyspark-shell"
    os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
    os.environ['PYSPARK_PYTHON'] = sys.executable
    os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

    db_name = "hudidb"
    table_name = "hudi_cdc_table"
    path = f"file:///C:/tmp/{db_name}/{table_name}"

    spark = SparkSession.builder \
        .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
        .config('className', 'org.apache.hudi') \
        .config('spark.sql.hive.convertMetastoreParquet', 'false') \
        .getOrCreate()

    helper = HUDIIncrementalReader(bucket=bucket,
                                   hudi_settings=HUDISettings(table_name='hudi_inc_table', path=path),
                                   spark_session=spark
                                   )
    # helper.reset_bookmark()
    df = helper.read()

    if not df.rdd.isEmpty():

        """Post Processing"""
        json_schema = spark.read.json(df.rdd.map(lambda row: row.after)).schema
        df = df.withColumn('json', from_json(col('after'), json_schema))
        df = flatten_df(df)

        columns_drop = ["before", "after"]
        for column in df.columns:
            if "hoodie" in column: columns_drop.append(column)

        df = df.drop(*columns_drop)
        df.show()

        df.write \
            .mode('append') \
            .format("jdbc") \
            .option("url", os.getenv("JDBC")) \
            .option("driver", "org.postgresql.Driver") \
            .option("database", os.getenv("DESTINATION_DATABASE")) \
            .option("dbtable", os.getenv("DESTINATION_TABLE_NAME")) \
            .option("user", os.getenv("AURORA_USERNAME")) \
            .option("password", os.getenv("AURORA_PASSWORD")) \
            .save()


main()        

Explanation of code:

The first time you run the template, it determines the beginning commit time, performs an incremental query, and retrieves the CDC events from Hudi. Hudi returns the data in a Debezium-style format, in which each event carries a before and an after image of the record.
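To illustrate what the post-processing step in main() does with such an event, here is a minimal plain-Python sketch that parses the after image and flattens it next to op and ts_ms. The sample event, its timestamp, and the function name `flatten_cdc_event` are assumptions for illustration; the real pipeline does this with Spark's from_json and flatten_df.

```python
import json


def flatten_cdc_event(event):
    """Merge the parsed 'after' image with the op and ts_ms fields."""
    after = json.loads(event["after"]) if event.get("after") else {}
    row = {"op": event["op"], "ts_ms": event["ts_ms"]}
    for name, value in after.items():
        # Mirror the template, which drops any column containing "hoodie"
        if "hoodie" not in name:
            row[name] = value
    return row


# Hypothetical update event with before and after images as JSON strings
event = {
    "op": "u",
    "ts_ms": "20230305120000000",
    "before": json.dumps({"uuid": 1, "message": "insert 1", "precomb": 111,
                          "date": "2020-01-06 12:12:12"}),
    "after": json.dumps({"uuid": 1, "message": "update 1", "precomb": 111,
                         "date": "2020-02-07 12:12:44"}),
}

print(flatten_cdc_event(event))
```

The flattened row has the same shape as the columns of the landing.sample table (op, ts_ms, date, message, precomb, uuid).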

Read More RFC 51 : https://github.com/apache/hudi/blob/master/rfc/rfc-51/rfc-51.md

After running the incremental query and loading the data into a Spark DataFrame, I store the last processed commit on S3 so that the next run processes only the data committed since then.
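The bookmark logic can be sketched without Spark or S3. In the following simplified model, a plain dictionary stands in for the S3 bucket and a sorted list of commit-time strings stands in for the Hudi timeline; both, along with the function name `read_increment` and the sample commit times, are assumptions for illustration only.

```python
import json


def read_increment(store, table_name, commits):
    """Return commits newer than the bookmark and advance the bookmark.

    store   -- dict standing in for the S3 bucket (key -> JSON string)
    commits -- sorted list of fixed-length commit-time strings
    """
    key = f"metadata/{table_name}.json"
    if key in store:
        # Later runs: resume after the last processed commit
        begin = json.loads(store[key])["last_processed_commit"]
    else:
        # First run: start just before the earliest commit
        begin = str(int(commits[0]) - 1)

    # The begin time is exclusive, as in the incremental query
    new_commits = [c for c in commits if c > begin]
    if new_commits:
        store[key] = json.dumps({"last_processed_commit": new_commits[-1]})
    return new_commits


store = {}
timeline = ["20230305120000000"]
print(read_increment(store, "hudi_cdc_table", timeline))  # first run
print(read_increment(store, "hudi_cdc_table", timeline))  # nothing new

timeline.append("20230305130000000")
print(read_increment(store, "hudi_cdc_table", timeline))  # only the new commit
```

Deleting the stored key corresponds to reset_bookmark() in the template: the next run starts again from the first commit.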

Running the template gives the following output:

[Image: console output of the first run]

As you can see, we got records 1 and 2, which were inserted into Hudi; the same records also appear in the landing table.

Let's update record 1 and insert a new record 3 into the data lake.


# Row values are illustrative: an update to record 1 and a new record 3,
# matching the scenario described in the text
spark_df = spark.createDataFrame(
    data=[
        (1, "update 1", 111, "2020-02-07 12:12:44"),
        (3, "insert 3", 33, "2020-02-07 12:12:32"),
    ],
    schema=["uuid", "message", "precomb", "date"])

spark_df.write.format("hudi"). \
    options(**hudi_options). \
    mode("append"). \
    save(path)

Running Template Again

I should see record 1 and record 3 on the console and in the Postgres landing table.

[Image: console output of the second run]

Great job so far!

Let's take a look at the landing table.

[Image: contents of the landing table]

Look at that.

Now we will deduplicate the data using a stored procedure. As you can see, record 1 was inserted and later updated, so let's run a stored procedure that deduplicates the data; afterwards we will upsert it into the stage area.


-- PROCEDURE: landing.sample_dedup()

-- DROP PROCEDURE IF EXISTS landing.sample_dedup();

CREATE OR REPLACE PROCEDURE landing.sample_dedup()
LANGUAGE 'plpgsql'
AS $BODY$
BEGIN
    -- Discard rows with op = 'r' (Debezium's code for snapshot reads)
    DELETE FROM landing.sample WHERE op = 'r';

    -- Keep only the newest row for each (op, uuid) pair
    WITH cte AS
    (
        SELECT
            sample_key,
            row_number() OVER (PARTITION BY op, uuid ORDER BY ts_ms DESC, uuid DESC) AS rn
        FROM
            landing.sample
    )
    DELETE FROM
        landing.sample AS landing
    USING
        cte
    WHERE
        landing.sample_key = cte.sample_key
        AND cte.rn > 1;

    -- Drop an insert when an update exists for the same uuid
    DELETE FROM
        landing.sample AS landing
    USING
        (SELECT uuid FROM landing.sample WHERE op = 'u') AS updates
    WHERE
        landing.uuid = updates.uuid
        AND landing.op = 'i';
END;
$BODY$;


Call Stored procedure

call landing.sample_dedup();
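The three steps of the stored procedure can also be sketched in plain Python, which makes the logic easy to test before running it against the database. This is only an in-memory illustration under assumed sample rows; the function name `dedup_landing` and the ts_ms values are hypothetical.

```python
def dedup_landing(rows):
    """Apply the same three steps as landing.sample_dedup() to a list of dicts."""
    # Step 1: discard snapshot-read events
    rows = [r for r in rows if r["op"] != "r"]

    # Step 2: keep only the newest row for each (op, uuid) pair
    latest = {}
    for r in rows:
        key = (r["op"], r["uuid"])
        if key not in latest or r["ts_ms"] > latest[key]["ts_ms"]:
            latest[key] = r

    # Step 3: drop an insert when an update exists for the same uuid
    updated = {uuid for (op, uuid) in latest if op == "u"}
    return [r for (op, uuid), r in latest.items()
            if not (op == "i" and uuid in updated)]


# Hypothetical landing contents after the two runs described above
landing = [
    {"op": "i", "uuid": "1", "ts_ms": "20230305120000000", "message": "insert 1"},
    {"op": "i", "uuid": "2", "ts_ms": "20230305120000000", "message": "insert 2"},
    {"op": "u", "uuid": "1", "ts_ms": "20230305130000000", "message": "update 1"},
    {"op": "i", "uuid": "3", "ts_ms": "20230305130000000", "message": "insert 3"},
]

for row in dedup_landing(landing):
    print(row["op"], row["uuid"], row["message"])
```

For record 1, only the update survives, which is the row that should reach the stage area.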


[Image: landing table after deduplication]

Great work

Now let's move the data into the stage area


INSERT INTO stage.sample (
    date,
    message,
    precomb,
    uuid
)
SELECT
    l.date,
    l.message,
    l.precomb,
    l.uuid
FROM
    landing.sample AS l
LEFT JOIN
    stage.sample AS s ON s.uuid = l.uuid
ON CONFLICT (uuid) DO UPDATE SET date = EXCLUDED.date, message = EXCLUDED.message, precomb = EXCLUDED.precomb;
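An upsert of this kind only works when the conflict column has a unique constraint. The following self-contained sketch shows the same pattern using Python's built-in sqlite3 module in place of PostgreSQL, so it can be run without a database server. The swap to SQLite, the underscore table names (SQLite has no schemas here), and the sample rows are all assumptions made for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE landing_sample (
        op TEXT, ts_ms TEXT, uuid TEXT, date TEXT, message TEXT, precomb TEXT
    );
    -- The PRIMARY KEY on uuid is what ON CONFLICT (uuid) relies on
    CREATE TABLE stage_sample (
        uuid TEXT PRIMARY KEY, date TEXT, message TEXT, precomb TEXT
    );
    -- Stage already holds record 1 from an earlier run
    INSERT INTO stage_sample VALUES ('1', '2020-01-06 12:12:12', 'insert 1', '111');
    -- Deduplicated landing rows: an update to record 1 and a new record 3
    INSERT INTO landing_sample VALUES
        ('u', '20230305130000000', '1', '2020-02-07 12:12:44', 'update 1', '111'),
        ('i', '20230305130000000', '3', '2020-02-07 12:12:32', 'insert 3', '33');
""")

# "WHERE true" avoids a parsing ambiguity SQLite has with INSERT ... SELECT upserts
conn.execute("""
    INSERT INTO stage_sample (uuid, date, message, precomb)
    SELECT uuid, date, message, precomb FROM landing_sample WHERE true
    ON CONFLICT (uuid) DO UPDATE SET
        date = excluded.date,
        message = excluded.message,
        precomb = excluded.precomb
""")

rows = conn.execute(
    "SELECT uuid, message FROM stage_sample ORDER BY uuid").fetchall()
for row in rows:
    print(row)
```

Record 1 is updated in place and record 3 is inserted, which is the behavior the PostgreSQL statement above is meant to produce in stage.sample.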

Your cleansed data has now been moved into the stage area. Next, prune the landing table; when new data arrives, you can repeat the process.

truncate landing.sample;

In this manner, operational data can now be transferred to Postgres, and historical data can be stored in a data lake to support ongoing business decisions.

Thank you for reading.

If you prefer a video guide, it can be found here:

https://www.youtube.com/watch?v=sUuvpQ49Y00

Code: https://github.com/soumilshah1995/Incremental-Processing-Pipeline-to-power-Aurora-Postgres-SQL-from-Hudi-Transcational-Datalake-
