Spark Performance Tuning: Spill

What is Spill in Apache Spark:

Spill is a critical concept in Apache Spark that significantly impacts the performance and efficiency of Spark applications. Data spill occurs when there is not enough memory available to hold all the data needed for a computation. To prevent out-of-memory errors and let tasks complete successfully, Spark temporarily writes data to disk. Spill is most common during data shuffling. The mechanism is essential, but it comes with a performance trade-off: it involves serialization, deserialization, and disk reads and writes, and disk I/O is far slower than memory access. As a result, spill can slow down Spark jobs considerably.

Why Does Data Spill Happen?

The demarcation between execution and storage memory is not rigid. Depending on demand and the free memory available, execution memory can borrow from storage memory and vice versa; this dynamic allocation allows the available memory to be used effectively. Spark can also evict cached blocks from storage memory to make room for execution, following an LRU (Least Recently Used) policy so that the least recently accessed data is evicted first. If memory is still insufficient for execution tasks (such as shuffling or sorting), Spark spills data to disk. This is a fallback mechanism to prevent out-of-memory errors: the spilled data is read back into memory when needed, albeit at a performance cost due to disk I/O.

Spill metrics in the Spark UI:

  • In the Summary Metrics for a stage

  • In Aggregated Metrics by Executor

Shuffle spill (memory) is the size of the deserialized form of the shuffled data in memory. Shuffle spill (disk) is the size of the serialized form of the data on disk.

Spill (memory) and Spill (disk) describe the same chunk of data. The first metric is the space those spilled records occupied in memory before they were moved to disk; the second is their size once written to disk. The two values can differ because data may be represented differently on disk, for example because it is compressed.

Understanding Memory Allocation in Apache Spark:

Memory areas on a worker node comprise on-heap memory (inside the JVM), off-heap memory (outside the JVM), and overhead memory (outside the JVM). The on-heap area is where the tasks of a Spark job are processed, and it is the portion where the spill problem originates. On-heap memory is divided as follows (overhead memory is listed alongside it but is allocated outside the heap):

  • Reserved Memory (~300 MB): memory reserved for the Spark engine. It defaults to 300 MB and cannot be modified.
  • User Memory: This section stores user-defined data structures (e.g., objects created in UDFs) and the internal metadata needed for RDD conversion operations. This region is not tracked by Spark's memory manager.
  • Unified Memory: Unified memory consists of storage memory and execution memory. Storage memory is used for cache and persist operations; execution memory is used for shuffles, joins, sorts, and aggregations.
  • Overhead Memory: Additional memory allocated per executor process (in MiB unless otherwise specified), outside the heap. It accounts for things like VM overheads, interned strings, and other native overheads, and it grows with the executor size (typically 6–10%). The default is max(384 MB, 10% of executor memory). Executor overhead memory helps ensure efficient task execution and garbage collection and safeguards against memory-related errors.

In addition to JVM Heap, two more segments of memory are accessed by Spark.

  • Off-Heap Memory: This segment lies outside the JVM heap but is still used by the JVM process for certain purposes (e.g., native allocations such as direct byte buffers). Spark can also use off-heap memory explicitly to store serialized DataFrames and RDDs.
  • PySpark Memory: Specific to PySpark and SparkR, this is the memory used by the libraries of the Python/R process, which resides outside of the JVM.

Simplified Memory Calculation:

Example Calculation:

Suppose an executor is configured with 14 GB (spark.executor.memory = 14 GB) and the default overhead (10% of executor memory, with a 384 MB minimum). Hence, total memory = executor memory + overhead memory = 14 GB + 1.4 GB = 15.4 GB.

The default values of the relevant Spark configuration are:

spark.memory.fraction = 0.6
spark.memory.storageFraction = 0.5

Let’s assume an overhead of roughly 1.4 GB and the default memory fractions:

Overhead Memory (‘spark.executor.memoryOverhead’) = max(10% of executor memory, 384 MB) = max(10% of 14336 MB, 384 MB) = 1433.6 MB

Executor Memory (‘spark.executor.memory’) = 14 GB = 14 * 1024 MB = 14336 MB

Reserved Memory = 300 MB (fixed default)

Remaining Heap Space = Executor Memory - Reserved Memory = (14336 - 300) MB = 14036 MB

User Memory = Remaining Heap Space * (1 - spark.memory.fraction) = 14036 * 0.4 = 5614.4 MB

Unified Memory = Remaining Heap Space * spark.memory.fraction = 14036 * 0.6 = 8421.6 MB

Storage Memory = Unified Memory * spark.memory.storageFraction = 8421.6 * 0.5 = 4210.8 MB

Execution Memory = Unified Memory * (1 - spark.memory.storageFraction) = 8421.6 * 0.5 = 4210.8 MB
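
The same arithmetic can be written out as a few lines of Python. This is a minimal sketch of the calculation above, not a Spark API; the variable names are illustrative and the inputs are the values from this example.

# Sketch of the unified-memory arithmetic above (plain Python, not a Spark API).
executor_memory_mb = 14 * 1024                      # spark.executor.memory = 14 GB
overhead_mb = max(384, 0.10 * executor_memory_mb)   # default executor memory overhead
reserved_mb = 300                                   # fixed reserved memory
memory_fraction = 0.6                               # spark.memory.fraction
storage_fraction = 0.5                              # spark.memory.storageFraction

usable_heap_mb = executor_memory_mb - reserved_mb                  # 14036 MB
user_memory_mb = usable_heap_mb * (1 - memory_fraction)            # 5614.4 MB
unified_memory_mb = usable_heap_mb * memory_fraction               # 8421.6 MB
storage_memory_mb = unified_memory_mb * storage_fraction           # 4210.8 MB
execution_memory_mb = unified_memory_mb * (1 - storage_fraction)   # 4210.8 MB

print(overhead_mb, user_memory_mb, storage_memory_mb, execution_memory_mb)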

Possible actions that could generate spill:

Any operation in a Spark job that grows the data handled by a task beyond the memory allocated to it can trigger a spill. Common examples:

· Reducing spark.sql.shuffle.partitions leads to larger partitions per shuffle task, which can induce disk spills during the shuffle triggered by wide transformations.

· groupBy() and agg() on large datasets require shuffling and aggregation. These operations are memory-intensive and likely to cause a disk spill (see the sketch after this list).

· Setting spark.sql.files.maxPartitionBytes much higher than its default (128 MB) increases the amount of data read into each input partition.

· Skewed data can also lead to spills. For example, the explode() function transforms an array or map column into multiple rows, one per element. Applied to several columns of a table, it can multiply the number of rows dramatically, increasing the amount of data Spark needs to process; if the expanded dataset exceeds the memory available to the executors, it spills.
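
For instance, a wide aggregation over a large dataset forces a shuffle, and if a task's share of that shuffle does not fit in execution memory, Spill (memory) and Spill (disk) appear for the stage in the Spark UI. A minimal sketch, assuming a Parquet input at a hypothetical path with customer_id and amount columns:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("spill-demo").getOrCreate()

# Hypothetical input; replace the path and column names with your own data.
df = spark.read.parquet("/data/transactions")

# groupBy + agg is a wide transformation: rows are shuffled by customer_id.
# Too few shuffle partitions or a heavily skewed key can make a task spill to disk.
summary = (df.groupBy("customer_id")
             .agg(F.sum("amount").alias("total_amount"),
                  F.count("*").alias("txn_count")))

summary.write.mode("overwrite").parquet("/data/customer_summary")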

Solutions to mitigate spill in Apache Spark:

Tune the memory configuration:

  • Adjust the heap with the ‘spark.executor.memory’ setting so that each executor in the cluster gets an adequate unified memory area, thereby increasing the execution and storage memory available.
  • Configure additional memory by allocating sufficient off-heap memory with ‘spark.memory.offHeap.size’ and enabling it with ‘spark.memory.offHeap.enabled’.
  • Tune the split between storage and execution memory with ‘spark.memory.storageFraction’, and set the desired share of the heap given to unified memory with ‘spark.memory.fraction’ (see the sketch after this list).
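
A minimal sketch of these settings at session creation; the values are illustrative only, and the right numbers depend on your cluster and workload:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("memory-tuning")
         # A larger heap per executor enlarges the unified (execution + storage) area.
         .config("spark.executor.memory", "14g")
         # Optional off-heap area used by Spark's memory manager when enabled.
         .config("spark.memory.offHeap.enabled", "true")
         .config("spark.memory.offHeap.size", "2g")
         # Share of the usable heap given to unified memory, and its storage portion.
         .config("spark.memory.fraction", "0.6")
         .config("spark.memory.storageFraction", "0.5")
         .getOrCreate())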

Partition Strategies:

  • Optimize the partition size with repartition() and coalesce() (see the sketch after this list).
  • Consider alternative partitioning strategies for groupBy() and join() when the data distribution is known, such as hash, range, or custom partitioning, to reduce data shuffling and exploit data locality.
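
A minimal sketch of the two calls, assuming a DataFrame df and a grouping/join key named customer_id (hypothetical names); the partition counts are illustrative:

# Full shuffle into more, smaller partitions, co-locating rows with the same key,
# so each task handles less data in the subsequent groupBy or join.
df_repartitioned = df.repartition(400, "customer_id")

# Collapse to fewer partitions without a full shuffle, e.g. before writing out.
df_small = df_repartitioned.coalesce(50)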

Data Skewness:

  • Mitigate data skewness by salting (adding a random prefix or suffix to the key) to split skewed keys, or use a broadcast join, shipping the smaller dataset (typically at most ~10 MB, the default spark.sql.autoBroadcastJoinThreshold) to every executor so the larger dataset is not shuffled (see the sketch after this list).
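
A minimal sketch of both techniques, assuming DataFrames large_df and small_df joined on a column named key and an existing SparkSession spark (all hypothetical names); the salt width of 10 is arbitrary:

from pyspark.sql import functions as F

# Broadcast join: ship the small table to every executor so the large side
# is not shuffled at all.
joined = large_df.join(F.broadcast(small_df), "key")

# Salting: spread a hot key over 10 sub-keys so no single task receives all of it.
salted_large = large_df.withColumn("salt", (F.rand() * 10).cast("int"))
salted_small = small_df.crossJoin(spark.range(10).withColumnRenamed("id", "salt"))
joined_salted = salted_large.join(salted_small, ["key", "salt"])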

Data compression:

  • Use a compression codec such as Snappy or Gzip when reading and writing data to reduce the memory and storage footprint during processing.
  • Enable compression during shuffles to reduce the size of data exchanged between executors and of data spilled to disk during large shuffles; this is controlled by ‘spark.shuffle.compress’ and ‘spark.shuffle.spill.compress’ (see the sketch after this list).
  • Use optimized data formats such as Parquet and ORC, which are columnar storage formats that could reduce memory usage.
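
A minimal sketch of these settings. Note that ‘spark.shuffle.compress’ and ‘spark.shuffle.spill.compress’ already default to true in recent Spark releases, so this mostly matters if they were disabled elsewhere; the input and output paths are hypothetical:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         # Compress shuffle map outputs and data spilled during shuffles.
         .config("spark.shuffle.compress", "true")
         .config("spark.shuffle.spill.compress", "true")
         .getOrCreate())

df = spark.read.parquet("/data/input")   # hypothetical input

# Columnar, compressed formats keep the on-disk footprint small.
df.write.mode("overwrite").option("compression", "snappy").parquet("/data/output")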

Caching and Persistence:

  • Use cache() and persist() methods to reduce re-computation overhead.
  • Select the appropriate storage level for caching to balance memory usage and CPU efficiency (see the sketch after this list).
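
A minimal sketch of picking a storage level, assuming a DataFrame df that is reused several times (hypothetical name):

from pyspark import StorageLevel

# MEMORY_AND_DISK keeps what fits in memory and overflows the rest to local disk
# instead of recomputing it, which is usually a good default when memory is tight.
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()       # materialise the cache

# ... reuse df in several actions ...

df.unpersist()   # release the cached blocks when they are no longer needed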

Monitoring and Profiling:

  • Regularly monitor Spark applications using Spark UI and logs to identify stages and tasks where memory spill occurs.
  • Use profiling tools to gain insights into memory usage patterns and optimize accordingly.

Spark SQL Performance:

  • Use AQE (Adaptive Query Execution) to let Spark choose an appropriate number of shuffle partitions at runtime; it is controlled by ‘spark.sql.adaptive.enabled’ and ‘spark.sql.adaptive.coalescePartitions.enabled’. On Databricks, ‘set spark.sql.shuffle.partitions=auto’ additionally enables auto-optimized shuffle.
  • Cache the table with ‘spark.table("table").cache().count()’ and check the Storage tab in the Spark UI to find the table's actual size in memory.
  • Manually fine-tune the shuffle partitions as below:

To fine-tune them manually, compute the number of shuffle partitions as follows:

Let’s assume that:

Total number of worker cores in the cluster = T

Total amount of data being shuffled in the shuffle stage (in MB) = B

Optimal size of data to be processed per task (in MB) = 128

Hence, the multiplication factor (M): M=ceiling(B/(128*T))

Number of shuffle partitions (N): N= M*T

* The optimal size of data to be processed per task should be approximately 128 MB.
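
The rule translates directly into a few lines of Python; T and B below are hypothetical example values, and spark is an existing SparkSession:

import math

T = 96            # total worker cores in the cluster
B = 50 * 1024     # total data shuffled in the stage, in MB (50 GB here)
target_mb = 128   # optimal amount of data per task

M = math.ceil(B / (target_mb * T))   # multiplication factor -> 5 for these numbers
N = M * T                            # number of shuffle partitions -> 480

spark.conf.set("spark.sql.shuffle.partitions", N)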

If you are neither using auto-tuning (AOS) nor manually fine-tuning the shuffle partitions, then as a rule of thumb set this to two, or better three, times the total number of worker CPU cores:

In SQL:

SET spark.sql.shuffle.partitions = 2 * <number of total worker cores in cluster>

In PySpark:

spark.conf.set("spark.sql.shuffle.partitions", 2 * <number of total worker cores in cluster>)

Conclusion

Disk spill in Apache Spark is a common performance bottleneck that arises when in-memory data exceeds the allocated limits and overflows to disk. By understanding the causes of disk spills and employing effective mitigation strategies, such as optimizing memory usage, efficient data partitioning, and using appropriate data structures, you can significantly improve the performance and reliability of Spark applications. Regular monitoring and tuning are essential to ensure that Spark applications run efficiently and leverage the full potential of in-memory processing.

Refer to my blog on Data spill:

https://medium.com/@biswas.upasana/spark-performance-tuning-spill-838c357ac935

