Fully Automated Data Ingestion Pipeline (Ingest 1.2TB) To Elastic Search using AWS Step function and Lambda and Firehose

Fully Automated Data Ingestion Pipeline (Ingest 1.2TB) To Elastic Search using AWS Step function and Lambda and Firehose

Soumil Nitin Shah (Data collection and Processing Team lead)

I have a bachelor's degree in electronics engineering and a master's degree in electrical and computer engineering. In Python, I have a lot of experience designing scalable and high-performance software systems. I teach people about Data Science, Machine Learning, Elastic Search, and AWS on my YouTube channel. I works as the Data Collection and Processing Team Lead, and I spend the most of my time on AWS designing Ingestion Framework and microservices and scalable architecture. I've worked with a large quantity of data, including establishing data lakes (1.2T) and improving data lakes queries by partitioning and utilizing the appropriate file format and compression. I've also created and worked on a streaming application that uses kinesis to consume real-time stream data. I've worked on and maintained a production Elastic Search cluster with about 2TB of data, resulting in a 55-fold increase in search response time. I created a high-availability microservice with a multi-region Active-Active Backend using Route 53 and DynamoDB global tables, as well as an API Gateway. On a daily basis, I work with AWS lambda, SQS queue, Event Bridge, AWS batch, SNS, ECS, Open Search, DynamoDB, RDS, Route 53, and many more AWS services.

Here are some popular Articles I have published?

Batch framework (An Internal Data Ingestion Framework that process 1TB of data in Month and run 200+ Jobs)

Link: https://www.dhirubhai.net/pulse/batch-frameworkan-internal-data-ingestion-framework-process-shah/

How we got 50X faster Speed for querying Data Lake using Athena Query & saved Thousands of dollars | Case Study

Link https://www.dhirubhai.net/pulse/how-we-got-50x-faster-speed-querying-data-lake-using-athena-shah/

Elastic Search Performance Tuning and Optimization How We Got 80X Faster Searches a Case Study

https://www.dhirubhai.net/pulse/elastic-search-performance-tuning-optimization-how-we-soumil-shah/

Birendra Singh (Python Developer | Search Engineer | Data Specialist)

Hi, I am Birendra Singh I have completed Bachelor in Electronic Engineering. I love and enjoy working on an elastic search I have 6 years of professional experience in software development lifecycle spanning across multiple sectors including Telecommunications, Geospatial, Oil, and Gas with a focus on quality and on-time delivery

Hari Om Dubey (Consultant Software Engineer, Python developer)

I have completed a Master’s in Computer Application and I have 5 years of experience in developing software applications using Python and Django framework. I love to code in Python and creating a solution for a problem by coding excites me. I have been working at Jobtarget for like past 2 months as a Software Engineer in a Data Team.

Project Summary:

We regularly receive many files, about 7800 GZ files. Each GZ file has around 100000 records. Each file must be read, and pre-processing must be completed. Because these files are large, processing them takes longer, resulting in a bottleneck. We have an average of 100 million records. This is a labor-intensive and time-consuming task. We used to have a large codebase that read the file, processed it, and then uploaded it in bulk to elastic search, which took 5-7 days. I didn't like how tedious and labor-intensive this operation was, so we decided to create a Fully Automated Pipeline that could load all of this data in a fraction of the time. Hence we chose serverless components

Tech Stack:

·??????AWS Step Functions

·??????AWS Lambdas

·??????AWS Kinesis Data Streams

·??????AWS Firehose

·??????Open Search

·??????API Gateway

·??????AWS S3

Why Serverless?

Serverless is a cloud native design that allows you to delegate more of your operational tasks to AWS, resulting in increased agility and creativity. Serverless computing allows you to create and execute apps and services without having to worry about servers. Infrastructure management duties including server or cluster provisioning, patching, operating system maintenance, and capacity provisioning are all eliminated. You may create them for almost any form of application or backend service, and they take care of everything you need to run and scale your app with high availability (Ashish Patel. “AWS — Serverless Services on AWS.” medium. Accessed May 27, 2022)

Architecture:

No alt text provided for this image

Figure 1: Shows High-level Architecture Block?

No alt text provided for this image

Figure 2: Shows AWS Step Function Workflow?

Explanation:

The entire workflow is triggered by a single HTTP POST endpoint. The JSON instructs the Step Function where to get the data and all the metadata it needs to complete the task.

When the Step function is called, the first thing it does is examine the JSON body and header to make sure it's valid JSON, and if it's not, it switches to Error mode and broadcasts an Event to SNS, indicating that the step function pipeline has failed with appropriate Error Message. Once the JSON has been checked and verified, a sample of the first few files will be taken to ensure that the S3 path supplied is legitimate; if it is not, the pipeline will fail, and an email notification will be sent explaining why.

