Unlocking the Power of the Lakehouse: A Layered Approach to Data Management
Sulfikkar Shylaja
Data Engineer Lead | Data Architect | Transforming Complex Data into Impactful Insights
Building a Delta Lakehouse Architecture: A Step-by-Step Journey from Raw Data to Business Insights
In today’s fast-paced world, managing vast amounts of data is key to driving actionable insights. The Delta Lakehouse architecture provides a scalable, reliable, and structured approach to processing data across multiple layers, transforming raw data into business intelligence. In this article, we will walk through the key layers—Bronze, Conformance, Silver, and Gold—while showcasing how each stage of transformation brings the data closer to being analytics-ready.
1. Bronze Layer: Capturing Raw Data
The Bronze layer is the foundation of the Lakehouse architecture. Here, we land the source data as-is, without any transformation, so an original copy is always available to revisit if needed.
raw_data_path = "/mnt/raw_layer/customer_raw.csv"
# Read raw data (source format unchanged)
bronze_df = spark.read.csv(raw_data_path, header=True, inferSchema=True)
# Persist the raw data, content unmodified, to the Bronze layer (stored as a Delta table)
bronze_df.write.format("delta").mode("overwrite").save("/mnt/delta/bronze/customer_raw")
print("Bronze layer data saved.")
Example Bronze Layer Data:
| customer_id | name | address | phone | updated_at |
|-------------|-----------|--------------|------------|------------|
| 1 | John Doe | 123 Maple St | 555-1234 | 2024-01-01 |
| 1 | John Doe | 123 Oak St | 555-5678 | 2024-02-15 |
| 2 | Jane Smith| 456 Elm St | 555-9876 | 2024-01-05 |
| 3 | Mike Lee | NULL | NULL | 2024-02-01 |
The Bronze layer stores the raw data for historical tracking, ensuring that future changes and corrections can be traced back to the original input.
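Because the Bronze data is persisted as a Delta table, earlier versions remain queryable. As a quick sketch (assuming the Bronze path used above), Delta time travel lets you re-read the data exactly as it was ingested at an earlier version or point in time:
# Re-read the Bronze table as of its first version (Delta time travel)
bronze_v0_df = spark.read.format("delta").option("versionAsOf", 0).load("/mnt/delta/bronze/customer_raw")
# Or as of a specific timestamp
bronze_jan_df = spark.read.format("delta").option("timestampAsOf", "2024-01-02").load("/mnt/delta/bronze/customer_raw")
This is what makes the "revisit the original input" promise of the Bronze layer practical: no separate archive is needed.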
2. Conformance Layer: Ensuring Data Quality and Validation
The Conformance layer is a sub-layer of the Bronze layer. It runs validation checks on the raw data and separates valid records from invalid ones. We can also introduce schema validation here to enforce business rules, as sketched at the end of this section.
from pyspark.sql import functions as F
# Read the raw data from the Bronze layer
bronze_df = spark.read.format("delta").load("/mnt/delta/bronze/customer_raw")
# Validate data based on simple rules (e.g., phone and address cannot be null)
valid_df = bronze_df.filter((F.col("phone").isNotNull()) & (F.col("address").isNotNull()))
invalid_df = bronze_df.filter(F.col("phone").isNull() | F.col("address").isNull())
# Save valid and invalid data separately
valid_df.write.format("delta").partitionBy("customer_id").mode("overwrite").save("/mnt/delta/conformance/customer_conformed_valid")
invalid_df.write.format("delta").mode("overwrite").save("/mnt/delta/conformance/customer_conformed_invalid")
print("Conformance layer data saved with validation results.")
Valid Data:
| customer_id | name | address | phone | updated_at |
|-------------|-----------|--------------|------------|------------|
| 1 | John Doe | 123 Maple St | 555-1234 | 2024-01-01 |
| 1 | John Doe | 123 Oak St | 555-5678 | 2024-02-15 |
| 2 | Jane Smith| 456 Elm St | 555-9876 | 2024-01-05 |
Invalid Data:
| customer_id | name | address | phone | updated_at |
|-------------|----------|---------|-------|------------|
| 3 | Mike Lee | NULL | NULL | 2024-02-01 |
By separating invalid records, you maintain clean and validated data for further processing while flagging errors for future correction.
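As noted above, schema validation can also live in this layer. A minimal sketch, assuming the same five columns as the example data, declares an explicit schema instead of relying on inferSchema, so rows that fail to parse are rejected at read time:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
# Declare the expected schema up front instead of inferring it
customer_schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("address", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("updated_at", DateType(), True),
])
# DROPMALFORMED discards rows that do not conform to the declared schema
schema_checked_df = spark.read.csv(raw_data_path, header=True, schema=customer_schema, mode="DROPMALFORMED")
Null checks like the ones above catch missing values; an explicit schema additionally catches type drift, such as a non-numeric customer_id arriving from the source.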
3. Silver Layer: Applying Business Rules and SCD Type 2
The Silver layer is where data cleaning and normalization happen. In this layer, we apply business rules for standardization, perform Slowly Changing Dimensions (SCD Type 2), and split the data into normalized relational models.
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Read valid data from the Conformance layer
conformed_valid_df = spark.read.format("delta").load("/mnt/delta/conformance/customer_conformed_valid")
# Standardize phone numbers (remove non-numeric characters) and ensure consistent date format
standardized_df = conformed_valid_df.withColumn("phone", F.regexp_replace("phone", "[^0-9]", "")) \
.withColumn("updated_at", F.to_date("updated_at", "yyyy-MM-dd"))
# Implement SCD Type 2 logic: each row is effective from its own updated_at
# until the next change arrives for the same customer
window_spec = Window.partitionBy("customer_id").orderBy("updated_at")
scd2_df = standardized_df.withColumn("effective_from", F.col("updated_at")) \
    .withColumn("effective_to", F.lead("updated_at").over(window_spec)) \
    .withColumn("is_current", F.when(F.col("effective_to").isNull(), True).otherwise(False))
# Normalize into Customer Info and Contact Info tables (3NF normalization)
customer_info_df = scd2_df.select("customer_id", "name", "effective_from", "effective_to", "is_current").distinct()
contact_info_df = scd2_df.select("customer_id", "phone", "address", "updated_at").distinct()
# Save normalized and SCD2-compliant data
customer_info_df.write.format("delta").mode("overwrite").save("/mnt/delta/silver/customer_info_scd2")
contact_info_df.write.format("delta").mode("overwrite").save("/mnt/delta/silver/contact_info_scd2")
print("Silver layer data saved with SCD2, normalization, and standardization.")
Customer Info Table:
| customer_id | name | effective_from | effective_to | is_current |
|-------------|-----------|----------------|--------------|------------|
| 1 | John Doe | 2024-01-01 | 2024-02-15 | false |
| 1 | John Doe | 2024-02-15 | NULL | true |
| 2 | Jane Smith| 2024-01-05 | NULL | true |
Contact Info Table:
| customer_id | phone | address | updated_at |
|-------------|-----------|--------------|-------------|
| 1 | 5551234 | 123 Maple St | 2024-01-01 |
| 1 | 5555678 | 123 Oak St | 2024-02-15 |
| 2 | 5559876 | 456 Elm St | 2024-01-05 |
The Silver layer enables us to capture changes over time while ensuring data is normalized for optimal querying.
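Because every version carries effective_from and effective_to, point-in-time lookups become simple range filters. A small sketch (the as-of date is illustrative):
# Which customer records were active on a given date?
as_of_date = "2024-02-01"
as_of_df = customer_info_df.filter(
    (F.col("effective_from") <= F.lit(as_of_date)) &
    ((F.col("effective_to") > F.lit(as_of_date)) | F.col("effective_to").isNull())
)
For customer 1, this returns the version effective from 2024-01-01, since the later version only became effective on 2024-02-15.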
4. Gold Layer: Business Transformations & Aggregations
The Gold layer is where data is transformed to support specific business needs, reporting, and analysis. This layer is typically used for aggregating data and applying business-specific transformations, making it ready for consumption by tools like Power BI.
from pyspark.sql import functions as F
# Read SCD2-compliant and normalized tables from the Silver layer
customer_info_df = spark.read.format("delta").load("/mnt/delta/silver/customer_info_scd2")
contact_info_df = spark.read.format("delta").load("/mnt/delta/silver/contact_info_scd2")
# Keep only the current version of each customer, then attach the matching
# contact record (its updated_at lines up with the current row's effective_from)
current_customers_df = customer_info_df.filter(F.col("is_current") == True)
current_contacts_df = current_customers_df.join(
    contact_info_df,
    (current_customers_df.customer_id == contact_info_df.customer_id)
    & (contact_info_df.updated_at == current_customers_df.effective_from)
).select(current_customers_df.customer_id, "name", "address", "phone", "effective_from")
# Business Transformation: Count active customers by region (address as the region proxy)
active_customers_by_region_df = current_contacts_df.groupBy("address") \
    .agg(F.countDistinct("customer_id").alias("active_customer_count"))
# Create a final customer summary model for reporting
customer_summary_df = current_contacts_df
# Save final aggregated and transformed data
active_customers_by_region_df.write.format("delta").mode("overwrite").save("/mnt/delta/gold/active_customers_by_region")
customer_summary_df.write.format("delta").mode("overwrite").save("/mnt/delta/gold/customer_summary")
print("Gold layer data saved with business transformations and aggregations.")
Active Customers by Region:
| address | active_customer_count |
|---------------|-----------------------|
| 123 Oak St | 1 |
| 456 Elm St | 1 |
Customer Summary:
| customer_id | name | address | phone | effective_from |
|-------------|-----------|--------------|--------|----------------|
| 1 | John Doe | 123 Oak St | 5555678| 2024-02-15 |
| 2 | Jane Smith| 456 Elm St | 5559876| 2024-01-05 |
The Gold layer provides refined datasets for business use, allowing analysts to draw actionable insights and build dashboards.
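To expose these datasets to SQL users and BI tools such as Power BI, the Gold outputs can be registered in the metastore. A minimal sketch (the gold database name is illustrative):
# Register the Gold outputs as external tables so they can be queried by name
spark.sql("CREATE DATABASE IF NOT EXISTS gold")
spark.sql("CREATE TABLE IF NOT EXISTS gold.customer_summary USING DELTA LOCATION '/mnt/delta/gold/customer_summary'")
spark.sql("CREATE TABLE IF NOT EXISTS gold.active_customers_by_region USING DELTA LOCATION '/mnt/delta/gold/active_customers_by_region'")
From here, a BI tool can query gold.customer_summary by name rather than by storage path.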
Conclusion
The Delta Lakehouse architecture allows us to process raw data through structured layers, each with a specific role—ensuring high-quality, validated, and standardized data is available for business decision-making. By employing this layered approach, businesses can transform raw data into insightful reports, providing immense value across all operations.
Feel free to share how you are leveraging Delta Lakehouse in your data strategies!