Hands-on guide! Creating a data pipeline in AWS using Glue and Step Functions.
Rodolfo Marcos
Sr Data / Full-Stack Engineer | Tech Writer | Google cloud certified | AWS @ Freestar
In this tutorial, you’ll create an AWS pipeline to get up-to-date data — with error handling, retries, and parallel processing for efficiency!
Imagine this: you’re working at Company X, and someone from HR approaches you with a request, “We’d like to know all the holidays for each of our employees’ home countries.” As the thoughtful engineer you are, you know you have to build a pipeline to retrieve the holiday data so HR can seamlessly access up-to-date information for planning.
Follow along with the complete tutorial video below; the same content is written out in this article. I recommend watching the video while following the text.
In this article, we’ll walk you through building a complete AWS solution for this exact need. The solution is designed with error handling, automatic retries, and fast processing through parallelization. And the best part? You can use the same approach for any type of data, saving yourself time and effort. We won’t go into permission/authorization details in this guide; if you want to get started on that subject, I recommend learning about AWS IAM roles and policies.
Parquet is ideal for big data because it’s a compressed, columnar format that speeds up queries and reduces storage costs. It also integrates seamlessly with AWS analytics tools like Athena and Glue, making it easy and cost-effective to query large datasets in S3.
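To make the columnar benefit concrete, here’s a minimal sketch: with Parquet, engines like Athena (and even pandas locally) can read only the columns a query needs instead of scanning whole rows. The file name below is just a hypothetical placeholder.

import pandas as pd

# Read only the columns we need; with Parquet, the other columns are never scanned.
# "holidays.parquet" is a hypothetical local file used purely for illustration.
df = pd.read_parquet("holidays.parquet", columns=["date", "countryCode"])
print(df.head())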
API request example: https://date.nager.at/api/v3/publicholidays/{Year}/{Country Code}.
# Import dependencies
import requests
import pandas as pd


def download_data(country_code):
    # Nager API URL
    url = f"https://date.nager.at/api/v3/publicholidays/2024/{country_code}"
    # Download the data
    response = requests.get(url)
    # Check if the request was successful
    if response.status_code == 200:
        # Parse the response content as JSON
        data = response.json()
        # Convert JSON data to a pandas DataFrame
        df = pd.DataFrame(data)
        # Select specific attributes (columns) and copy to avoid SettingWithCopyWarning
        selected_columns = df[['date', 'name', 'countryCode']].copy()
        # Convert the date column to datetime format
        selected_columns['date'] = pd.to_datetime(selected_columns['date'])
        # Save the DataFrame as a Parquet file
        selected_columns.to_parquet(f'{country_code}.parquet', engine='pyarrow', index=False)
    else:
        print("Failed to retrieve data:", response.status_code)


if __name__ == '__main__':
    country_code = 'US'
    download_data(country_code)
The script downloads US holiday data and saves the result locally as a .parquet file. Cool! But we need data for multiple countries. You might be thinking, “Why not just repeat the process for each one?” Exactly! This is where parallelization shines. By running the download for every country at the same time, we save a lot of processing time: if each run takes 1 second, instead of 60 seconds for 60 countries we spend roughly 1 second for all of them. Parallel processing is essential for handling large data efficiently, and it’s the core concept behind major big data frameworks like Hadoop and Spark.
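Before moving to AWS, here’s a minimal local sketch of that idea, reusing the download_data function from the script above with Python’s built-in concurrent.futures; the country list is just a small illustrative sample.

from concurrent.futures import ThreadPoolExecutor, as_completed

# Assumes download_data from the script above is defined in the same module.
# Small illustrative sample; the real pipeline will receive the full country list.
country_codes = ["US", "BR", "DE", "JP"]

# Download several countries at the same time instead of one after another.
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(download_data, code): code for code in country_codes}
    for future in as_completed(futures):
        code = futures[future]
        try:
            future.result()
            print(f"Finished {code}")
        except Exception as error:
            print(f"Failed {code}: {error}")

In the cloud we’ll get the same fan-out effect, but with Step Functions starting one Glue job run per country and handling retries for us.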
AWS: Building the solution in the cloud.
We’ve developed the script locally to download data, and now it’s time to design a full-scale solution in AWS! Here’s what we need to do:
1. Set up an AWS Glue job that downloads the holiday data for one country and writes it to S3 as Parquet.
2. Orchestrate the Glue job with AWS Step Functions, running one job per country in parallel.
3. Set up a Glue crawler that catalogs the Parquet files into a Glue table.
4. Query the data with AWS Athena.
This setup will make our solution scalable, cost-effective, efficient, and ready for advanced data queries! Let’s go through each step in detail.
Set up an AWS Glue job to download the data
1. In the AWS Console, open the AWS Glue service.
2. On the left bar, select “ETL jobs”.
3. Then select “Script editor”.
4. Select “Python shell” as the engine and “start fresh”.
5. Paste the following code in the editor screen, and replace the necessary variables. Note that the script is based on the previous one, but here we included the logic to save the data to the S3 bucket and read the country_code as a parameter.
import sys
import requests
import pandas as pd
import boto3
from awsglue.utils import getResolvedOptions

# Initialize the S3 client
s3 = boto3.client('s3')


def download_data(country_code):
    # URL of the JSON data
    url = f"https://date.nager.at/api/v3/publicholidays/2024/{country_code}"
    # Download the data
    response = requests.get(url)
    # Check if the request was successful
    if response.status_code == 200:
        # Parse the response content as JSON
        data = response.json()
        # Convert JSON data to a DataFrame
        df = pd.DataFrame(data)
        # Select specific attributes (columns) and copy to avoid SettingWithCopyWarning
        selected_columns = df[['date', 'name', 'countryCode']].copy()
        # Convert the date column to datetime format
        selected_columns['date'] = pd.to_datetime(selected_columns['date'])
        # Save the DataFrame as a Parquet file
        selected_columns.to_parquet(f'{country_code}.parquet', engine='pyarrow', index=False)
        return f'{country_code}.parquet'
    else:
        # Raise so the Glue job run is marked as failed and can be retried
        raise RuntimeError(f"Failed to retrieve data: {response.status_code}")


def upload_to_s3(file_name):
    # Define S3 bucket and key (file path within the bucket)
    bucket_name = '{your bucket name}'
    file_key = f'raw/{file_name}'
    # Upload the parquet file
    s3.upload_file(file_name, bucket_name, file_key)


if __name__ == '__main__':
    # Define the expected arguments
    args = getResolvedOptions(sys.argv, ['country_code'])
    # Access the parameters
    country_code = args['country_code']
    file_name = download_data(country_code)
    upload_to_s3(file_name)
6. In the top section of the editor, navigate to “Job details”. Ensure that the IAM role you’re using has the required permissions to read and write data to S3 buckets (note: we won’t cover policies and permissions in this tutorial). Choose Python 3.9 as the version and check the box for “Load common analytics libraries”; this automatically loads pandas for us, so no manual installation is needed. Finally, save the job.
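To check that the job and its parameter wiring work before building the orchestration, you can trigger a single run from your machine. This is a hedged sketch: the job name is a placeholder for whatever you named your Glue job, and it assumes your local AWS credentials are allowed to call Glue.

import boto3

glue = boto3.client("glue")

# Start one run of the download job for a single country.
# "holiday-download-job" is a placeholder; use the name you gave your Glue job.
response = glue.start_job_run(
    JobName="holiday-download-job",
    Arguments={"--country_code": "US"},  # getResolvedOptions reads this as 'country_code'
)

# Poll the run status if you want to confirm it succeeded.
run = glue.get_job_run(JobName="holiday-download-job", RunId=response["JobRunId"])
print(run["JobRun"]["JobRunState"])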
Use AWS Step Functions for parallel processing
The download job is set up in Glue and ready to be called. We’ll use AWS Step Functions as the orchestration layer, which will call the download job for each country.
1. In the AWS Console, open the Step Functions service and click “Create state machine”.
2. Choose a blank template to open the visual workflow editor.
3. In the items bar on the left, type “Map” and then drag it into the main area. In the configuration tab on the right, check the option “Provide a path to items-array” and enter $.country_codes as the value. This specifies the list of country codes we’ll receive as a parameter to download. Scroll down in the configuration settings and set the concurrency limit to 30.
4. Again, in the items bar on the left, type “Glue StartJobRun”, then drag and drop it inside the Map state we created in the previous step.
5. Fill in the API parameters in the configuration tab. Replace the job name with the Glue job name you created earlier, and set the --country_code argument so each item from the Map is passed to the job.
6. In the editor, go to the top options and select “Config”. Set the type to “Express” and click “Create”.
7. To test the step function, click “Execute” on the top bar. Select “Asynchronous” as the execution type, then paste the following JSON with the country list as a parameter (a scripted alternative is sketched after the JSON). Once the process finishes successfully, you should be able to see the files in the S3 bucket folder.
{
  "country_codes": [
    "AD",
    "AE",
    "AF",
    "AG",
    "AI",
    "AL",
    "AM",
    "AO",
    "AQ"
  ]
}
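Step 7 uses the console, but you can also start the state machine programmatically with the same input. Here’s a hedged sketch: the state machine ARN is a placeholder, and it assumes the Glue task inside the Map maps the --country_code argument to the current item so each run gets its own country.

import json
import boto3

sfn = boto3.client("stepfunctions")

# Placeholder ARN; copy the real one from the Step Functions console.
state_machine_arn = "arn:aws:states:us-east-1:123456789012:stateMachine:holiday-pipeline"

payload = {"country_codes": ["AD", "AE", "AF", "AG", "AI", "AL", "AM", "AO", "AQ"]}

# Each item in country_codes becomes one parallel Glue job run inside the Map state.
execution = sfn.start_execution(
    stateMachineArn=state_machine_arn,
    input=json.dumps(payload),
)
print(execution["executionArn"])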
Set up a Glue crawler in AWS Lake Formation
AWS Lake Formation is a managed service that helps you set up and manage data lakes on AWS. It simplifies the process of collecting, cataloging, cleaning, and securing data from various sources stored in S3 for analytics and machine learning.
Now that the files are in place, it’s time to create a Glue crawler that scans them and creates a table in the Glue Data Catalog so we can query the data with AWS Athena later.
An AWS Glue Crawler is a tool that automatically scans and catalogs data stored in sources like S3, identifying data structures and creating or updating metadata tables in the Glue Data Catalog. This metadata can then be used for data querying and transformation.
1. In the AWS Lake Formation console (or the AWS Glue console), select “Crawlers” in the left menu.
2. Click the orange “Create crawler” button at the top right and choose a name for the crawler.
3. On the next screen, under “Is your data already mapped to Glue tables?”, select “Not yet”. Then click “Add a data source”.
In the data source field, select “S3”. Then input the S3 path to the bucket location where your holiday files are stored, at the folder level (the raw/ prefix in our case). Click “Add an S3 data source”.
4. On the next screen, create an AWS Glue IAM role if it does not exist. (Make sure the role has the required permissions)
5. In the “Output and scheduling” settings, select or create the database where the crawler will store the resulting table. Review everything on the next screen, then click “Create crawler”. It will take a few seconds for the crawler to scan all the data, making it ready for querying. One of the advantages of the AWS Glue Crawler is that it scans all the files, automatically infers a schema, and creates a Glue table for you.
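If you prefer to script the crawler instead of clicking through the console, here’s a hedged boto3 sketch; the crawler name, IAM role, database, and S3 path are all placeholders you would replace with your own values.

import boto3

glue = boto3.client("glue")

# All names below are placeholders for illustration.
glue.create_crawler(
    Name="holidays-crawler",
    Role="YourGlueCrawlerRole",              # must be able to read the S3 prefix
    DatabaseName="holidays_db",              # Glue database that will hold the table
    Targets={"S3Targets": [{"Path": "s3://your-bucket-name/raw/"}]},
)

# Run the crawler; it scans the Parquet files, infers the schema, and creates the table.
glue.start_crawler(Name="holidays-crawler")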
Query the data with AWS Athena
Now you can open the Athena query editor, pick the database created by the crawler, and run SQL over the holiday data to perform any analysis you want; a programmatic version is sketched below. That completes the full example of how to ingest and process data in AWS.
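Here’s a minimal sketch of querying with boto3, assuming the crawler created a table named raw in a database called holidays_db (your names will differ) and that you have an S3 location for Athena query results.

import time
import boto3

athena = boto3.client("athena")

# Database, table, and output location are assumptions; replace them with yours.
query = athena.start_query_execution(
    QueryString="SELECT countryCode, COUNT(*) AS holidays FROM raw GROUP BY countryCode",
    QueryExecutionContext={"Database": "holidays_db"},
    ResultConfiguration={"OutputLocation": "s3://your-bucket-name/athena-results/"},
)

# Wait for the query to finish, then print the result rows (first row is the header).
query_id = query["QueryExecutionId"]
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)

if state == "SUCCEEDED":
    for row in athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]:
        print([col.get("VarCharValue") for col in row["Data"]])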
Congratulations!! You have created a data pipeline in AWS that can ingest data in parallel.
You can connect me via Gmail: [email protected], Linkedin: https://www.dhirubhai.net/in/rodolfo-marcos-41ab3198/ or Medium: https://medium.com/@rodolfo-marcos07
Thank you so much! I hope it will be useful for you.