Next Step in the workflow is to validate the hardware Specification before starting ingestion we make sure we have enough space on the cluster and if we do have enough space then we create mappings to New Elastic Search Cluster. We Do not hard code Mapping in code. The Mapping is stored in a JSON file that resides in AWS S3 which gives us the ability to change things very easily and make the pipeline generic enough.

The next step of this process is to check if we should start from first file. This is where we check if we have already processed the file we don’t want to process it again which is why after every file is processed we store the meta-information on AWS S3 which gives the ability to query the Lake with Filename and see if this file name already exists and status is successful do not process the file again. This block ensures that the same files are not processed again. Once we do decide which file, we want to process we then pass those keys to the next Step Function which is Map Operator, and process the Files. This is the most important step where files are processed in parallel.

The Lambda Functions has Pandas, DDOG, and Logging library which allows us to process and monitor process logs in Datadog. We have used a serverless framework to deploy our Lambda code which makes things very easy.

No alt text provided for this image

Figure:?Shows Retry Scenarios?

When a Step Fails, we have retry scenarios which make orchestration very easy here we attempt to process the files for maximum of 3 Times after which an appropriate Error is caught and moved to SQS so that metadata can be updated on AWS S3. We run Glue Crawler which identifies schema in AWS S3 and allows us to query the Meta?Data on Athena to see which?files have processed and which?files have been failed. AWS quick Sight provides us BI Dashboard which shows which file has been processed and which file has failed on beautiful Dashboard.

Metrics:?

No alt text provided for this image

Figure: We Fire 10-20 Lambdas in parallel which process the files.?

Each file reads the data from AWS S3 and processes the files and dumps the data to the firehose. The reason we had to set reserved concurrency for 15 Files as there is a limitation on the kinesis Firehose side.

No alt text provided for this image

Figure: Incoming Bytes to Kinesis?

Challenges with Kinesis?

When Direct PUT is configured as the data source, each Kinesis Data Firehose delivery stream provides the following combined quota for PutRecord and PutRecordBatch requests

For US East (N. Virginia), US West (Oregon), and Europe (Ireland): 500,000 records/second, 2,000 requests/second, and 5 MiB/second.

The maximum size of a record sent to Kinesis Data Firehose, before base64-encoding, is 1,000 KiB.

The PutRecordBatch operation can take up to 500 records per call or 4 MiB per call, whichever is smaller. This quota cannot be changed.

The buffer sizes hints range from 1 MiB to 128 MiB for Amazon S3 delivery. For Amazon OpenSearch Service (OpenSearch Service) delivery, they range from 1 MiB to 100 MiB. For AWS Lambda processing, you can set a buffering hint between 1 MiB and 3 MiB using the BufferSizeInMBs processor parameter. The size threshold is applied to the buffer before compression. These options are treated as hints. Kinesis Data Firehose might choose to use different values when it is optimal (amazon. “Amazon Kinesis Data Firehose Quota.”)

Actually, we could have processed a 1000 Files per 2 minutes but I cannot publish that much amount of data to the firehose and hence had to set reserved concurrency?

No alt text provided for this image

Figure:?Records delivered to Open Search

Before running the pipeline, each lambda was powered tune so right amount of resources could be allocated. Why Power tunning is important?

Choosing the memory allocated to Lambda functions is an optimization process that balances speed (duration) and cost. While you can manually run tests on functions by selecting different memory allocations and measuring the time taken to complete, the?AWS Lambda Power Tuning?tool allows you to automate the process.

This tool uses AWS Step Functions to run multiple concurrent versions of a Lambda function at different memory allocations and measure the performance. The input function is run in your AWS account, performing live HTTP calls and SDK interaction, to measure likely performance in a live production scenario.?

Read More : https://docs.aws.amazon.com/lambda/latest/operatorguide/profile-functions.html

No alt text provided for this image

Shows the Count of documents. The pipeline is still running while writing the articles and hence you see 54M we are expecting the count to be around 90-100M. as you can see we work and deal with massive Big Data and make sure search queries are optimized and cached to deliver the best performance on the application?

Thank you and please post your question in the comments and if you want to learn more I have a series on AWS Step Function

Learn AWS Step Functions :

Title: Beginner | : Learn AWS Step Function in a Very Easy way | Part #1

Title: Beginner | Learn AWS Step Function in Very Easy way| Hello World | Part #2

Title: Beginner | Learn AWS Step Function in Very Easy way| Retry Logic | Part #3

Title: Beginner | Learn AWS Step Function in Very Easy way| Catch Logic | Part #4

Beginner | Learn AWS Step Function in Very Easy way| Catch Custom Error Logic | Part #4A

Beginner | Learn AWS Step Function in Very Easy way| Choice & Branching| Part #5

Learn AWS Step Function in Very Easy way| Process Json Files in Batches | Part #6


AWS Step Function | Parallel Processing JSON Documents and Push Failed Items to DLQ| #7

Title: Async Callback Pattern using AWS Step function + SQS queue + Lambda in Python

要查看或添加评论,请登录

Soumil S.的更多文章

社区洞察

其他会员也浏览了