Efficient Data Ingestion with Glue Concurrency: Using a Single Template for Multiple S3 Tables into a Transactional Hudi Data Lake

Managing a data lake with multiple tables can be challenging, especially when it comes to writing ETL or Glue jobs for each table. Fortunately, there is a templated approach that can help streamline the process and save time and effort.

By creating a single job that can be used for multiple tables, you can reduce the amount of infrastructure code needed to manage your data lake. This approach allows you to save time by reusing the same job code for multiple tables, rather than creating separate jobs for each table.

Using a templated approach provides a more streamlined and efficient way of managing a large number of tables in a data lake. It enables the creation of a Hudi transactional data lake, which provides more robust and scalable data management capabilities.

In summary, a templated approach for ETL jobs in a data lake can provide numerous benefits, including:

  • Reducing the amount of infrastructure code needed to manage the data lake
  • Saving time by reusing the same job code for multiple tables
  • Reducing the risk of errors and inconsistencies
  • Making it easier to scale the data lake
  • Customizing configuration settings for each table using parameters
  • Automating repetitive tasks and reducing manual effort
  • Enabling the creation of a Hudi transactional data lake, providing more robust and scalable data management capabilities.

If you're looking for ways to streamline your data lake and improve its efficiency, consider adopting a templated approach for your ETL jobs.


Labs:

Step 1: Create an S3 bucket and run the Python file below. This will create multiple folders (i.e., tables) populated with some fake data.


try:
    import re
    import datetime
    import json
    import random
    import boto3
    import os
    import uuid
    import time
    from datetime import datetime
    from faker import Faker
    from dotenv import load_dotenv

    load_dotenv(".env")
except Exception as e:
    pass


class AWSS3(object):
    """Helper class to which add functionality on top of boto3 """

    def __init__(self, bucket, aws_access_key_id, aws_secret_access_key, region_name):

        self.BucketName = bucket
        self.client = boto3.client(
            "s3",
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=region_name,
        )

    def put_files(self, Response=None, Key=None):
        """
        Put the File on S3
        :return: Bool
        """
        try:

            response = self.client.put_object(
                ACL="private", Body=Response, Bucket=self.BucketName, Key=Key
            )
            return "ok"
        except Exception as e:
            print("Error : {} ".format(e))
            return "error"

    def item_exists(self, Key):
        """Given key check if the items exists on AWS S3 """
        try:
            response_new = self.client.get_object(Bucket=self.BucketName, Key=str(Key))
            return True
        except Exception as e:
            return False

    def get_item(self, Key):

        """Gets the Bytes Data from AWS S3 """

        try:
            response_new = self.client.get_object(Bucket=self.BucketName, Key=str(Key))
            return response_new["Body"].read()

        except Exception as e:
            print("Error :{}".format(e))
            return False

    def find_one_update(self, data=None, key=None):

        """
        This checks if Key is on S3 if it is return the data from s3
        else store on s3 and return it
        """

        flag = self.item_exists(Key=key)

        if flag:
            data = self.get_item(Key=key)
            return data

        else:
            self.put_files(Key=key, Response=data)
            return data

    def delete_object(self, Key):

        response = self.client.delete_object(Bucket=self.BucketName, Key=Key, )
        return response

    def get_all_keys(self, Prefix=""):

        """
        :param Prefix: Prefix string
        :return: Keys List
        """
        try:
            paginator = self.client.get_paginator("list_objects_v2")
            pages = paginator.paginate(Bucket=self.BucketName, Prefix=Prefix)

            tmp = []

            for page in pages:
                for obj in page["Contents"]:
                    tmp.append(obj["Key"])

            return tmp
        except Exception as e:
            return []

    def print_tree(self):
        keys = self.get_all_keys()
        for key in keys:
            print(key)
        return None

    def find_one_similar_key(self, searchTerm=""):
        keys = self.get_all_keys()
        return [key for key in keys if re.search(searchTerm, key)]

    def __repr__(self):
        return "AWS S3 Helper class "


global faker
global helper

faker = Faker()
helper = AWSS3(
    aws_access_key_id=os.getenv("DEV_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("DEV_SECRET_KEY"),
    region_name=os.getenv("DEV_REGION"),
    bucket=os.getenv("BUCKET")
)


def run():
    for i in range(1,100):

        order_id = uuid.uuid4().__str__()
        customer_id = uuid.uuid4().__str__()
        partition_key = uuid.uuid4().__str__()

        orders = {
            "orderid": order_id,
            "customer_id": customer_id,
            "ts": datetime.now().isoformat().__str__(),
            "order_value": random.randint(10, 1000).__str__(),
            "priority": random.choice(["LOW", "MEDIUM", "URGENT"])
        }
        helper.put_files(Response=json.dumps(orders), Key='raw/orders/{}.json'.format(uuid.uuid4().__str__()))

        customers = {
            "customer_id": customer_id,
            "name": faker.name(),
            "state": faker.state(),
            "city": faker.city(),
            "email": faker.email(),
            "ts": datetime.now().isoformat().__str__()
        }
        helper.put_files(Response=json.dumps(customers), Key='raw/customers/{}.json'.format(uuid.uuid4().__str__()))


if __name__ == "__main__":
    run()

Again, we are simulating a lake with 100+ tables; for this demo I am taking just two tables (orders and customers).
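
If you genuinely have 100+ tables, the same generator pattern scales: keep a mapping of table name to record-generator function and loop over it. Below is a minimal sketch that reuses the helper and faker objects defined in the script above; the "products" table and its fields are hypothetical, purely for illustration.

# Sketch: generalize the generator above to N tables.
# Assumes helper (AWSS3) and faker (Faker) from the script above are in scope.

def fake_order():
    return {
        "orderid": str(uuid.uuid4()),
        "customer_id": str(uuid.uuid4()),
        "ts": datetime.now().isoformat(),
        "order_value": str(random.randint(10, 1000)),
        "priority": random.choice(["LOW", "MEDIUM", "URGENT"]),
    }

def fake_product():
    # Hypothetical extra table, just to show how new tables slot in
    return {
        "product_id": str(uuid.uuid4()),
        "name": faker.word(),
        "price": str(random.randint(1, 500)),
        "ts": datetime.now().isoformat(),
    }

# table name -> generator function; add one entry per table you want to simulate
TABLES = {
    "orders": fake_order,
    "products": fake_product,
}

def run_many(records_per_table=100):
    for table, generator in TABLES.items():
        for _ in range(records_per_table):
            helper.put_files(
                Response=json.dumps(generator()),
                Key="raw/{}/{}.json".format(table, uuid.uuid4()),
            )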

Step 2: Upload Hudi Templated Glue Job


import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(
    sys.argv, ['JOB_NAME',
               'SOURCE_S3_PATH',
               'GLUE_DATABASE', 'GLUE_TABLE_NAME', 'HUDI_PRECOMB_KEY', 'HUDI_RECORD_KEY',
               'TARGET_S3_PATH']
)

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={"multiline": False},
    connection_type="s3",
    format="json",
    connection_options={
        "paths": [args['SOURCE_S3_PATH']],
        "recurse": True,
    },
    transformation_ctx="S3bucket_node1",
)

# Script generated for node S3 bucket
additional_options = {
    "hoodie.table.name": "customers",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": args['HUDI_RECORD_KEY'],
    "hoodie.datasource.write.precombine.field": args['HUDI_PRECOMB_KEY'],
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.parquet.compression.codec": "gzip",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.database": args['GLUE_DATABASE'],
    "hoodie.datasource.hive_sync.table": args['GLUE_TABLE_NAME'],
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.hive_sync.mode": "hms",
}
S3bucket_node3_df = S3bucket_node1.toDF()
S3bucket_node3_df.write.format("hudi").options(**additional_options).mode(
    "append"
).save(args['TARGET_S3_PATH'])

job.commit()
        

In this code you will see that I have put placeholders in several places; the table-specific values come in as job arguments (SOURCE_S3_PATH, GLUE_TABLE_NAME, HUDI_RECORD_KEY, and so on).

Make sure to set the Glue job's maximum concurrency to 4 so that multiple table runs can execute in parallel.
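
Because every table run goes through this single job definition, maximum concurrency must be at least the number of tables you fire in parallel. You can set it in the Glue console, or programmatically; here is a hedged boto3 sketch, where the IAM role, script location, and worker settings are placeholders and the --datalake-formats argument assumes Glue 4.0's native Hudi support.

import boto3

glue = boto3.client("glue")

# update_job replaces the job definition, so pass the full definition.
# Role, ScriptLocation, and worker settings below are placeholders.
glue.update_job(
    JobName="hudi-template-ingestion-s3",
    JobUpdate={
        "Role": "arn:aws:iam::123456789012:role/GlueHudiRole",
        "Command": {
            "Name": "glueetl",
            "ScriptLocation": "s3://your-bucket/scripts/hudi_template_job.py",
            "PythonVersion": "3",
        },
        "GlueVersion": "4.0",
        "WorkerType": "G.1X",
        "NumberOfWorkers": 2,
        "ExecutionProperty": {"MaxConcurrentRuns": 4},
        "DefaultArguments": {"--datalake-formats": "hudi"},
    },
)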


Step 3: Fire jobs in parallel using the template


try:
    import datetime
    import json
    import random
    import boto3
    import os
    import uuid
    import time
    from datetime import datetime
    from faker import Faker
    from dotenv import load_dotenv

    load_dotenv(".env")
except Exception as e:
    pass

glue = boto3.client(
    "glue",
    aws_access_key_id=os.getenv("DEV_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("DEV_SECRET_KEY"),
    region_name=os.getenv("DEV_REGION"),
)

payloads = [
    {
        'JOB_NAME': 'customers',
        'SOURCE_S3_PATH': 's3://delta-streamer-demo-hudi/raw/customers/',
        'GLUE_DATABASE': 'hudi_db',
        'GLUE_TABLE_NAME': 'customers',
        'HUDI_PRECOMB_KEY': 'ts',
        'HUDI_RECORD_KEY': 'customer_id',
        'TARGET_S3_PATH': 's3://delta-streamer-demo-hudi/hudi/customers/'
    },
    {
        'JOB_NAME': 'orders',
        'SOURCE_S3_PATH': 's3://delta-streamer-demo-hudi/raw/orders/',
        'GLUE_DATABASE': 'hudi_db',
        'GLUE_TABLE_NAME': 'orders',
        'HUDI_PRECOMB_KEY': 'ts',
        'HUDI_RECORD_KEY': 'orderid',
        'TARGET_S3_PATH': 's3://delta-streamer-demo-hudi/hudi/orders/'
    }

]

for payload in payloads:
    job_name = 'hudi-template-ingestion-s3'

    fire_payload = {f"--{key}": value for key, value in payload.items()}

    response = glue.start_job_run(
        JobName=job_name,
        Arguments=fire_payload
    )
    print(response)

As you can see, I can easily add as many tables as I need to the array while reusing the same templated job to build out the Hudi data lake.
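
If hard-coding the array becomes unwieldy as the table count grows, the same payloads can live in a config file (or a DynamoDB table) that the trigger script reads at runtime. A minimal sketch, assuming a hypothetical tables.json containing a list of objects with the same keys as the payloads above:

import json
import boto3

glue = boto3.client("glue")

# tables.json (hypothetical) holds one object per table with JOB_NAME,
# SOURCE_S3_PATH, GLUE_DATABASE, GLUE_TABLE_NAME, HUDI_PRECOMB_KEY,
# HUDI_RECORD_KEY and TARGET_S3_PATH.
with open("tables.json") as f:
    payloads = json.load(f)

for payload in payloads:
    response = glue.start_job_run(
        JobName="hudi-template-ingestion-s3",
        Arguments={f"--{key}": value for key, value in payload.items()},
    )
    print(response["JobRunId"])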

Results


Video Guide


Looking for a way to get alerted when a job fails? Go with an event-driven approach.

Solution
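
One way to wire this up (a sketch of the general pattern, with a placeholder SNS topic ARN) is an Amazon EventBridge rule that matches failed Glue job runs and forwards them to an SNS topic you subscribe to:

import json
import boto3

events = boto3.client("events")

# Rule that matches failed or timed-out Glue job runs.
rule_name = "glue-job-failure-alerts"
events.put_rule(
    Name=rule_name,
    EventPattern=json.dumps({
        "source": ["aws.glue"],
        "detail-type": ["Glue Job State Change"],
        "detail": {"state": ["FAILED", "TIMEOUT"]},
    }),
    State="ENABLED",
)

# Send matched events to an SNS topic (placeholder ARN); the topic's resource
# policy must allow events.amazonaws.com to publish to it.
events.put_targets(
    Rule=rule_name,
    Targets=[{"Id": "sns-alerts", "Arn": "arn:aws:sns:us-east-1:123456789012:glue-alerts"}],
)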

Benefits of using this Approach

  • Reduces the amount of infrastructure code needed to manage the data lake
  • Saves time by allowing you to reuse the same job code for multiple tables
  • Reduces the risk of errors and inconsistencies that can occur when manually creating separate jobs for each table
  • Makes it easier to scale your data lake by adding new tables without having to create new ETL jobs from scratch
  • Allows for custom configuration settings for each table using parameters
  • Simplifies the ETL process by automating repetitive tasks and reducing manual effort
  • Provides a more streamlined and efficient way of managing a large number of tables in a data lake
  • Enables the creation of a Hudi transactional data lake, providing more robust and scalable data management capabilities.

Thank you
