Microsoft Fabric: Data Transformation for Product Attributes Management

Context:

Organizations often manage product data in a structured but complex format where attributes are stored as key-value pairs across multiple rows in a database. This format, while flexible, makes it difficult to perform analytics, reporting, and downstream processing. For meaningful use, such as categorization, supplier analysis, or compliance checks, these attributes must be transformed into a tabular format with clearly defined columns.

Objective:

The goal is to transform raw product attribute data stored in a database table (raw.productmaster_classification_e1auspm) into a well-structured, enriched format suitable for reporting, analysis, and integration with downstream systems. The process should:

  • Filter and extract relevant product attributes.
  • Pivot the data to convert rows of attribute-value pairs into columns.
  • Handle missing values to ensure data integrity.
  • Provide meaningful and user-friendly column names for better understanding.
  • Add new contextual information (e.g., supplier region) to enrich the dataset.

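The core idea behind the pivot step can be illustrated with a tiny plain-Python sketch (the sample rows, products, and values below are made up for illustration only):

```python
# Hypothetical sample of attribute-value rows, one attribute per row,
# using the column names from the raw table described below.
rows = [
    {"PRODUCTMASTER_CLASSIFICATION_ID": 1, "ATNAM": "Z_CRP_VENDORNAME", "ATWRT": "Acme"},
    {"PRODUCTMASTER_CLASSIFICATION_ID": 1, "ATNAM": "Z_CRP_HAZARDOUS", "ATWRT": "N"},
    {"PRODUCTMASTER_CLASSIFICATION_ID": 2, "ATNAM": "Z_CRP_VENDORNAME", "ATWRT": "Globex"},
]

# Pivot: one record per product, attribute names become keys (i.e. columns).
pivoted = {}
for r in rows:
    pivoted.setdefault(r["PRODUCTMASTER_CLASSIFICATION_ID"], {})[r["ATNAM"]] = r["ATWRT"]

print(pivoted[1])  # {'Z_CRP_VENDORNAME': 'Acme', 'Z_CRP_HAZARDOUS': 'N'}
```

Spark's pivot() performs the same reshaping at scale, which is what the solution below relies on.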
Input:

  • A raw data table (raw.productmaster_classification_e1auspm) containing the following key columns:
  • PRODUCTMASTER_CLASSIFICATION_ID: A unique identifier for the product classification.
  • ATNAM: Attribute names describing product characteristics.
  • ATWRT: Attribute values associated with the attribute names.
  • ATCOD: Additional attribute-related codes.

Output:

  • A transformed and enriched DataFrame (pmc_df) with:
  • Key product attributes as columns.
  • Null values filled with appropriate default values.
  • Columns renamed with meaningful aliases.
  • A new column (Supplier_Region) added with a static value.

Challenges:

  • Data Volume and Performance:

The raw table may contain millions of rows, necessitating efficient filtering, grouping, and transformation.

  • Handling Inconsistent Data:

Null or missing values in attribute columns can lead to incomplete records.

Some attributes may not be present for all products.

  • Dynamic Data Structure:

The list of relevant attributes (ATNAM values) may change over time, requiring flexibility in the code.

  • Data Enrichment:

Additional contextual information, such as supplier region, needs to be added to the dataset.


Requirements:

Transformations:

Filter the data to include only relevant attributes (ATNAM values).

Pivot rows into columns using the ATNAM field as headers, aggregating the first ATWRT value (or, for Z_CRP_LARGEORDERQTY, the first ATCOD value) as the respective cell value.

Null Handling:

Replace missing values with predefined defaults to ensure data completeness.

Column Renaming:

Map raw attribute names to user-friendly column names, e.g., Z_CRP_BUDGETGROUP to bgc.
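One way to keep this mapping maintainable is a single dict from raw names to aliases, from which select expressions can be generated; adding an attribute then becomes a one-line change. The mapping below is a hypothetical subset of the full list used in the solution:

```python
# Hypothetical subset of the raw-name -> friendly-name mapping.
ALIASES = {
    "Z_CRP_BUDGETGROUP": "bgc",
    "Z_CRP_HAZARDOUS": "Hazardous_Flag",
    "Z_CRP_VENDORNAME": "Supplier",
}

# Build "`src` AS `dst`" expressions usable with df.selectExpr(*exprs).
exprs = [f"`{src}` AS `{dst}`" for src, dst in ALIASES.items()]
```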

Data Enrichment:

Add a static column (Supplier_Region) with the value "XYZ" for additional context.

Output Validation:

Ensure the resulting DataFrame is well-structured, free of null values (where defaults are provided), and contains the required attributes.

Use Case Examples:

Supplier Analysis:

Enable analysis of products based on suppliers and regions.

Compliance Reporting:

Extract hazard-related flags (Hazardous_Flag) for regulatory submissions.

Inventory Management:

Use Large_Order_Quantity and Item_Description for stock optimization.

Proposed Solution: The code uses Apache Spark to process the raw data efficiently, employing:

Lambda Functions:

Modular functions for filtering, pivoting, filling nulls, renaming columns, and adding new columns.

DataFrame Transformations:

Filters, group-by operations, and joins to reshape the data.

Enrichment:

Adding context with new fields (Supplier_Region) and renaming columns for usability.

This solution addresses the need to convert raw attribute data into a clean, tabular, and enriched format, ready for diverse business use cases.

from pyspark.sql.functions import col, first, lit

# In a Microsoft Fabric notebook a SparkSession named `spark` and the
# `display` helper are provided automatically, so none is created here.

# Load the raw data into a DataFrame
df = spark.table("raw.productmaster_classification_e1auspm")

# Attributes to extract; extend this list as the data model evolves
relevant_atnams = [
    "Z_CRP_BUDGETGROUP", "Z_CRP_DAFLRESTRICTED", "Z_CRP_HAZARDOUS",
    "Z_CRP_ITEMDESCRIPTION", "Z_CRP_LARGEORDERQTY", "Z_CRP_MANUFACTURERNAME",
    "Z_CRP_PML", "Z_CRP_SERVICELINE", "Z_CRP_SIO",
    "Z_CRP_SUPPLIERPARTID", "Z_CRP_VENDORNAME",
]

# Define lambda functions
filter_atnam = lambda df, conditions: df.filter(col("ATNAM").isin(conditions))

# All attributes except Z_CRP_LARGEORDERQTY carry their value in ATWRT;
# Z_CRP_LARGEORDERQTY carries it in ATCOD and is pivoted separately.
pivot_atwrt = lambda df: (
    df.filter(col("ATNAM") != "Z_CRP_LARGEORDERQTY")
    .groupBy("PRODUCTMASTER_CLASSIFICATION_ID")
    .pivot("ATNAM")
    .agg(first("ATWRT"))
)
pivot_atcod = lambda df: (
    df.filter(col("ATNAM") == "Z_CRP_LARGEORDERQTY")
    .groupBy("PRODUCTMASTER_CLASSIFICATION_ID")
    .pivot("ATNAM")
    .agg(first("ATCOD"))
)
fill_nulls = lambda df, fill_values: df.fillna(fill_values)
add_aliases = lambda df: df.select(
    col("Z_CRP_BUDGETGROUP").alias("bgc"),
    col("Z_CRP_DAFLRESTRICTED").alias("dafl_restricted"),
    col("Z_CRP_HAZARDOUS").alias("Hazardous_Flag"),
    col("Z_CRP_ITEMDESCRIPTION").alias("Item_Description"),
    col("Z_CRP_LARGEORDERQTY").alias("Large_Order_Quantity"),
    col("Z_CRP_MANUFACTURERNAME").alias("Manufacturer"),
    col("Z_CRP_PML").alias("pml"),
    col("Z_CRP_SERVICELINE").alias("Service_Line"),
    col("Z_CRP_SIO").alias("SIO"),
    col("Z_CRP_VENDORNAME").alias("Supplier"),
    col("Z_CRP_SUPPLIERPARTID").alias("Supplier_Part_ID"),
)
add_new_column = lambda df, column_name, value: df.withColumn(column_name, lit(value))

# Use the lambda functions
filtered_df = filter_atnam(df, relevant_atnams)
pivot_df_atwrt = pivot_atwrt(filtered_df)
pivot_df_atcod = pivot_atcod(filtered_df)
pivot_df = pivot_df_atwrt.join(
    pivot_df_atcod, on="PRODUCTMASTER_CLASSIFICATION_ID", how="left"
)
# String columns default to the literal "NULL"; the numeric quantity to 0
pivot_df = fill_nulls(pivot_df, {
    "Z_CRP_BUDGETGROUP": "NULL", "Z_CRP_DAFLRESTRICTED": "NULL",
    "Z_CRP_HAZARDOUS": "NULL", "Z_CRP_ITEMDESCRIPTION": "NULL",
    "Z_CRP_LARGEORDERQTY": 0, "Z_CRP_MANUFACTURERNAME": "NULL",
    "Z_CRP_PML": "NULL", "Z_CRP_SERVICELINE": "NULL", "Z_CRP_SIO": "NULL",
    "Z_CRP_SUPPLIERPARTID": "NULL", "Z_CRP_VENDORNAME": "NULL",
})
pmc_df = add_aliases(pivot_df)
pmc_df = add_new_column(pmc_df, "Supplier_Region", "XYZ")

# Show the result
display(pmc_df)


This code uses lambda functions to:

  1. Filter the DataFrame for specific ATNAM values.
  2. Pivot the DataFrame for ATWRT values.
  3. Pivot the DataFrame for the ATCOD value of Z_CRP_LARGEORDERQTY.
  4. Join the pivoted DataFrames.
  5. Handle null values with default values.
  6. Add alias names to the columns.
  7. Add a new column named Supplier_Region with a default value.
  8. Display the resulting DataFrame.


Conclusion:

This code effectively transforms and enriches the product master classification data by filtering, pivoting, handling null values, and adding new columns. The use of lambda functions makes the code modular and reusable, allowing for easy adjustments and extensions in the future. This structured DataFrame can now be used for further analysis or reporting purposes.

If you have any further questions or need additional modifications, feel free to ask!


More articles by RAJEEV KUMAR