Handling Data Skewness in Spark: The Power of Salting in PySpark

Data skew can cause significant performance bottlenecks in Apache Spark, particularly during shuffling or joining operations. When certain keys or partitions contain disproportionate amounts of data, tasks can become unbalanced, leading to inefficiencies and long processing times.

A great technique to mitigate this is salting.

What is Salting in Spark?

Salting involves adding a random value (a "salt") to the key column, which helps distribute the data more evenly across partitions. This reduces skewness and optimizes Spark jobs by avoiding resource contention during operations like joins.

How Does Salting Work?

  1. Add a random "salt" value (e.g., a small integer) to the skewed key column, producing a composite salted key.
  2. Replicate the other side of the join once per possible salt value so every salted key has a match, then perform the transformation or join using the salted key.
  3. Remove the salt after the operation to restore the original dataset structure.
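Before moving to Spark itself, the effect of these steps can be sketched in plain Python. The sketch below is illustrative only (the names `NUM_SALTS` and the row counts are made up for the example, not Spark APIs): it counts how many rows share a single join key before and after salting, since the largest such group is what pins one partition's work onto a single task.

```python
import random
from collections import Counter

NUM_SALTS = 4  # number of salt buckets (illustrative)

# A skewed workload: product 101 dominates the sales rows
rows = [101] * 1000 + [102] * 10 + [103] * 5

random.seed(42)

# Rows per join key before salting: one key carries almost everything,
# so whichever partition receives key 101 does most of the work.
before = Counter(rows)

# After salting, key 101 becomes up to NUM_SALTS distinct keys
# (101, 0) .. (101, 3), so its rows can spread across several partitions.
after = Counter((k, random.randrange(NUM_SALTS)) for k in rows)

print("largest key group before salting:", max(before.values()))
print("largest key group after salting:", max(after.values()))
```

The same idea carries over to Spark: the salted keys hash to different partitions, so no single task has to process all of the hot key's rows.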

A Real-World Example in PySpark:

Imagine you’re working with e-commerce data. You have two datasets:

  • Sales data (sales_df), where each transaction includes a product_id and quantity_sold.
  • Product catalog (product_df), which lists product details by product_id.

Problem: A few products (e.g., product_id = 101, product_id = 102) have significantly higher sales than others, leading to data skew during a join operation.

Step-by-Step Salting in PySpark:

Create Sales Data:

Python Code

sales_data = [(101, 100), (101, 200), (102, 150), (102, 100), (103, 20), (104, 30)]
columns = ["product_id", "quantity_sold"]
sales_df = spark.createDataFrame(sales_data, columns)
sales_df.show()

Output:

+----------+-------------+
|product_id|quantity_sold|
+----------+-------------+
|       101|          100|
|       101|          200|
|       102|          150|
|       102|          100|
|       103|           20|
|       104|           30|
+----------+-------------+

Create Product Catalog Data:

Python Code

product_data = [(101, "Laptop"), (102, "Smartphone"), (103, "Tablet"), (104, "Headphones")]
product_columns = ["product_id", "product_name"]
product_df = spark.createDataFrame(product_data, product_columns)
product_df.show()

Output:

+----------+------------+
|product_id|product_name|
+----------+------------+
|       101|      Laptop|
|       102|  Smartphone|
|       103|      Tablet|
|       104|  Headphones|
+----------+------------+

Introduce Salting: To handle the skew, we attach a salt to the product_id key on both sides of the join: a random salt on the skewed sales side, and every possible salt value on the product catalog side, so the salted keys line up.

Python Code

from pyspark.sql.functions import col, concat_ws, floor, rand

NUM_SALTS = 3  # number of salt buckets; tune to the degree of skew

# Add a random salt bucket to each sales row
sales_df_salted = sales_df.withColumn("salt", floor(rand() * NUM_SALTS).cast("int"))

# Build a composite salted key; concatenating (rather than adding) keeps
# salted keys for different products from colliding with each other
sales_df_salted = sales_df_salted.withColumn("salted_product_id", concat_ws("_", col("product_id").cast("string"), col("salt")))
sales_df_salted.show()

Output (salt values are random and vary per run):

+----------+-------------+----+-----------------+
|product_id|quantity_sold|salt|salted_product_id|
+----------+-------------+----+-----------------+
|       101|          100|   2|            101_2|
|       101|          200|   0|            101_0|
|       102|          150|   1|            102_1|
|       102|          100|   2|            102_2|
|       103|           20|   0|            103_0|
|       104|           30|   1|            104_1|
+----------+-------------+----+-----------------+

Salt the Product Catalog and Perform the Join: The catalog side gets every salt value for each product, so each salted sales key finds exactly one match.

Python Code

from pyspark.sql.functions import array, col, concat_ws, explode, lit

NUM_SALTS = 3  # must match the salt range used for sales_df

# Replicate each product once per salt bucket
product_df_salted = product_df.withColumn("salt", explode(array(*[lit(i) for i in range(NUM_SALTS)])))
product_df_salted = product_df_salted.withColumn("salted_product_id", concat_ws("_", col("product_id").cast("string"), col("salt")))

# Join on the salted key; the hot product_ids are now spread across NUM_SALTS keys
joined_df = sales_df_salted.join(product_df_salted.select("salted_product_id", "product_name"), on="salted_product_id", how="inner")
joined_df.select("product_id", "quantity_sold", "product_name").show()

Output:

+----------+-------------+------------+
|product_id|quantity_sold|product_name|
+----------+-------------+------------+
|       101|          100|      Laptop|
|       101|          200|      Laptop|
|       102|          150|  Smartphone|
|       102|          100|  Smartphone|
|       103|           20|      Tablet|
|       104|           30|  Headphones|
+----------+-------------+------------+

Remove Salt After Join: Once the join is complete, drop the helper columns to restore the original structure.

joined_df = joined_df.drop("salt", "salted_product_id")

Benefits of Salting:

  • Reduced Skew: Evenly distributes data across partitions, preventing a few partitions from being overloaded.
  • Improved Performance: Faster joins and aggregations by balancing the workload.
  • Avoids Resource Contention: Reduces the risk of out-of-memory errors caused by large skewed partitions.

When to Use Salting:

  • During joins or aggregations involving skewed keys.
  • When experiencing long shuffle times or executor failures due to skew.
  • In real-time streaming applications where partitioning can affect data processing efficiency.
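One way to decide whether salting is worth applying is to check how unevenly rows are distributed across join keys. The helper below is a minimal sketch, not a Spark API: `find_skewed_keys` and its `ratio` threshold are illustrative names, and in Spark you would obtain the per-key counts with `df.groupBy("product_id").count()` instead of a Python `Counter`.

```python
from collections import Counter

def find_skewed_keys(keys, ratio=3.0):
    """Return keys whose row count exceeds `ratio` times the mean
    per-key count -- a simple heuristic for spotting join keys
    that may benefit from salting."""
    counts = Counter(keys)
    mean = sum(counts.values()) / len(counts)
    return {k for k, c in counts.items() if c > ratio * mean}

# Example: product 101 dominates the sales rows
sample_keys = [101] * 900 + [102] * 40 + [103] * 30 + [104] * 30
print(find_skewed_keys(sample_keys))  # only 101 exceeds the threshold
```

Only the flagged keys need salting; the rest can keep their original join key, which limits the row duplication on the replicated side.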

Takeaway:

Salting is an effective way to handle data skewness and optimize Spark performance. By distributing the data more evenly, it ensures your jobs run faster and are more resource-efficient.

