Building a Medallion Architecture with EMR Serverless and Apache Iceberg: An Incremental Data Processing Guide with Hands-On Code
Introduction:
In the world of data engineering, organizing and managing data through a well-defined architecture is crucial for maintaining quality, consistency, and performance. The Medallion Architecture provides a structured approach to data lake management by organizing data into layers or "zones": Bronze, Silver, and Gold. This structure not only optimizes storage and query performance but also enables incremental data processing and simplifies data governance. In this blog, we'll explore how to build a Medallion Architecture using AWS EMR Serverless and Apache Iceberg, a table format designed to bring ACID transactions to data lakes. We'll also query our processed data using popular engines: Athena, StarRocks, DuckDB, and Snowflake.
Video-Based Guide
What is Medallion Architecture?
Medallion Architecture is a pattern for structuring data storage layers in a data lake to ensure data quality, optimize processing, and facilitate analysis. The architecture typically has three main layers:
- Bronze: raw data ingested as-is from source systems, with minimal transformation.
- Silver: cleansed, deduplicated, and conformed data ready for downstream consumption.
- Gold: aggregated, business-level data optimized for analytics and reporting.
With Medallion Architecture, we can incrementally process new data, enforce schema and quality rules, and track data lineage across each stage.
Why Use EMR Serverless and Apache Iceberg?
- EMR Serverless removes cluster management: Spark jobs run on demand, scale automatically, and you pay only for the resources consumed while a job runs.
- Apache Iceberg brings ACID transactions, snapshot isolation, schema evolution, and incremental snapshot-based reads to tables stored on S3.

Together, EMR Serverless and Apache Iceberg allow us to create a scalable, cost-effective, and easily maintainable Medallion Architecture.
Hands-On: Setting Up the Architecture
Step 1: Create an EMR Serverless Application
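EMR Serverless uses on-demand "applications" rather than long-running clusters. A minimal sketch of creating one with boto3; the application name and release label are placeholders, so adjust them to your account:

```python
import boto3

# Create an on-demand Spark application (no cluster to manage).
emr = boto3.client("emr-serverless")
response = emr.create_application(
    name="iceberg-medallion-demo",  # placeholder name
    type="SPARK",
    releaseLabel="emr-7.1.0",       # pick a release available in your region
)
print("Application ID:", response["applicationId"])
```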
Step 2: Generate Sample Data into the Raw Layer
This script generates mock data into the raw folder.
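A minimal sketch of such a generator, assuming a hypothetical bucket name and a simple customers schema written as JSON lines under the raw/ prefix:

```python
import json
import random
import time
import uuid

import boto3

BUCKET = "my-datalake-bucket"  # placeholder, use your own bucket

# Build a batch of mock customer records; op = 'I' marks an insert.
records = [
    {
        "customer_id": str(uuid.uuid4()),
        "name": f"user_{i}",
        "state": random.choice(["NY", "CA", "TX", "WA"]),
        "op": "I",
        "updated_at": int(time.time()),
    }
    for i in range(100)
]

# Write the batch as a single timestamped JSON-lines file under raw/.
key = f"raw/customers_{int(time.time())}.json"
body = "\n".join(json.dumps(r) for r in records).encode("utf-8")
boto3.client("s3").put_object(Bucket=BUCKET, Key=key, Body=body)
print(f"Wrote {len(records)} records to s3://{BUCKET}/{key}")
```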
Step 3: Incrementally Ingest into the Bronze Layer
This script performs incremental data ingestion into an Iceberg table following a data lakehouse model. It scans a specified directory or S3 bucket for new files based on a checkpoint that tracks the last processed time, ensuring only unprocessed files are ingested. When new files are detected, they’re read into a Spark DataFrame, with additional metadata columns added to log the file source and processing time. The DataFrame is then appended to an Iceberg table in a "bronze" layer, where it’s stored for further incremental processing. Finally, the script updates the checkpoint, so subsequent runs only process newly added files since the last checkpoint.
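A sketch of the checkpoint handling, assuming the checkpoint is a small JSON object in S3 holding the last processed file timestamp (bucket and key names are placeholders):

```python
import json

import boto3

s3 = boto3.client("s3")
BUCKET = "my-datalake-bucket"                      # placeholder
CHECKPOINT_KEY = "checkpoints/raw_to_bronze.json"  # placeholder

def read_checkpoint() -> float:
    """Return the last processed file timestamp, or 0 on the first run."""
    try:
        obj = s3.get_object(Bucket=BUCKET, Key=CHECKPOINT_KEY)
        return json.loads(obj["Body"].read())["last_processed_ts"]
    except s3.exceptions.NoSuchKey:
        return 0.0

def write_checkpoint(ts: float) -> None:
    """Persist the timestamp of the newest file processed in this run."""
    payload = json.dumps({"last_processed_ts": ts}).encode("utf-8")
    s3.put_object(Bucket=BUCKET, Key=CHECKPOINT_KEY, Body=payload)
```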
Main Function
When run for the first time, this script scans the raw/* directory and, after processing, commits a checkpoint to S3 storing the timestamp of the last processed file. On the next run it scans the directory again and filters for files newer than that timestamp, so only new incremental files are processed; every run updates the checkpoint in S3.
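A sketch of that main loop, reusing the checkpoint helpers above; the catalog, table, and path names are placeholders, not the exact ones from the repo:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, input_file_name

def incremental_raw_to_bronze(spark: SparkSession) -> None:
    last_ts = read_checkpoint()

    # List objects under raw/ and keep only files newer than the checkpoint.
    resp = s3.list_objects_v2(Bucket=BUCKET, Prefix="raw/")
    new_files = [
        o for o in resp.get("Contents", [])
        if o["LastModified"].timestamp() > last_ts
    ]
    if not new_files:
        print("No new files to process")
        return

    # Read only the new files and tag each row with lineage metadata.
    paths = [f"s3://{BUCKET}/{o['Key']}" for o in new_files]
    df = (
        spark.read.json(paths)
        .withColumn("source_file", input_file_name())
        .withColumn("processed_at", current_timestamp())
    )

    # Append to the bronze Iceberg table (placeholder catalog/table names).
    df.writeTo("dev.icebergdb.customers_bronze").append()

    # Commit the new checkpoint so the next run skips these files.
    write_checkpoint(max(o["LastModified"].timestamp() for o in new_files))
```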
Submit Job to EMR
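One way to submit the script, sketched with boto3; the application ID, role ARN, S3 paths, and Iceberg catalog name are all placeholders, and the Iceberg runtime jar path shown is the one EMR ships, but verify it for your release:

```python
import boto3

emr = boto3.client("emr-serverless")
response = emr.start_job_run(
    applicationId="00abc123example",  # placeholder
    executionRoleArn="arn:aws:iam::111122223333:role/EMRServerlessJobRole",  # placeholder
    jobDriver={
        "sparkSubmit": {
            "entryPoint": "s3://my-datalake-bucket/scripts/raw-bronze.py",
            "sparkSubmitParameters": (
                "--conf spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar "
                "--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions "
                "--conf spark.sql.catalog.dev=org.apache.iceberg.spark.SparkCatalog "
                "--conf spark.sql.catalog.dev.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog "
                "--conf spark.sql.catalog.dev.warehouse=s3://my-datalake-bucket/warehouse/"
            ),
        }
    },
)
print("Job run ID:", response["jobRunId"])
```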
Step 4: Incrementally Ingest from Bronze into the Silver Layer
This script implements a data processing pipeline using Apache Iceberg and Spark. It moves data incrementally from a Bronze (raw data) table to a Silver (curated data) table. Initially, it checks if the Silver table exists; if not, it creates the table using an Avro schema. On each run, it reads the Bronze table's history to determine the latest snapshot ID and fetches incremental data since the last processed snapshot, tracked via a checkpoint stored in S3. Using a merge operation, it updates or inserts records into the Silver table, ensuring data deduplication via a window function. The pipeline maintains consistency and processes only new data, optimizing data flow between stages.
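A sketch of the incremental read plus merge, assuming placeholder table names and a customer_id key; Iceberg's start-snapshot-id/end-snapshot-id read options drive the incremental scan, and a window function keeps only the latest record per key before the MERGE:

```python
from typing import Optional

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, row_number

def bronze_to_silver(spark: SparkSession,
                     last_snapshot_id: Optional[int],
                     latest_snapshot_id: int) -> None:
    if last_snapshot_id == latest_snapshot_id:
        print("No new data to process")
        return

    if last_snapshot_id is None:
        # First run: no checkpoint yet, read the whole Bronze table.
        deltas = spark.table("dev.icebergdb.customers_bronze")
    else:
        # Incremental scan: only rows committed between the two snapshots.
        deltas = (
            spark.read.format("iceberg")
            .option("start-snapshot-id", str(last_snapshot_id))
            .option("end-snapshot-id", str(latest_snapshot_id))
            .load("dev.icebergdb.customers_bronze")
        )

    # Deduplicate the batch: keep only the latest record per customer_id.
    w = Window.partitionBy("customer_id").orderBy(col("updated_at").desc())
    latest = (
        deltas.withColumn("rn", row_number().over(w))
        .filter(col("rn") == 1)
        .drop("rn")
    )

    # Upsert the deduplicated deltas into the Silver table.
    latest.createOrReplaceTempView("deltas")
    spark.sql("""
        MERGE INTO dev.icebergdb.customers_silver AS t
        USING deltas AS s
        ON t.customer_id = s.customer_id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
```

The latest snapshot ID can be read from the table's snapshots metadata table, for example `SELECT snapshot_id FROM dev.icebergdb.customers_bronze.snapshots ORDER BY committed_at DESC LIMIT 1`.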
If you run it again, the Spark logs will show there is no new data to process.
Submit Job to EMR (bronze-silver)
Step 5: Simulate Updates Arriving in the Raw Folder
Run the Python file: https://github.com/soumilshah1995/emr-apache-iceberg-workshop/blob/main/datagen/updates_iceberg.py
A new file has now been added. If I run raw-bronze.py on EMR, only that file should be processed, since the job committed a checkpoint on its last run.
EMR Logs
Only the new file was processed. Lovely!
Let's run the Bronze-to-Silver job.
EMR Logs
This time the job lists all snapshots from the Bronze table, fetches the checkpoint, runs an incremental query to pick up the new deltas, and performs a MERGE INTO against the target Silver table.
From the screenshot, it's clear that the job performs an incremental query and processes only the new deltas.
You can also see the DataFrame rows where OP is 'U', which stands for update; it's clear the job did not re-read all the data and brought in only the new deltas.
Query Engines
Query in Athena
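Because the Iceberg tables are registered in the Glue Data Catalog, Athena can query them directly; a minimal example with placeholder database and table names:

```sql
-- Placeholder database/table names; Athena reads Iceberg via the Glue catalog
SELECT * FROM icebergdb.customers_silver LIMIT 10;
```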
Records
The new updates were merged into the Silver table. Perfect!
Query in StarRocks
Start the container and execute the following commands:
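A sketch of those commands, assuming the StarRocks all-in-one quickstart container (e.g. `docker run -p 9030:9030 -itd starrocks/allin1-ubuntu`) and placeholder AWS credentials; the external catalog points StarRocks at the same Glue-backed Iceberg tables:

```sql
-- Register the Glue-backed Iceberg tables as an external catalog
CREATE EXTERNAL CATALOG iceberg_catalog
PROPERTIES (
    "type" = "iceberg",
    "iceberg.catalog.type" = "glue",
    "aws.glue.use_instance_profile" = "false",
    "aws.glue.access_key" = "<access-key>",   -- placeholder
    "aws.glue.secret_key" = "<secret-key>",   -- placeholder
    "aws.glue.region" = "us-east-1",
    "aws.s3.access_key" = "<access-key>",     -- placeholder
    "aws.s3.secret_key" = "<secret-key>",     -- placeholder
    "aws.s3.region" = "us-east-1"
);

-- Query the Silver table through the external catalog
SELECT * FROM iceberg_catalog.icebergdb.customers_silver LIMIT 10;
```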
Output
Query in DuckDB
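A minimal sketch using DuckDB's iceberg and httpfs extensions; the S3 credentials and table path are placeholders:

```sql
INSTALL iceberg; LOAD iceberg;
INSTALL httpfs;  LOAD httpfs;

-- S3 credentials via DuckDB's secrets manager (placeholder values)
CREATE SECRET (
    TYPE S3,
    KEY_ID '<access-key>',
    SECRET '<secret-key>',
    REGION 'us-east-1'
);

-- Scan the Iceberg table directly from its S3 location
SELECT *
FROM iceberg_scan('s3://my-datalake-bucket/warehouse/icebergdb/customers_silver')
LIMIT 10;
```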
Query in Snowflake
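A sketch of reading the same table from Snowflake as an externally managed Iceberg table; it assumes an external volume and a Glue catalog integration have already been configured by an account admin (all names are placeholders):

```sql
-- External volume and catalog integration must already exist (placeholders)
CREATE ICEBERG TABLE customers_silver
  EXTERNAL_VOLUME = 'iceberg_s3_volume'
  CATALOG = 'glue_catalog_integration'
  CATALOG_TABLE_NAME = 'customers_silver';

SELECT * FROM customers_silver LIMIT 10;
```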
Conclusion
By combining AWS EMR Serverless and Apache Iceberg, we’ve demonstrated a scalable and efficient implementation of the Medallion Architecture. This setup supports incremental processing, ensures data consistency, and provides flexibility for querying data across various engines.