SCD1 – Implementing Slowly Changing Dimension Type 1 in PySpark
Aman Dahiya
Senior Azure Data Engineer, Ex-FLUOR | Microsoft Certified Azure Data Engineer | Databricks | PySpark | SQL | Python-Pandas
Introduction to SCD Type 1
SCD Type 1 is the simplest method for managing changes to dimension data: when a change arrives from the source, the matching record in the target table is simply overwritten with the new values. No history is preserved, and the previous values are lost.
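To make the overwrite concrete, here is a minimal, self-contained sketch of the idea (the customer data and column names are made up for illustration; a left-anti join plus a union stands in for the full merge implemented later in this article):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Existing dimension rows and an incoming change feed (illustrative data)
target = spark.createDataFrame(
    [(101, "Alice", "Pune"), (102, "Bob", "Delhi")],
    ["customer_id", "name", "city"])
source = spark.createDataFrame(
    [(101, "Alice", "Mumbai")],
    ["customer_id", "name", "city"])

# SCD Type 1 in its simplest form: keep the target rows the feed does not
# touch, and let the source values overwrite (or insert) the rest
untouched = target.join(source, "customer_id", "left_anti")
result = untouched.unionByName(source)
result.show()
# customer_id 101 now reads "Mumbai"; the old value "Pune" is gone for good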
PySpark Implementation
The implementation below applies SCD Type 1 in PySpark with the following steps:
1. Validate that every non-metadata column of the target is present in the source.
2. Compute an MD5 hash over the data columns of both DataFrames so changed rows can be detected cheaply.
3. Full-outer-join the source and target on the business keys.
4. Split the joined rows into unchanged, updated, and new records, then union the three sets into the final dimension.
Example Code
from typing import Tuple

from pyspark.sql import DataFrame
from pyspark.sql.functions import concat_ws, md5, col, current_date, lit

from utils.logger import Logger
from utils.spark_session import SparkSessionManager
class SCDHandler:
    def __init__(self):
        self.spark = SparkSessionManager(self.__class__.__name__).create_session()
        self.logger = Logger(self.__class__.__name__)
    def check_columns_presence(self, source_df, target_df, metadata_cols):
        """
        Check if all columns from the target DataFrame are present in the source DataFrame.

        Args:
            source_df (pyspark.sql.DataFrame): Source DataFrame.
            target_df (pyspark.sql.DataFrame): Target DataFrame.
            metadata_cols (list): Metadata columns that are allowed to exist only in the target.

        Raises:
            Exception: If columns are missing in the source DataFrame.

        Returns:
            None
        """
        cols_missing = set(target_df.columns) - set(source_df.columns) - set(metadata_cols)
        if cols_missing:
            raise Exception(f"Columns missing in source DataFrame: {cols_missing}")
    def apply_hash_and_alias(self, source_df, target_df, metadata_cols) -> Tuple[DataFrame, DataFrame]:
        """
        Apply hash calculation and alias to source and target DataFrames.

        Args:
            source_df (pyspark.sql.DataFrame): Source DataFrame.
            target_df (pyspark.sql.DataFrame): Target DataFrame.
            metadata_cols (list): List of metadata columns to exclude from hash calculation.

        Returns:
            tuple: Aliased source DataFrame and aliased target DataFrame.
        """
        # Extract columns from the target DataFrame, excluding metadata columns
        tgt_cols = [x for x in target_df.columns if x not in metadata_cols]

        # MD5 over the pipe-delimited concatenation of the data columns;
        # two rows with identical values produce identical hashes
        hash_expr = md5(concat_ws("|", *[col(c) for c in tgt_cols]))

        # Attach the hash and alias both DataFrames for use in join and filter expressions
        source_df = source_df.withColumn("hash_value", hash_expr).alias("source_df")
        target_df = target_df.withColumn("hash_value", hash_expr).alias("target_df")
        return source_df, target_df
    def scd_1(self, source_df, target_df, join_keys, metadata_cols=None) -> DataFrame:
        """
        Apply SCD Type 1: overwrite changed records, keep unchanged ones,
        and append new ones. Returns the merged dimension DataFrame.
        """
        if metadata_cols is None:
            metadata_cols = []
        tgt_cols = list(target_df.columns)

        self.check_columns_presence(source_df, target_df, metadata_cols)
        source_df, target_df = self.apply_hash_and_alias(source_df, target_df, metadata_cols)

        # Perform a full outer join between the source and target DataFrames
        join_cond = [source_df[join_key] == target_df[join_key] for join_key in join_keys]
        base_df = target_df.join(source_df, join_cond, 'full')

        # Unchanged records: present only in the target, or hashes match
        unchanged_filter_expr = " AND ".join([f"source_df.{key} IS NULL" for key in join_keys])
        unchanged_df = base_df.filter(f"({unchanged_filter_expr}) OR "
                                      f"(source_df.hash_value = target_df.hash_value)") \
            .select("target_df.*")

        # Updated records: present in both, but hashes differ
        delta_filter_expr = " AND ".join([f"source_df.{key} IS NOT NULL" for key in join_keys])
        updated_df = base_df.filter(f"{delta_filter_expr} AND "
                                    f"source_df.hash_value != target_df.hash_value") \
            .select("source_df.*")

        # New records: present in the source with no matching target row
        new_df = base_df.filter(f"{delta_filter_expr} AND target_df.hash_value IS NULL") \
            .select("source_df.*")

        # Combine all three sets into the result DataFrame
        result_df = new_df.select(tgt_cols) \
            .unionByName(updated_df.select(tgt_cols)) \
            .unionByName(unchanged_df.select(tgt_cols))
        return result_df
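As a quick usage sketch (assuming the SCDHandler class above is importable and the repository's utils package is on your path; the sample data and column names are made up for the example):

# Illustrative usage of SCDHandler.scd_1 with toy data
handler = SCDHandler()
spark = handler.spark

target_df = spark.createDataFrame(
    [(1, "Alice", "Pune"), (2, "Bob", "Delhi")],
    ["id", "name", "city"])
source_df = spark.createDataFrame(
    [(1, "Alice", "Mumbai"), (3, "Carol", "Chennai")],
    ["id", "name", "city"])

result = handler.scd_1(source_df, target_df, join_keys=["id"])
result.show()
# id 1 is overwritten with "Mumbai", id 2 survives unchanged, id 3 is appended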
The complete code, including the SCDHandler class and its methods, is available on GitHub. Here’s the GitHub link:
In the repository, you’ll find the complete implementation of the SCDHandler class along with other utility functions and examples demonstrating the usage of SCD Type 1 in PySpark.
Feel free to explore the code, run it in your environment, and adapt it to your specific use cases. If you have any questions or need further assistance, don’t hesitate to ask in the comments.