Types of SCD and Implementation in Spark

Slowly Changing Dimensions (SCDs) handle historical changes to dimension attributes in a data warehouse. Apache Spark, especially with Delta Lake, simplifies SCD implementation through MERGE upserts, table versioning, and ACID transactions.


1️⃣ SCD Type 1 (Overwrite the old data)

  • No history maintained; just update the existing records.
  • Best for correcting errors or non-historic attributes.
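Before the Spark code, the Type 1 semantics can be sketched in plain Python; this is an illustration only, with a dict standing in for the dimension table and made-up sample rows:

```python
# SCD Type 1: matched keys are overwritten in place, unmatched keys are
# inserted -- the same shape as a Delta MERGE, with no history kept.
dim = {1: {"name": "Alice", "address": "Old St 1"}}
incoming = [
    {"customer_id": 1, "name": "Alice", "address": "New St 9"},  # changed row
    {"customer_id": 2, "name": "Bob", "address": "Elm St 3"},    # new row
]

for row in incoming:
    # Matched -> update; not matched -> insert. The old value is lost.
    dim[row["customer_id"]] = {"name": row["name"], "address": row["address"]}
```

After the loop, customer 1's old address is gone, which is exactly why Type 1 is only suitable for corrections and non-historic attributes.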

Implementation (Spark SQL & Delta)

from delta.tables import DeltaTable
from pyspark.sql.functions import col

# Load the target table
delta_table = DeltaTable.forPath(spark, "dbfs:/mnt/delta/dim_customers")

# Load the new data
new_data = spark.read.format("delta").load("dbfs:/mnt/delta/staging/customers")

# Perform an UPSERT (Merge)
(
    delta_table.alias("target")
    .merge(new_data.alias("source"), "target.customer_id = source.customer_id")
    .whenMatchedUpdate(set={"name": "source.name", "address": "source.address"})
    .whenNotMatchedInsertAll()
    .execute()
)

        

2️⃣ SCD Type 2 (Maintain history with a versioning approach)

  • Inserts a new record for each change while keeping the old ones.
  • Uses start_date and end_date columns plus an is_active flag.
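The row lifecycle behind those columns can be sketched in plain Python; illustration only, with made-up sample data:

```python
from datetime import date

# SCD Type 2: a change expires the active version (end_date set,
# is_active flipped) and appends a new active version. Old rows are kept.
history = [
    {"customer_id": 1, "address": "Old St 1",
     "start_date": date(2023, 1, 1), "end_date": None, "is_active": True},
]

def apply_change(history, customer_id, new_address, today):
    for row in history:
        if row["customer_id"] == customer_id and row["is_active"]:
            row["end_date"] = today          # close out the current version
            row["is_active"] = False
    history.append({"customer_id": customer_id, "address": new_address,
                    "start_date": today, "end_date": None, "is_active": True})

apply_change(history, 1, "New St 9", date(2024, 6, 1))
```

Every past version remains queryable by date range, which is the defining property of Type 2.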

Implementation (Spark SQL & Delta)

from pyspark.sql.functions import current_date, lit, col

# Load existing records
existing_df = spark.read.format("delta").load("dbfs:/mnt/delta/dim_customers")

# Load new data
new_data = spark.read.format("delta").load("dbfs:/mnt/delta/staging/customers")

# Find incoming rows whose attributes differ from the current active version
changed_records = new_data.alias("n") \
    .join(existing_df.filter("is_active = true").alias("e"), "customer_id") \
    .filter("e.address <> n.address OR e.name <> n.name") \
    .select("customer_id", col("n.name").alias("name"), col("n.address").alias("address"))

changed_keys = changed_records.select("customer_id")

# Expire the active versions of the changed records
expired = existing_df.filter("is_active = true") \
    .join(changed_keys, "customer_id") \
    .withColumn("end_date", current_date()) \
    .withColumn("is_active", lit(False))

# Keep everything that did not change (active rows and older history)
unchanged = existing_df.join(changed_keys, "customer_id", "left_anti")

# Build the new active versions
new_version = changed_records \
    .withColumn("start_date", current_date()) \
    .withColumn("end_date", lit(None).cast("date")) \
    .withColumn("is_active", lit(True))

# Combine and overwrite the dimension table (safe with Delta's snapshot isolation)
final_df = unchanged.unionByName(expired).unionByName(new_version)
final_df.write.format("delta").mode("overwrite").save("dbfs:/mnt/delta/dim_customers")
        

3️⃣ SCD Type 3 (Track only limited history)

  • Uses previous and current value columns.
  • Tracks only recent changes, not the full history.
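The "limited history" trade-off is easiest to see in plain Python; illustration only, with assumed column names:

```python
# SCD Type 3: one extra column holds the prior value. Each change shifts
# current -> previous, so anything older than the previous value is lost.
row = {"customer_id": 1, "address": "Old St 1", "previous_address": None}

def change_address(row, new_address):
    row["previous_address"] = row["address"]  # shift current into previous
    row["address"] = new_address

change_address(row, "New St 9")
change_address(row, "Pine St 5")  # the original "Old St 1" is now gone
```

Only the last two states survive, so Type 3 fits attributes where a single "before" value is enough.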

Implementation (Spark SQL)

from pyspark.sql.functions import col, coalesce

# Load existing data (assumes the table already has a previous_address column)
existing_df = spark.read.format("delta").load("dbfs:/mnt/delta/dim_customers")

# Load new data
new_data = spark.read.format("delta").load("dbfs:/mnt/delta/staging/customers")

# Identify changed records; the old address becomes the previous value
changed = existing_df.alias("e").join(new_data.alias("n"), "customer_id") \
    .filter("e.address <> n.address OR e.name <> n.name") \
    .select("customer_id",
            col("n.name").alias("upd_name"),
            col("n.address").alias("upd_address"),
            col("e.address").alias("upd_previous_address"))

# Take updated values where present, otherwise keep the existing ones
final_df = existing_df.join(changed, "customer_id", "left_outer") \
    .select("customer_id",
            coalesce("upd_name", "name").alias("name"),
            coalesce("upd_address", "address").alias("address"),
            coalesce("upd_previous_address", "previous_address").alias("previous_address"))
final_df.write.format("delta").mode("overwrite").save("dbfs:/mnt/delta/dim_customers")

4️⃣ SCD Type 4 (Maintaining history in a separate table)

  • Instead of keeping history in the same dimension table, historical changes are stored in a separate history table.
  • The main dimension table contains only the latest record.
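The two-table split can be sketched in plain Python before the Spark version; illustration only, with a dict as the current table and a list as the history table:

```python
# SCD Type 4: the current table keeps only the latest version; every
# replaced version is appended to a separate history table first.
current = {1: {"name": "Alice", "address": "Old St 1"}}
history = []

def upsert(customer_id, name, address, change_date):
    old = current.get(customer_id)
    if old is not None and (old["name"] != name or old["address"] != address):
        # archive the outgoing version before overwriting it
        history.append({"customer_id": customer_id, **old,
                        "change_date": change_date})
    current[customer_id] = {"name": name, "address": address}

upsert(1, "Alice", "New St 9", "2024-06-01")
```

Queries against the current table stay small and fast, while audits go to the history table.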

Implementation in Spark (Delta Lake)

from delta.tables import DeltaTable
from pyspark.sql.functions import current_date, lit

# Load main dimension table
dim_customers = spark.read.format("delta").load("dbfs:/mnt/delta/dim_customers")

# Load staging data (new updates)
staging_data = spark.read.format("delta").load("dbfs:/mnt/delta/staging/customers")

# Identify changed records
changed_records = dim_customers.alias("dim") \
    .join(staging_data.alias("stg"), "customer_id") \
    .filter("dim.name <> stg.name OR dim.address <> stg.address") \
    .select("dim.*", current_date().alias("change_date"))

# Write changed records to history table
changed_records.write.format("delta").mode("append").save("dbfs:/mnt/delta/dim_customers_history")

# Update the main table with new values
delta_table = DeltaTable.forPath(spark, "dbfs:/mnt/delta/dim_customers")
(
    delta_table.alias("dim")
    .merge(staging_data.alias("stg"), "dim.customer_id = stg.customer_id")
    .whenMatchedUpdate(set={"name": "stg.name", "address": "stg.address"})
    .whenNotMatchedInsertAll()
    .execute()
)        

6️⃣ SCD Type 6 (Hybrid of Type 1, Type 2, and Type 3)

  • Maintains a full history like SCD Type 2.
  • Includes a "current value" column (like SCD Type 3).
  • Allows easy querying of both historical and latest data.

Schema Example
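The schema table itself did not survive in this copy; as a sketch, a Type 6 row might carry the columns below (names are assumptions consistent with the code in this section):

```python
from datetime import date

# A plausible SCD Type 6 row: Type 2 versioning columns (start_date,
# end_date, is_active) plus a Type 3 previous-value column.
row = {
    "customer_id": 1,
    "name": "Alice",
    "address": "New St 9",           # current value
    "previous_address": "Old St 1",  # prior value (Type 3 style)
    "start_date": date(2024, 6, 1),  # version validity (Type 2 style)
    "end_date": None,
    "is_active": True,
}
```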

Implementation in Spark

from pyspark.sql.functions import current_date, lit, col

# Load existing dimension data
dim_customers = spark.read.format("delta").load("dbfs:/mnt/delta/dim_customers")

# Load new data
staging_data = spark.read.format("delta").load("dbfs:/mnt/delta/staging/customers")

# Identify changed records: new values plus the old address as previous_address
changed_records = dim_customers.filter("is_active = true").alias("dim") \
    .join(staging_data.alias("stg"), "customer_id") \
    .filter("dim.name <> stg.name OR dim.address <> stg.address") \
    .select("customer_id",
            col("stg.name").alias("name"),
            col("stg.address").alias("address"),
            col("dim.address").alias("previous_address"),
            current_date().alias("start_date"),
            lit(None).cast("date").alias("end_date"),
            lit(True).alias("is_active"))

changed_keys = changed_records.select("customer_id")

# Expire the active versions of the changed records
expired_records = dim_customers.filter("is_active = true") \
    .join(changed_keys, "customer_id") \
    .withColumn("end_date", current_date()) \
    .withColumn("is_active", lit(False))

# Keep unchanged rows (active and historical), add expired and new versions
unchanged = dim_customers.join(changed_keys, "customer_id", "left_anti")
final_df = unchanged.unionByName(expired_records).unionByName(changed_records)
final_df.write.format("delta").mode("overwrite").save("dbfs:/mnt/delta/dim_customers")

Which SCD Type Should You Use?

  • SCD Type 1 → If you don’t need history (overwrite).
  • SCD Type 2 → If full history is required (new records with timestamps).
  • SCD Type 3 → If only recent changes matter.
  • SCD Type 4 → If history should be stored in a separate table.
  • SCD Type 6 → If you need both full history and easy current record access.


Best Practices

  • Use Delta Lake for ACID transactions and versioning.
  • Partition on low-cardinality fields (e.g., country or a load-date column); avoid partitioning on high-cardinality keys such as customer_id.
  • Improve query performance with OPTIMIZE ... ZORDER BY on frequently filtered high-cardinality columns (e.g., customer_id).
  • Leverage MERGE for efficiency instead of full table overwrites.
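As a sketch of the ZORDER practice: OPTIMIZE ... ZORDER BY is Delta Lake SQL (Databricks / delta.io), and the table path here is hypothetical; on a real cluster you would pass the string to spark.sql(...):

```python
# Build the Delta OPTIMIZE statement; ZORDER co-locates rows by the
# listed columns so filters on them skip more files.
table_path = "dbfs:/mnt/delta/dim_customers"
optimize_sql = f"OPTIMIZE delta.`{table_path}` ZORDER BY (customer_id)"
# spark.sql(optimize_sql)  # run on a cluster with Delta Lake installed
```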

