Microsoft Fabric: Transform and integrate multiple raw data sources into a consolidated, structured format for analytics and reporting.
The objective of this project is to transform and integrate multiple raw data sources into a consolidated, structured format suitable for downstream analytics and reporting. The data pertains to product master information, which includes the attributes and classifications needed for effective product management and analysis. The workflow breaks down into four stages, each presented below as a problem statement and its solution.
1. Data Extraction
Problem Statement:
Product master data is distributed across multiple Delta tables, each containing different attributes and details. The challenge is to consolidate and prepare this data for analytics while maintaining accuracy and completeness.
Solution:
Extract the raw data into separate Spark DataFrames, one per source Delta table, so that each source can be processed efficiently and independently.
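As a minimal sketch of this step (table names match the full listing further below; in a Microsoft Fabric notebook the spark session is created automatically):
# Read each source Delta table into its own DataFrame.
df_e1maram_pm = spark.table("raw.productmaster_e1maram")
df_e1mvkem_pm = spark.table("raw.productmaster_e1mvkem")
# spark.read.table(...) is an equivalent, more explicit form.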
2. Data Transformation
Problem Statement:
Raw data is not in a user-friendly format: the classification data arrives as key-value rows rather than columns, values are missing, and column names are non-descriptive.
Solution:
Filter the classification data to the relevant characteristics, pivot it into one column per characteristic, fill missing values with defaults, and rename the columns to meaningful, consistent names.
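To illustrate the pivot at the heart of this step, here is a minimal sketch; df_classification is a hypothetical name, and the column names come from the full listing below.
from pyspark.sql.functions import first
# Turn rows of (ID, ATNAM, ATWRT) into one column per distinct ATNAM value,
# keeping the first ATWRT seen for each ID/characteristic pair.
wide_df = (df_classification
    .groupBy("PRODUCTMASTER_CLASSIFICATION_ID")
    .pivot("ATNAM")
    .agg(first("ATWRT")))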
3. Data Integration
Problem Statement:
The product data is scattered across multiple tables, leading to fragmented insights. Joining these tables without careful handling of keys or mismatched schemas could result in data duplication, loss, or inconsistencies.
Solution:
Join the DataFrames on PRODUCTMASTER_ID with explicit join keys and join types, producing a single comprehensive dataset without duplication or loss.
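As a minimal sketch of the join pattern (DataFrame names match the full listing below):
# Passing the key as a string instead of an equality expression avoids a
# duplicated join column; an inner join keeps only products present in both sources.
combined_df = pm_e1maram_df.join(pm_e1mvkem_df, "PRODUCTMASTER_ID", "inner")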
4. Data Loading
Problem Statement:
Once the data is integrated and transformed, it needs to be stored in a Delta table for further use. Challenges include handling write failures gracefully and ensuring the table can be refreshed reliably on each run.
Solution:
Save the final DataFrame as a Delta table in overwrite mode, wrapped in error handling so that failures are reported rather than silently ignored.
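A minimal sketch of the write step; the overwriteSchema option is a standard Delta Lake option shown here as an optional safeguard against schema drift, and it is not part of the full listing below.
try:
    (final_df.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")  # allow column changes between runs
        .saveAsTable("trf.productmaster"))
except Exception as e:
    print(f"Error writing data to trf.productmaster: {e}")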
Summary of Problem-Solution Mapping
Problem: Fragmented raw data across multiple Delta tables. Solution: Extract raw data into separate DataFrames using Spark for efficient processing.
Problem: Raw classification data is unstructured and incomplete. Solution: Filter, pivot, and enrich the classification data to make it structured and complete.
Problem: Missing values in data. Solution: Use default values to fill nulls, ensuring no critical data gaps.
Problem: Non-descriptive column names. Solution: Rename columns to make them meaningful and consistent.
Problem: Scattered data across tables, requiring integration. Solution: Join DataFrames on PRODUCTMASTER_ID to create a comprehensive dataset.
Problem: Data needs to be saved for downstream analytics and reports. Solution: Save as a Delta table with error handling to ensure reliable storage.
Code:
# In a Microsoft Fabric notebook the spark session is created automatically,
# so only the column functions used below need to be imported.
from pyspark.sql.functions import col, first, lit
# Load the raw data into DataFrames (adjust the Delta table names if required)
df_e1auspm_pm = spark.table("raw.productmaster_classification_e1auspm")
df_e1maram_pm = spark.table("raw.productmaster_e1maram")
df_e1mvkem_pm = spark.table("raw.productmaster_e1mvkem")
df_e1maktm_pm = spark.table("raw.productmaster_e1maktm")
df_e1mbewm_pm = spark.table("raw.productmaster_e1mbewm")
# Define helper lambdas for the classification transformations
filter_atnam = lambda df, conditions: df.filter(col("ATNAM").isin(conditions))
# Pivot ATWRT into one column per characteristic (all except Z_CRP_LARGEORDERQTY)
pivot_atwrt = lambda df: (df.filter(col("ATNAM") != "Z_CRP_LARGEORDERQTY")
    .groupBy("PRODUCTMASTER_CLASSIFICATION_ID").pivot("ATNAM").agg(first("ATWRT")))
# Pivot ATCOD for the Z_CRP_LARGEORDERQTY characteristic only
pivot_atcod = lambda df: (df.filter(col("ATNAM") == "Z_CRP_LARGEORDERQTY")
    .groupBy("PRODUCTMASTER_CLASSIFICATION_ID").pivot("ATNAM").agg(first("ATCOD")))
# Fill nulls with the supplied per-column defaults
fill_nulls = lambda df, fill_values: df.fillna(fill_values)
# Rename the pivoted columns to business-friendly names
add_aliases = lambda df: df.select(
    col("PRODUCTMASTER_CLASSIFICATION_ID"),
    col("Z_CRP_BUDGETGROUP").alias("bgc"),
    col("Z_CRP_DAFLRESTRICTED").alias("dafl_restricted"),
    col("Z_CRP_HAZARDOUS").alias("Hazardous_Flag"),
    col("Z_CRP_ITEMDESCRIPTION").alias("Item_Description"),
    col("Z_CRP_LARGEORDERQTY").alias("Large_Order_Quantity"),
    col("Z_CRP_MANUFACTURERNAME").alias("Manufacturer"),
    col("Z_CRP_PML").alias("pml"),
    col("Z_CRP_SERVICELINE").alias("Service_Line"),
    col("Z_CRP_SIO").alias("SIO"),
    col("Z_CRP_VENDORNAME").alias("Supplier"),
    col("Z_CRP_SUPPLIERPARTID").alias("Supplier_Part_ID")
)
# Add a constant-valued column
add_new_column = lambda df, column_name, value: df.withColumn(column_name, lit(value))
# Apply the helpers to the classification data
filtered_df = filter_atnam(df_e1auspm_pm, [
    "Z_CRP_BUDGETGROUP", "Z_CRP_DAFLRESTRICTED", "Z_CRP_HAZARDOUS",
    "Z_CRP_ITEMDESCRIPTION", "Z_CRP_LARGEORDERQTY", "Z_CRP_MANUFACTURERNAME",
    "Z_CRP_PML", "Z_CRP_SERVICELINE", "Z_CRP_SIO",
    "Z_CRP_SUPPLIERPARTID", "Z_CRP_VENDORNAME"])
pivot_df_atwrt = pivot_atwrt(filtered_df)
pivot_df_atcod = pivot_atcod(filtered_df)
pivot_df = pivot_df_atwrt.join(pivot_df_atcod, on="PRODUCTMASTER_CLASSIFICATION_ID", how="left")
# fillna applies each default only to columns whose type matches the value
pivot_df = fill_nulls(pivot_df, {
    "Z_CRP_BUDGETGROUP": "NULL", "Z_CRP_DAFLRESTRICTED": "NULL",
    "Z_CRP_HAZARDOUS": "NULL", "Z_CRP_ITEMDESCRIPTION": "NULL",
    "Z_CRP_LARGEORDERQTY": 0, "Z_CRP_MANUFACTURERNAME": "NULL",
    "Z_CRP_PML": "NULL", "Z_CRP_SERVICELINE": "NULL", "Z_CRP_SIO": "NULL",
    "Z_CRP_SUPPLIERPARTID": "NULL", "Z_CRP_VENDORNAME": "NULL"})
pmc_df = add_aliases(pivot_df)
pmc_df = add_new_column(pmc_df, "Supplier_Region", "XYZ")
# Create DataFrames with the specified column names and mapped values
pm_e1maram_df = df_e1maram_pm.select(
    col("PRODUCTMASTER_ID"),
    col("FORMT").alias("fleet"),
    col("MFRPN").alias("manufacturer_part_id"),
    col("ERNAM").alias("order_raised_by"),
    col("ERSDA").alias("po_created"),
    col("ERNAM").alias("requestor")  # ERNAM is mapped to both order_raised_by and requestor
)
pm_e1mvkem_df = df_e1mvkem_pm.select(
    col("PRODUCTMASTER_ID"),
    col("LFMNG").alias("Max_Order_Qty"),
    col("AUMNG").alias("Min_Order_Qty")
)
pm_e1maktm_df = df_e1maktm_pm.select(
    col("PRODUCTMASTER_ID"),
    col("MAKTX").alias("niin"),
    col("MAKTX").alias("Short_Name")  # MAKTX is mapped to both niin and Short_Name
)
pm_e1mbewm_df = df_e1mbewm_pm.select(
    col("PRODUCTMASTER_ID"),
    col("PEINH").alias("Unit_Price")
)
# Join the DataFrames on PRODUCTMASTER_ID
pm_final_df = (pm_e1maram_df
    .join(pm_e1mvkem_df, "PRODUCTMASTER_ID", "inner")
    .join(pm_e1maktm_df, "PRODUCTMASTER_ID", "inner")
    .join(pm_e1mbewm_df, "PRODUCTMASTER_ID", "inner"))
# Select the columns to carry into the consolidated product-master DataFrame
pm_final_df = pm_final_df.select(
    col("PRODUCTMASTER_ID"),
    col("fleet"),
    col("manufacturer_part_id"),
    col("order_raised_by"),
    col("po_created"),
    col("requestor"),
    col("Max_Order_Qty"),
    col("Min_Order_Qty"),
    col("niin"),
    col("Short_Name"),
    col("Unit_Price")
)
# Add a new column with a default value
pm_final_df = pm_final_df.withColumn("Supplier_Product_Description", lit("NULL"))
# Join pmc_df and pm_final_df on PRODUCTMASTER_CLASSIFICATION_ID and PRODUCTMASTER_ID
final_df = pmc_df.join(pm_final_df, pmc_df.PRODUCTMASTER_CLASSIFICATION_ID == pm_final_df.PRODUCTMASTER_ID, "inner")
# Select all columns to include in the final DataFrame
final_df = final_df.select(
    pmc_df["bgc"],
    pmc_df["dafl_restricted"],
    pmc_df["Hazardous_Flag"],
    pmc_df["Item_Description"],
    pmc_df["Large_Order_Quantity"],
    pmc_df["Manufacturer"],
    pmc_df["pml"],
    pmc_df["Service_Line"],
    pmc_df["SIO"],
    pmc_df["Supplier"],
    pmc_df["Supplier_Part_ID"],
    pmc_df["Supplier_Region"],
    pm_final_df["fleet"],
    pm_final_df["manufacturer_part_id"],
    pm_final_df["order_raised_by"],
    pm_final_df["po_created"],
    pm_final_df["requestor"],
    pm_final_df["Max_Order_Qty"],
    pm_final_df["Min_Order_Qty"],
    pm_final_df["niin"],
    pm_final_df["Short_Name"],
    pm_final_df["Unit_Price"],
    pm_final_df["Supplier_Product_Description"]
)
# Preview the final DataFrame (uncomment to run interactively)
# display(final_df)
table_path = "trf.productmaster"
try:
final_df.write.format("delta").mode("overwrite").saveAsTable(table_path)
print(f"Successfully write data to {table_path}")
except Exception as e:
print(f"Error write data to trf.productmaster : {e}")
Outcome
This process ensures that product master data is consolidated into a single dataset, structured with meaningful column names, complete thanks to default values for missing data, and reliably stored as a Delta table for downstream analytics and reporting.
Let me know if you'd like a specific part elaborated further or need assistance implementing this workflow.