PySpark script for Incremental Load with SCD2

The Product table is designed to track changes over time using SCD Type 2. This means that whenever a product's attributes (e.g., ProductName, Price, or Stock) are updated, the existing record is marked as historical, and a new record is inserted with the updated values. The target table (dbo.Products_SCD2) includes the additional columns Effective_Date, End_Date, and Is_Current to manage the validity period of each record.

from pyspark.sql import SparkSession

from pyspark.sql.functions import lit, current_timestamp, col, when

import pandas as pd

# 1. Initialize Spark Session

# Build (or reuse) the SparkSession for this job. The builder chain is wrapped
# in parentheses instead of backslash continuations so that inline comments are
# legal, and it is terminated with getOrCreate() so that `spark` is an actual
# SparkSession rather than a Builder object.
spark = (
    SparkSession.builder
    .appName("Incremental Load with SCD2")
    # Replace with the path to your JDBC driver
    .config("spark.driver.extraClassPath", "/path/to/sqljdbc42.jar")
    .getOrCreate()
)


# 2. Database Connection Details

# JDBC URLs for the source database (Practice) and the SCD2 target database
# (TargetDB). Both point at the same SQL Server instance.
source_db_url = "jdbc:sqlserver://SUDHI\\SUDHI:1433;databaseName=Practice"

target_db_url = "jdbc:sqlserver://SUDHI\\SUDHI:1433;databaseName=TargetDB"

# Connection properties passed to every Spark JDBC read/write.
# NOTE(review): the credentials are hard-coded in source; move them to a
# secret store or environment variables before using this outside a sandbox.
db_properties = {
    "user": "Sa",
    "password": "Password1",
    # Fully-qualified class name of the Microsoft SQL Server JDBC driver; an
    # empty string here makes Spark fail to load a driver class.
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}


# 3. Extract Source Data

# Read the complete source product table over JDBC.
source_df = spark.read.jdbc(url=source_db_url, table="dbo.Products", properties=db_properties)

# 4. Extract Target Data (Only Active Records)

# Only the current version of each product is needed for change detection;
# historical (Is_Current = 0) rows are never compared against the source.
target_df = (
    spark.read.jdbc(url=target_db_url, table="dbo.Products_SCD2", properties=db_properties)
    .filter(col("Is_Current") == 1)
)

# 5. Identify New and Updated Records

# Left-join every source row to its current target version (if any). An
# explicit join condition keeps both src.ProductID and tgt.ProductID available,
# so "no match" can be detected on the join key itself instead of on a
# nullable attribute column.
joined_df = source_df.alias("src").join(
    target_df.alias("tgt"),
    col("src.ProductID") == col("tgt.ProductID"),
    "left_outer",
)

# True when the source row found a current version in the target.
has_current_version = col("tgt.ProductID").isNotNull()

# Null-safe inequality on each tracked attribute. A plain `!=` evaluates to
# NULL when either side is NULL, which filter() treats as false, so a change
# from/to NULL would go undetected.
attributes_differ = (
    ~col("src.ProductName").eqNullSafe(col("tgt.ProductName"))
    | ~col("src.Price").eqNullSafe(col("tgt.Price"))
    | ~col("src.Stock").eqNullSafe(col("tgt.Stock"))
)

# Identify changed records (SCD Type 2)

# Products that already exist in the target but whose tracked attributes
# differ from the current version; the selected columns carry the NEW values.
changed_records = joined_df.filter(has_current_version & attributes_differ).select(
    "src.ProductID", "src.ProductName", "src.Price", "src.Stock"
)

# Identify new records

# Products with no current version in the target at all.
new_records = joined_df.filter(~has_current_version).select(
    "src.ProductID", "src.ProductName", "src.Price", "src.Stock"
)

# 6. Handling SCD Type 2 - Mark Old Records as Inactive
#
# Spark's JDBC writer can only insert rows (mode="append") or replace a whole
# table (mode="overwrite"); it cannot UPDATE rows in place. Appending "closed"
# copies of the old versions leaves the original rows flagged Is_Current = 1,
# so every run would duplicate history. Instead, the rows to insert are staged
# in a work table and the close-out UPDATE plus the INSERT are executed by SQL
# Server inside a single transaction.
#
# NOTE(review): this requires permission to create/drop the staging table in
# the target database, and the JDBC driver must be on the driver classpath.
staging_table = "dbo.Products_SCD2_Staging"
target_table = "dbo.Products_SCD2"

# New versions of changed products plus brand-new products, stamped as the
# current version. current_timestamp() is constant within one query, so every
# staged row carries the same Effective_Date.
new_insert_data = (
    changed_records.union(new_records)
    .withColumn("Effective_Date", current_timestamp())
    .withColumn("End_Date", lit(None).cast("timestamp"))
    .withColumn("Is_Current", lit(1))
)

# Single Spark action: the change set is computed exactly once, before the
# target table is modified, so later steps cannot re-evaluate it against a
# half-updated target.
new_insert_data.write.jdbc(url=target_db_url, table=staging_table, mode="overwrite", properties=db_properties)

# Close the current version of every staged product. Brand-new products have
# no current row, so the join simply does not touch them. End_Date is taken
# from the staged Effective_Date so the old and new versions abut exactly.
close_old_versions_sql = (
    "UPDATE tgt "
    "SET tgt.End_Date = stg.Effective_Date, tgt.Is_Current = 0 "
    "FROM " + target_table + " AS tgt "
    "INNER JOIN " + staging_table + " AS stg ON stg.ProductID = tgt.ProductID "
    "WHERE tgt.Is_Current = 1"
)

# 7. Insert New and Updated Records

# Insert new records into target database
insert_new_versions_sql = (
    "INSERT INTO " + target_table + " "
    "(ProductID, ProductName, Price, Stock, Effective_Date, End_Date, Is_Current) "
    "SELECT ProductID, ProductName, Price, Stock, Effective_Date, End_Date, Is_Current "
    "FROM " + staging_table
)

# Run both statements over a plain JDBC connection opened in the driver JVM.
jvm = spark.sparkContext._jvm
connection = jvm.java.sql.DriverManager.getConnection(
    target_db_url, db_properties["user"], db_properties["password"]
)
try:
    # One transaction: either the old versions are closed AND the new ones
    # inserted, or nothing changes.
    connection.setAutoCommit(False)
    statement = connection.createStatement()
    try:
        statement.executeUpdate(close_old_versions_sql)
        statement.executeUpdate(insert_new_versions_sql)
        connection.commit()
    except Exception:
        connection.rollback()
        raise
    finally:
        statement.close()
finally:
    connection.close()

# 8. Stop Spark Session

# Release the driver/executor resources held by this job.
spark.stop()

print("Incremental Load with SCD2 Process Completed Successfully!")

Target table:


