Handling Data Skewness in Spark: The Power of Salting in PySpark
Murali Naga Venkata Nadh Kommanaboina
Microsoft Certified Azure Data Engineer | Senior Data Engineer | Cloud Data Solutions Expert | Machine Learning Expert | Azure & Big Data Specialist
Data skew can cause significant performance bottlenecks in Apache Spark, particularly during shuffling or joining operations. When certain keys or partitions contain disproportionate amounts of data, tasks can become unbalanced, leading to inefficiencies and long processing times.
A great technique to mitigate this is salting.
What is Salting in Spark?
Salting involves adding a random value (a "salt") to the key column, which helps distribute the data more evenly across partitions. This reduces skewness and optimizes Spark jobs by avoiding resource contention during operations like joins.
How Does Salting Work?
The idea is simple: append a random salt to the join key on the large, skewed side so that a single hot key is split into several distinct keys (and therefore several partitions). For example, hot key 101 becomes 101_0, 101_1, or 101_2. To keep the join correct, the smaller side is replicated once per possible salt value, so every salted key still finds its match.
Real-Time Example in PySpark:
Imagine you’re working with e-commerce data. You have two datasets: a large sales table and a small product catalog.
Problem: A few products (e.g., product_id = 101, product_id = 102) have significantly higher sales than others, leading to data skew during a join operation.
Step-by-Step Salting in PySpark:
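The snippets below assume an active SparkSession named spark; a minimal setup sketch (the app name is illustrative):
Python Code
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("salting-demo").getOrCreate()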
Create Sales Data:
Python Code
sales_data = [(101, 100), (101, 200), (102, 150), (102, 100), (103, 20), (104, 30)]
columns = ["product_id", "quantity_sold"]
sales_df = spark.createDataFrame(sales_data, columns)
sales_df.show()
Output:
|product_id |quantity_sold|
|101 |100 |
|101 |200 |
|102 |150 |
|102 |100 |
|103 |20 |
|104 |30 |
Create Product Catalog Data:
Python Code
product_data = [(101, "Laptop"), (102, "Smartphone"), (103, "Tablet"), (104, "Headphones")]
product_columns = ["product_id", "product_name"]
product_df = spark.createDataFrame(product_data, product_columns)
product_df.show()
Output:
|product_id |product_name|
|101 |Laptop |
|102 |Smartphone |
|103 |Tablet |
|104 |Headphones |
Introduce Salting: To handle the skew, we append a random number (the "salt") to the product_id key in both dataframes. Note that simply adding the salt arithmetically would let different products collide (e.g., 101 + 3 = 102 + 2 = 104), so we concatenate the salt as a string suffix instead, producing keys like 101_2.
Python Code
from pyspark.sql.functions import col, concat_ws, rand

NUM_SALTS = 3  # number of salt buckets; tune this to the degree of skew

# Add a random salt in [0, NUM_SALTS) to every sales row
sales_df_salted = sales_df.withColumn("salt", (rand() * NUM_SALTS).cast("int"))

# Concatenate rather than add: "101_2" cannot collide with another product's key
sales_df_salted = sales_df_salted.withColumn(
    "salted_product_id", concat_ws("_", col("product_id"), col("salt"))
)
sales_df_salted.show()
Output (salt values are random, so your run will differ):
|product_id |quantity_sold|salt|salted_product_id|
|101 |100 |2 |101_2 |
|101 |200 |0 |101_0 |
|102 |150 |1 |102_1 |
|102 |100 |2 |102_2 |
|103 |20 |0 |103_0 |
|104 |30 |1 |104_1 |
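Salt the Product Catalog: the catalog must live in the same salted key domain as the sales data. Since the catalog is small and not skewed, we replicate each product row once per salt value instead of salting randomly, so every salted sales key is guaranteed a match. A minimal sketch, reusing NUM_SALTS from above:
Python Code
from pyspark.sql.functions import array, explode, lit

# Replicate each catalog row once per salt value (0, 1, ..., NUM_SALTS - 1)
salts = array(*[lit(i) for i in range(NUM_SALTS)])
product_df_salted = product_df.withColumn("salt", explode(salts))
product_df_salted = product_df_salted.withColumn(
    "salted_product_id", concat_ws("_", col("product_id"), col("salt"))
)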
Perform Join Using Salted Keys:
Python Code
# Join on the salted key; both sides now share the same salted domain
joined_df = sales_df_salted.join(
    product_df_salted,
    on="salted_product_id",
    how="inner",
)
joined_df.select(sales_df_salted["product_id"], "quantity_sold", "product_name").show()
Output:
|product_id |quantity_sold|product_name|
|101 |100 |Laptop |
|101 |200 |Laptop |
|102 |150 |Smartphone |
|102 |100 |Smartphone |
|103 |20 |Tablet |
|104 |30 |Headphones |
Remove Salt After Join: once the join operation is complete, the salt and salted key columns are no longer needed and can be dropped.
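A one-line cleanup, assuming the column names used above:
Python Code
# Drop the helper columns introduced for salting
final_df = joined_df.drop("salt", "salted_product_id")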
Benefits of Salting:
- Evenly distributed partitions, so no single task becomes a straggler
- Faster shuffles and joins on skewed keys
- Better cluster utilization, since work is spread across all executors
When to Use Salting:
- When a join or aggregation key has a few very hot values (like the best-selling products above)
- When the Spark UI shows a handful of tasks in a stage taking far longer than the rest
Takeaway:
Salting is an effective way to handle data skewness and optimize Spark performance. By distributing the data more evenly, it ensures your jobs run faster and are more resource-efficient.