Handling Data Skewness in Apache Spark with the help of AQE

Data skewness is a prevalent issue in distributed data processing systems like Apache Spark. It occurs when the distribution of data across partitions is uneven, leading to some partitions being overloaded while others remain underutilized. This imbalance can significantly degrade the performance of Spark jobs, causing longer execution times and inefficient resource utilization.

Let us explore the different aspects of data skewness, its root causes, and strategies to handle it in the latest version of Apache Spark, with a particular emphasis on Adaptive Query Execution (AQE).

Understanding Data Skewness

Data skewness in Spark typically arises during operations that involve shuffling data, such as joins, aggregations, and groupBy operations. When the data is not evenly distributed, some partitions end up with a disproportionate amount of data, leading to “hot spots” that slow down the entire job. The root causes of data skewness include:

  1. Uneven Key Distribution: When keys are not uniformly distributed, some keys may have significantly more records than others.
  2. Skewed Data Sources: Data sources themselves may be inherently skewed, leading to uneven partitioning.
  3. Improper Partitioning: Default partitioning strategies may not always be optimal for the given data distribution.

Handling Data Skewness in Apache Spark (in Batch)

To mitigate the effects of data skewness, several strategies can be employed, such as:

  • Salting (a minimal sketch follows this list),
  • Broadcast Joins,
  • Increasing the number of Partitions,
  • Custom Partitioning, and
  • Adaptive Query Execution (AQE).
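To make the first of these concrete, below is a minimal PySpark salting sketch. The DataFrame names, column names, toy data, and salt count are illustrative assumptions rather than anything from this article; the idea is simply to spread a hot join key across several tasks.

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.appName("salting-sketch").getOrCreate()

    # Toy data: the key "hot" dominates the large side, producing a skewed join key.
    large_df = spark.createDataFrame(
        [("hot", i) for i in range(1000)] + [("cold", 1)], ["key", "value"]
    )
    small_df = spark.createDataFrame([("hot", "A"), ("cold", "B")], ["key", "label"])

    NUM_SALTS = 10  # assumed salt range; tune to the degree of skew

    # Spread the hot key on the large side by appending a random salt.
    salted_large = large_df.withColumn("salt", (F.rand() * NUM_SALTS).cast("int"))

    # Replicate each small-side row once per salt value so every salt can match.
    salted_small = small_df.withColumn(
        "salt", F.explode(F.array([F.lit(i) for i in range(NUM_SALTS)]))
    )

    # Joining on (key, salt) splits the hot key's records across NUM_SALTS tasks.
    joined = salted_large.join(salted_small, on=["key", "salt"]).drop("salt")
    joined.groupBy("key").count().show()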

Now let us discuss AQE in some detail.

It is a feature introduced in Apache Spark 3.0 (and enabled by default since Apache Spark 3.2.0) that dynamically optimizes query plans based on runtime statistics. This capability allows Spark to adjust execution strategies on-the-fly, leading to significant performance improvements, especially in scenarios involving data skewness and suboptimal query plans.

AQE is designed to address the limitations of static query optimization by allowing Spark to re-optimize query plans during execution. This dynamic approach helps in handling data skewness, optimizing join strategies, and adjusting the number of partitions based on the actual data processed.
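As a quick illustration, the sketch below shows how AQE can be enabled explicitly in PySpark. It is already on by default in Spark 3.2.0 and later, so this is only needed on older 3.x versions; the application name is an arbitrary placeholder.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("aqe-example").getOrCreate()

    # AQE is enabled by default since Spark 3.2.0; setting it explicitly
    # is only needed on Spark 3.0.x/3.1.x or when it was previously turned off.
    spark.conf.set("spark.sql.adaptive.enabled", "true")

    print(spark.conf.get("spark.sql.adaptive.enabled"))  # prints: true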

As of Spark 3.0, there are three major features in AQE:

  1. Dynamically coalescing post-shuffle partitions,
  2. Dynamically switching join strategies, and
  3. Dynamically optimizing skew joins.

For our purposes, let us focus on the first and the third of these.

Coalescing post-shuffle partitions (spark.sql.adaptive.coalescePartitions.enabled) is also enabled by default. This feature coalesces post-shuffle partitions based on map output statistics when both

  • "spark.sql.adaptive.enabled" and
  • "spark.sql.adaptive.coalescePartitions.enabled" configurations are true.

This feature simplifies tuning the shuffle partition number when running queries. We do not need to pick a shuffle partition number that exactly fits our dataset; once we set a large enough initial number of shuffle partitions, Spark can choose a proper shuffle partition number at runtime.
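A minimal sketch of how these settings can be combined when building a session is shown below. The initial partition count and advisory size are illustrative assumptions, not recommendations, and the two extra configurations (spark.sql.adaptive.coalescePartitions.initialPartitionNum and spark.sql.adaptive.advisoryPartitionSizeInBytes) are included only for completeness.

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("aqe-coalesce-sketch")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
        # Start with a deliberately large number of shuffle partitions;
        # AQE coalesces small post-shuffle partitions at runtime.
        .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1000")
        # Target partition size AQE aims for when coalescing.
        .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
        .getOrCreate()
    )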

AQE skew join optimization detects skewed data automatically from shuffle file statistics. It then splits the skewed partitions into smaller subpartitions, each of which is joined to the corresponding partition from the other side. This feature dynamically handles skew in sort-merge join by splitting (and replicating if needed) skewed tasks into roughly evenly sized tasks. It takes effect when both

  • "spark.sql.adaptive.enabled", and
  • "spark.sql.adaptive.skewJoin.enabled" configurations are enabled.

There are two additional parameters for tuning skew-join handling in AQE:

  • "spark.sql.adaptive.skewJoin.skewedPartitionFactor" (default value: 5). This adjusts the factor by which if medium partition size is multiplied, partitions are considered as skewed partitions if they are larger than that.
  • "spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes" (default value 256MB). This is the minimum size of skewed partition, and it marks partitions as skewed if they larger than the value set for this parameter.

Note:

The Spark UI is an invaluable tool for diagnosing and addressing data skewness. For data engineers, it provides detailed insight into the execution of Spark jobs, including:

  1. Stages and Tasks: The Stages tab shows the distribution of tasks across different stages, highlighting any imbalances.
  2. Summary Metrics: the per-task distribution of metrics such as duration and data size; a maximum far above the median is a strong indication of skewed partitions.

By analyzing these metrics, data engineers can pinpoint the stages and tasks affected by skewness and apply appropriate mitigation strategies.
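As a complement to the UI, the key distribution can also be inspected directly. The sketch below is a hypothetical example: the input path and the column name join_key are placeholders for your own data.

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.appName("skew-diagnosis-sketch").getOrCreate()

    # Hypothetical input and join column; replace with your own.
    df = spark.read.parquet("/path/to/table")

    # Rows per key, largest first: a handful of keys dominating the output is
    # the same imbalance that shows up as long-running tasks in the Stages tab.
    df.groupBy("join_key").count().orderBy(F.desc("count")).show(20)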

Reference materials worth reading:

  1. https://www.databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html
  2. https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution
  3. https://chengzhizhao.com/deep-dive-into-handling-apache-spark-data-skew/
