Microsoft Fabric: Data Transformation for Product Attributes Management
Context:
Organizations often manage product data in a structured but complex format where attributes are stored as key-value pairs across multiple rows in a database. This format, while flexible, makes it difficult to perform analytics, reporting, and downstream processing. For meaningful use, such as categorization, supplier analysis, or compliance checks, these attributes must be transformed into a tabular format with clearly defined columns.
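To make the shape of the problem concrete, here is a minimal, hand-made sketch (with invented sample values) of how such key-value rows can be pivoted into one row per product using PySpark; the column names mirror those used in the full solution later in this article.
from pyspark.sql import SparkSession
from pyspark.sql.functions import first
spark = SparkSession.builder.getOrCreate()  # a Fabric notebook already provides `spark`
# Toy key-value rows: one row per (product, attribute) pair
raw_rows = [
    ("P1", "Z_CRP_VENDORNAME", "Acme Corp"),
    ("P1", "Z_CRP_HAZARDOUS", "X"),
    ("P2", "Z_CRP_VENDORNAME", "Globex"),
]
raw_df = spark.createDataFrame(raw_rows, ["PRODUCTMASTER_CLASSIFICATION_ID", "ATNAM", "ATWRT"])
# Pivot: attribute names (ATNAM) become columns, attribute values (ATWRT) become cells
tabular_df = raw_df.groupBy("PRODUCTMASTER_CLASSIFICATION_ID").pivot("ATNAM").agg(first("ATWRT"))
tabular_df.show()
# P2's Z_CRP_HAZARDOUS column is null because that attribute was never supplied for it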
Objective:
The goal is to transform raw product attribute data stored in a database table (raw.productmaster) into a well-structured, enriched format suitable for reporting, analysis, and integration with downstream systems. The process should filter the data to the relevant attributes, pivot the key-value rows into columns, handle missing values, rename columns for usability, enrich the result with contextual fields, and validate the output.
Input: The raw product master classification table, where each product attribute is stored as a separate row of key-value pairs (attribute name in ATNAM, attribute value in ATWRT or ATCOD).
Output: A tabular DataFrame with one row per product, one column per relevant attribute, user-friendly column names, default values in place of nulls, and an added Supplier_Region field.
Challenges:
The raw table may contain millions of rows, necessitating efficient filtering, grouping, and transformation.
Null or missing values in attribute columns can lead to incomplete records.
Some attributes may not be present for all products.
The list of relevant attributes (ATNAM values) may change over time, requiring flexibility in the code.
Additional contextual information, such as supplier region, needs to be integrated dynamically (a possible lookup-join approach is sketched after this list).
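For that dynamic supplier-region challenge, one possible approach is a lookup join rather than a hard-coded value. The sketch below is only an illustration: the supplier_region_lookup table and its SUPPLIER_NAME and REGION columns are assumptions, and pmc_df refers to the enriched DataFrame produced by the solution later in this article.
from pyspark.sql.functions import col
# Hypothetical reference table mapping supplier names to regions (table name and columns are assumed)
region_lookup_df = spark.table("raw.supplier_region_lookup").select(
    col("SUPPLIER_NAME").alias("Supplier"),
    col("REGION").alias("Supplier_Region"),
)
# Join on the supplier name instead of hard-coding a single region value
enriched_df = pmc_df.drop("Supplier_Region").join(region_lookup_df, on="Supplier", how="left")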
Requirements:
Transformations:
Filter the data to include only relevant attributes (ATNAM values).
Pivot rows into columns, using the ATNAM field values as column headers and the first ATWRT (or ATCOD) value per product as the corresponding cell value.
Null Handling:
Replace missing values with predefined defaults to ensure data completeness.
Column Renaming:
Map raw attribute names to user-friendly column names, e.g., Z_CRP_BUDGETGROUP to bgc.
Data Enrichment:
Add a static column (Supplier_Region) with the value "XYZ" for additional context.
Output Validation:
Ensure the resulting DataFrame is well-structured, free of null values (where defaults are provided), and contains the required attributes; a toy sketch of the null-handling, renaming, enrichment, and validation steps follows this list.
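To make these requirements concrete before the full solution, here is a minimal sketch on a two-column toy DataFrame with invented values, covering null handling, renaming, enrichment, and a simple validation check.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
spark = SparkSession.builder.getOrCreate()  # already available as `spark` in Fabric notebooks
# Toy pivoted data: product P1 is missing its vendor name
toy_df = spark.createDataFrame(
    [("P1", None), ("P2", "Globex")],
    "PRODUCTMASTER_CLASSIFICATION_ID string, Z_CRP_VENDORNAME string",
)
cleaned_df = (
    toy_df
    .fillna({"Z_CRP_VENDORNAME": "NULL"})                 # null handling: predefined default
    .withColumnRenamed("Z_CRP_VENDORNAME", "Supplier")    # column renaming
    .withColumn("Supplier_Region", lit("XYZ"))            # enrichment: static context column
)
# Output validation: no nulls should remain in the defaulted column
assert cleaned_df.filter(col("Supplier").isNull()).count() == 0
cleaned_df.show()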
Use Case Examples:
Supplier Analysis:
Enable analysis of products based on suppliers and regions.
Compliance Reporting:
Extract hazard-related flags (Hazardous_Flag) for regulatory submissions.
Inventory Management:
Use Large_Order_Quantity and Item_Description for stock optimization.
Proposed Solution: The code uses Apache Spark to process the raw data efficiently, employing:
Lambda Functions:
Modular functions for filtering, pivoting, filling nulls, renaming columns, and adding new columns.
DataFrame Transformations:
Filters, group-by operations, and joins to reshape the data.
Enrichment:
Adding context with new fields (Supplier_Region) and renaming columns for usability.
This solution addresses the need to convert raw attribute data into a clean, tabular, and enriched format, ready for diverse business use cases.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, first, lit
# Load the raw product master classification data into a DataFrame
# (the `spark` session is pre-defined in Microsoft Fabric notebooks)
df = spark.table("raw.productmaster_classification_e1auspm")
# Define reusable lambda functions for each transformation step
# Keep only the rows whose ATNAM is in the given list of relevant attributes
filter_atnam = lambda df, conditions: df.filter(col("ATNAM").isin(conditions))
# Pivot attribute values stored in ATWRT into columns, one row per product
pivot_atwrt = lambda df: df.filter(col("ATNAM") != "Z_CRP_LARGEORDERQTY").groupBy("PRODUCTMASTER_CLASSIFICATION_ID").pivot("ATNAM").agg(first("ATWRT"))
# Pivot the large-order-quantity attribute separately, taking its value from ATCOD
pivot_atcod = lambda df: df.filter(col("ATNAM") == "Z_CRP_LARGEORDERQTY").groupBy("PRODUCTMASTER_CLASSIFICATION_ID").pivot("ATNAM").agg(first("ATCOD"))
# Replace missing values with the provided per-column defaults
fill_nulls = lambda df, fill_values: df.fillna(fill_values)
# Select the pivoted attribute columns and rename them to user-friendly names
add_aliases = lambda df: df.select(
col("Z_CRP_BUDGETGROUP").alias("bgc"),
col("Z_CRP_DAFLRESTRICTED").alias("dafl_restricted"),
col("Z_CRP_HAZARDOUS").alias("Hazardous_Flag"),
col("Z_CRP_ITEMDESCRIPTION").alias("Item_Description"),
col("Z_CRP_LARGEORDERQTY").alias("Large_Order_Quantity"),
col("Z_CRP_MANUFACTURERNAME").alias("Manufacturer"),
col("Z_CRP_PML").alias("pml"),
col("Z_CRP_SERVICELINE").alias("Service_Line"),
col("Z_CRP_SIO").alias("SIO"),
col("Z_CRP_VENDORNAME").alias("Supplier"),
col("Z_CRP_SUPPLIERPARTID").alias("Supplier_Part_ID")
)
# Add a constant-valued column for additional context
add_new_column = lambda df, column_name, value: df.withColumn(column_name, lit(value))
# Apply the lambda functions step by step
# 1. Keep only the relevant attribute names (ATNAM values)
relevant_attributes = [
    "Z_CRP_BUDGETGROUP", "Z_CRP_DAFLRESTRICTED", "Z_CRP_HAZARDOUS",
    "Z_CRP_ITEMDESCRIPTION", "Z_CRP_LARGEORDERQTY", "Z_CRP_MANUFACTURERNAME",
    "Z_CRP_PML", "Z_CRP_SERVICELINE", "Z_CRP_SIO",
    "Z_CRP_SUPPLIERPARTID", "Z_CRP_VENDORNAME",
]
filtered_df = filter_atnam(df, relevant_attributes)
# 2. Pivot the ATWRT- and ATCOD-based attributes, then join them back on the product ID
pivot_df_atwrt = pivot_atwrt(filtered_df)
pivot_df_atcod = pivot_atcod(filtered_df)
pivot_df = pivot_df_atwrt.join(pivot_df_atcod, on="PRODUCTMASTER_CLASSIFICATION_ID", how="left")
# 3. Fill missing values ("NULL" for text attributes, 0 for the large order quantity)
default_values = {
    "Z_CRP_BUDGETGROUP": "NULL", "Z_CRP_DAFLRESTRICTED": "NULL",
    "Z_CRP_HAZARDOUS": "NULL", "Z_CRP_ITEMDESCRIPTION": "NULL",
    "Z_CRP_LARGEORDERQTY": 0, "Z_CRP_MANUFACTURERNAME": "NULL",
    "Z_CRP_PML": "NULL", "Z_CRP_SERVICELINE": "NULL", "Z_CRP_SIO": "NULL",
    "Z_CRP_SUPPLIERPARTID": "NULL", "Z_CRP_VENDORNAME": "NULL",
}
pivot_df = fill_nulls(pivot_df, default_values)
# 4. Rename columns to user-friendly names and add the static Supplier_Region column
pmc_df = add_aliases(pivot_df)
pmc_df = add_new_column(pmc_df, "Supplier_Region", "XYZ")
# Show the result
display(pmc_df)
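As a follow-up for the output-validation requirement, the sketch below (not part of the original pipeline) checks that the expected columns are present and that no nulls remain in the columns that were given defaults; the column list is taken from the aliases defined above.
from pyspark.sql.functions import col, sum as spark_sum
expected_columns = [
    "bgc", "dafl_restricted", "Hazardous_Flag", "Item_Description",
    "Large_Order_Quantity", "Manufacturer", "pml", "Service_Line",
    "SIO", "Supplier", "Supplier_Part_ID", "Supplier_Region",
]
# Confirm every expected column exists in the result
missing = [c for c in expected_columns if c not in pmc_df.columns]
assert not missing, f"Missing columns: {missing}"
# Count remaining nulls per column; each count should be zero after fillna
null_counts = pmc_df.select(
    [spark_sum(col(c).isNull().cast("int")).alias(c) for c in expected_columns]
)
display(null_counts)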
This code uses lambda functions to filter the raw rows, pivot attributes into columns, fill null values with defaults, rename columns, and add the static Supplier_Region column.
Conclusion:
This code effectively transforms and enriches the product master classification data by filtering, pivoting, handling null values, and adding new columns. The use of lambda functions makes the code modular and reusable, allowing for easy adjustments and extensions in the future. This structured DataFrame can now be used for further analysis or reporting purposes.
If you have any further questions or need additional modifications, feel free to ask!