Microsoft Fabric: Transform and integrate multiple raw data sources into a consolidated, structured format for analytics and reporting.

The objective of this project is to transform and integrate multiple raw data sources into a consolidated, structured format suitable for downstream analytics and reporting. The data pertains to product master information, which includes various attributes and classifications necessary for effective product management and analysis. The work is organized into four stages (extraction, transformation, integration, and loading), each framed below as a problem statement and its solution.

1. Data Extraction

Problem Statement:

Product master data is distributed across multiple Delta tables, each containing different attributes and details. The challenge is to consolidate and prepare this data for analytics while maintaining accuracy and completeness.

Solution:

  • Extract raw data from the following Delta tables into Spark DataFrames:
      ◦ productmaster_classification_e1auspm: contains classification attributes.
      ◦ productmaster_e1maram: includes core product details.
      ◦ productmaster_e1mvkem: covers material valuation details.
      ◦ productmaster_e1maktm: contains product descriptions in different languages.
      ◦ productmaster_e1mbewm: provides stock and valuation-related details.
  • Use Spark's optimized reading of Delta tables to handle large datasets efficiently (a minimal read sketch follows this list).
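
For illustration, a minimal read of one raw table might look like the sketch below. It assumes the Fabric notebook's built-in spark session and the table name shown above; the Z_CRP_ prefix filter is only an example of selecting columns and filtering early so Spark can prune columns and push the predicate down to the Delta scan.

from pyspark.sql.functions import col

# Read only the columns needed downstream and filter early.
classification_raw = (
    spark.table("raw.productmaster_classification_e1auspm")
         .select("PRODUCTMASTER_CLASSIFICATION_ID", "ATNAM", "ATWRT", "ATCOD")
         .filter(col("ATNAM").startswith("Z_CRP_"))
)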


2. Data Transformation

Problem Statement:

Raw data is not in a user-friendly format:

  • The classification data needs filtering and restructuring.
  • Missing values exist, which could lead to data inaccuracies.
  • Column names are not descriptive, making it hard to understand.
  • Additional fields are required for completeness but are missing in the source.

Solution:

  • Filtering and Pivoting (a toy pivot example follows this list):
      ◦ Apply filtering on the productmaster_classification_e1auspm DataFrame to retain only relevant attributes based on ATNAM values.
      ◦ Pivot the DataFrame to convert attribute values (ATWRT for textual descriptions and ATCOD for codes) into distinct columns, making the data more structured.
  • Handling Missing Values:
      ◦ Fill null or missing values in the pivoted DataFrame with predefined default values to ensure data completeness and avoid issues in subsequent processing.
  • Renaming Columns:
      ◦ Rename the columns in the transformed DataFrame to more meaningful names, ensuring consistency and readability across teams and reports.
  • Adding New Columns:
      ◦ Enrich the DataFrame by adding default-value columns, such as Supplier_Region, to include missing metadata that may be useful for downstream analytics.
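
To make the pivot step concrete, here is a toy example on made-up rows (illustration only, not the production data): each distinct ATNAM value becomes its own column, and first() picks that product's attribute value.

from pyspark.sql.functions import first

# Two attribute rows for one classification ID.
toy = spark.createDataFrame(
    [("P1", "Z_CRP_HAZARDOUS", "NO"),
     ("P1", "Z_CRP_VENDORNAME", "ACME")],
    ["PRODUCTMASTER_CLASSIFICATION_ID", "ATNAM", "ATWRT"],
)

# One row per classification ID, with Z_CRP_HAZARDOUS and Z_CRP_VENDORNAME as columns.
wide = (
    toy.groupBy("PRODUCTMASTER_CLASSIFICATION_ID")
       .pivot("ATNAM")
       .agg(first("ATWRT"))
)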


3. Data Integration

Problem Statement:

The product data is scattered across multiple tables, leading to fragmented insights. Joining these tables without careful handling of keys or mismatched schemas could result in data duplication, loss, or inconsistencies.

Solution:

  • Joining DataFrames (a join-pattern sketch follows this list):
      ◦ Identify the common key (PRODUCTMASTER_ID) shared across the DataFrames.
      ◦ Use Spark joins to combine the classification DataFrame with the other product master DataFrames: e1maram for core details, e1mvkem for valuation attributes, e1maktm for multilingual descriptions, and e1mbewm for stock and valuation information.
      ◦ Handle schema mismatches and ensure all necessary columns are retained.
  • Final DataFrame Construction:
      ◦ After the joins, select and rename columns so the resulting DataFrame presents all essential attributes in a structured and readable format.
      ◦ Ensure the final DataFrame includes classification attributes, core product details, valuation and stock details, and multilingual product descriptions.
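
A minimal sketch of the join pattern is shown below. The DataFrame names are the ones built in the full listing later in this article, each assumed to expose a PRODUCTMASTER_ID key; left joins are used here so core products survive even when a lookup table has no match, whereas the full listing uses inner joins and keeps only products present in every table.

# Sketch only: chain the joins on the shared PRODUCTMASTER_ID key.
joined = (
    pm_e1maram_df
        .join(pm_e1mvkem_df, on="PRODUCTMASTER_ID", how="left")
        .join(pm_e1maktm_df, on="PRODUCTMASTER_ID", how="left")
        .join(pm_e1mbewm_df, on="PRODUCTMASTER_ID", how="left")
)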


4. Data Loading

Problem Statement:

Once the data is integrated and transformed, it needs to be stored in a Delta table for further use. Challenges include:

  • Ensuring data integrity and successful write operations.
  • Handling large data volumes efficiently.

Solution:

  • Save the final DataFrame to a Delta table (trf.productmaster) using Spark.
  • Incorporate error-handling mechanisms during the save operation to:
      ◦ Verify successful writes.
      ◦ Log errors for debugging and troubleshooting.
  • Use Delta table features such as ACID compliance for data reliability and ease of future updates (a MERGE-based refresh sketch follows this list).
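
As a sketch of the "ease of future updates" point, a later refresh could upsert changed products with a Delta MERGE instead of a full overwrite. This assumes the PRODUCTMASTER_ID key is retained in the final DataFrame; the listing below drops it in its final select, so the column list would need to keep it for this to work.

from delta.tables import DeltaTable

# Upsert: update rows already in trf.productmaster, insert the rest.
target = DeltaTable.forName(spark, "trf.productmaster")
(
    target.alias("t")
          .merge(final_df.alias("s"), "t.PRODUCTMASTER_ID = s.PRODUCTMASTER_ID")
          .whenMatchedUpdateAll()
          .whenNotMatchedInsertAll()
          .execute()
)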


Summary of Problem-Solution Mapping

  • Fragmented raw data across multiple Delta tables: extract the raw data into separate DataFrames using Spark for efficient processing.
  • Raw classification data is unstructured and incomplete: filter, pivot, and enrich the classification data to make it structured and complete.
  • Missing values in the data: fill nulls with default values, ensuring no critical data gaps.
  • Non-descriptive column names: rename columns to make them meaningful and consistent.
  • Scattered data across tables, requiring integration: join the DataFrames on PRODUCTMASTER_ID to create a comprehensive dataset.
  • Data needs to be saved for downstream analytics and reports: save as a Delta table with error handling to ensure reliable storage.


Code:

# Only these functions are used below; the Fabric notebook session already provides `spark`.
from pyspark.sql.functions import col, first, lit


# Load the raw data into DataFrames (change the Delta table names if yours differ)
df = spark.table("raw.productmaster_classification_e1auspm")
df_e1maram_pm = spark.table("raw.productmaster_e1maram")
df_e1mvkem_pm = spark.table("raw.productmaster_e1mvkem")
df_e1maktm_pm = spark.table("raw.productmaster_e1maktm")
df_e1mbewm_pm = spark.table("raw.productmaster_e1mbewm")

# Helper lambdas:
#   filter_atnam   - keep only rows whose ATNAM value is in the given list
#   pivot_atwrt    - pivot textual attribute values (ATWRT) into one column per ATNAM
#   pivot_atcod    - pivot the coded Z_CRP_LARGEORDERQTY attribute (ATCOD) into a column
#   fill_nulls     - replace missing values with the supplied defaults
#   add_aliases    - rename classification columns to business-friendly names
#   add_new_column - append a constant-valued column (e.g. Supplier_Region)
filter_atnam = lambda df, conditions: df.filter(col("ATNAM").isin(conditions))
pivot_atwrt = lambda df: df.filter(col("ATNAM") != "Z_CRP_LARGEORDERQTY").groupBy("PRODUCTMASTER_CLASSIFICATION_ID").pivot("ATNAM").agg(first("ATWRT"))
pivot_atcod = lambda df: df.filter(col("ATNAM") == "Z_CRP_LARGEORDERQTY").groupBy("PRODUCTMASTER_CLASSIFICATION_ID").pivot("ATNAM").agg(first("ATCOD"))
fill_nulls = lambda df, fill_values: df.fillna(fill_values)
add_aliases = lambda df: df.select(
    col("PRODUCTMASTER_CLASSIFICATION_ID").alias("PRODUCTMASTER_CLASSIFICATION_ID"),
    col("Z_CRP_BUDGETGROUP").alias("bgc"),
    col("Z_CRP_DAFLRESTRICTED").alias("dafl_restricted"),
    col("Z_CRP_HAZARDOUS").alias("Hazardous_Flag"),
    col("Z_CRP_ITEMDESCRIPTION").alias("Item_Description"),
    col("Z_CRP_LARGEORDERQTY").alias("Large_Order_Quantity"),
    col("Z_CRP_MANUFACTURERNAME").alias("Manufacturer"),
    col("Z_CRP_PML").alias("pml"),
    col("Z_CRP_SERVICELINE").alias("Service_Line"),
    col("Z_CRP_SIO").alias("SIO"),
    col("Z_CRP_VENDORNAME").alias("Supplier"),
    col("Z_CRP_SUPPLIERPARTID").alias("Supplier_Part_ID")
)
add_new_column = lambda df, column_name, value: df.withColumn(column_name, lit(value))

# Apply the helpers: filter the relevant attributes, pivot, merge the pivots, fill nulls, rename, and enrich
filtered_df = filter_atnam(df, ["Z_CRP_BUDGETGROUP", "Z_CRP_DAFLRESTRICTED", "Z_CRP_HAZARDOUS", "Z_CRP_ITEMDESCRIPTION", "Z_CRP_LARGEORDERQTY", "Z_CRP_MANUFACTURERNAME", "Z_CRP_PML", "Z_CRP_SERVICELINE", "Z_CRP_SIO", "Z_CRP_SUPPLIERPARTID", "Z_CRP_VENDORNAME"])
pivot_df_atwrt = pivot_atwrt(filtered_df)
pivot_df_atcod = pivot_atcod(filtered_df)
pivot_df = pivot_df_atwrt.join(pivot_df_atcod, on="PRODUCTMASTER_CLASSIFICATION_ID", how="left")
pivot_df = fill_nulls(pivot_df, {"Z_CRP_BUDGETGROUP": "NULL", "Z_CRP_DAFLRESTRICTED": "NULL", "Z_CRP_HAZARDOUS": "NULL", "Z_CRP_ITEMDESCRIPTION": "NULL", "Z_CRP_LARGEORDERQTY": 0, "Z_CRP_MANUFACTURERNAME": "NULL", "Z_CRP_PML": "NULL", "Z_CRP_SERVICELINE": "NULL", "Z_CRP_SIO": "NULL", "Z_CRP_SUPPLIERPARTID": "NULL", "Z_CRP_VENDORNAME": "NULL"})
pmc_df = add_aliases(pivot_df)
pmc_df = add_new_column(pmc_df, "Supplier_Region", "XYZ")

# Create DataFrames with the specified column names and mapped values
pm_e1maram_df = df_e1maram_pm.select(
    col("PRODUCTMASTER_ID").alias("PRODUCTMASTER_ID"),
    col("FORMT").alias("fleet"),
    col("MFRPN").alias("manufacturer_part_id"),
    col("ERNAM").alias("order_raised_by"),
    col("ERSDA").alias("po_created"),
    col("ERNAM").alias("requestor")
)

pm_e1mvkem_df = df_e1mvkem_pm.select(
    col("PRODUCTMASTER_ID").alias("PRODUCTMASTER_ID"),
    col("LFMNG").alias("Max_Order_Qty"),
    col("AUMNG").alias("Min_Order_Qty")
)

pm_e1maktm_df = df_e1maktm_pm.select(
    col("PRODUCTMASTER_ID").alias("PRODUCTMASTER_ID"),
    col("MAKTX").alias("niin"),
    col("MAKTX").alias("Short_Name")
)

pm_e1mbewm_df = df_e1mbewm_pm.select(
    col("PRODUCTMASTER_ID").alias("PRODUCTMASTER_ID"),
    col("PEINH").alias("Unit_Price")
)

# Join the DataFrames on PRODUCTMASTER_ID
pm_final_df = pm_e1maram_df.join(pm_e1mvkem_df, "PRODUCTMASTER_ID", "inner") \
                        .join(pm_e1maktm_df, "PRODUCTMASTER_ID", "inner") \
                        .join(pm_e1mbewm_df, "PRODUCTMASTER_ID", "inner")

# Select all columns to include in the final DataFrame
pm_final_df = pm_final_df.select(
    col("PRODUCTMASTER_ID"),
    col("fleet"),
    col("manufacturer_part_id"),
    col("order_raised_by"),
    col("po_created"),
    col("requestor"),
    col("Max_Order_Qty"),
    col("Min_Order_Qty"),
    col("niin"),
    col("Short_Name"),
    col("Unit_Price")
)

# Add a new column with a default value
pm_final_df = pm_final_df.withColumn("Supplier_Product_Description", lit("NULL"))

# Join pmc_df and pm_final_df on PRODUCTMASTER_CLASSIFICATION_ID and PRODUCTMASTER_ID
final_df = pmc_df.join(pm_final_df, pmc_df.PRODUCTMASTER_CLASSIFICATION_ID == pm_final_df.PRODUCTMASTER_ID, "inner")

# Select all columns to include in the final DataFrame
final_df = final_df.select(
    pmc_df["bgc"],
    pmc_df["dafl_restricted"],
    pmc_df["Hazardous_Flag"],
    pmc_df["Item_Description"],
    pmc_df["Large_Order_Quantity"],
    pmc_df["Manufacturer"],
    pmc_df["pml"],
    pmc_df["Service_Line"],
    pmc_df["SIO"],
    pmc_df["Supplier"],
    pmc_df["Supplier_Part_ID"],
    pmc_df["Supplier_Region"],   
    pm_final_df["fleet"],
    pm_final_df["manufacturer_part_id"],
    pm_final_df["order_raised_by"],
    pm_final_df["po_created"],
    pm_final_df["requestor"],
    pm_final_df["Max_Order_Qty"],
    pm_final_df["Min_Order_Qty"],
    pm_final_df["niin"],
    pm_final_df["Short_Name"],
    pm_final_df["Unit_Price"],
    pm_final_df["Supplier_Product_Description"]
)
# Show the final DataFrame
#display(final_df)

table_name = "trf.productmaster"

try:
    # Overwrite (or create) the target Delta table with the transformed data.
    final_df.write.format("delta").mode("overwrite").saveAsTable(table_name)
    print(f"Successfully wrote data to {table_name}")
except Exception as e:
    print(f"Error writing data to {table_name}: {e}")

Outcome

This process ensures that product master data is:

  1. Transformed into a structured, enriched format.
  2. Integrated into a single DataFrame for holistic insights.
  3. Stored in a reliable and scalable Delta table for analytics and reporting.

Let me know if you'd like a specific part elaborated further or need assistance implementing this workflow.

