Spark Performance Tuning: Addressing Common Issues and Optimization Strategies
Run efficient Spark jobs, by Devashish Somani


Knowing the tools is only half the battle; knowing how to use them is what matters most. Even with tools built for Big Data, processing can still be cumbersome: a small dataset can overwhelm a large cluster if handled poorly, while a large dataset can run comfortably on a small cluster when the job is optimized correctly.

Apache Spark, a big data processing engine, is very fast compared to its predecessor MapReduce, yet it still needs job-level optimization and tuning to execute efficiently. When Spark jobs are not running as expected, remember that the first step is to check whether the issue falls under one of the 5S.

5S stands for Spill, Skew, Shuffle, Storage, Serialization.

“Running a Spark job without tuning is like cooking with all the burners on high—fast, furious, and bound to burn out!”

In this article, let's go through some of the most common errors and issues encountered, along with their solutions and the optimizations we can apply.

1. OutOfMemoryError

2. Spark Shuffle Errors

3. Executor Lost

4. Skewed Data

5. Serialization Errors

6. Long Garbage Collection (GC) Times

7. Incorrect Results Due to DataFrame API Misuse

8. Slow Job Execution

9. Java Heap Space Error

10. Insufficient Number of Partitions

11. Network Timeouts and Connection Errors

12. Disk Spilling Issues

13. Job Stuck in Pending State

14. Checkpointing Issues

15. Poor Performance Due to Catalyst Optimization

16. Kryo Serialization Errors

17. Unoptimized Wide Transformations

18. Cluster Resource Overutilization

19. S3 I/O Timeout

20. Poor Job execution with Hive SerDe-based R/W

21. Broadcast Timeout Issues

22. Datetime Incompatibility Exception



1. OutOfMemoryError

This is one of the most frequent errors in Apache Spark, occurring when executors or the driver run out of memory during processing. The error message typically looks like this:

ERROR Executor: Exception in task 7.0 in stage 6.0 (TID 439)
java.lang.OutOfMemoryError
	at java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
	at java.io.ByteArrayOutputStream.grow(Unknown Source)
	at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
	at java.io.ByteArrayOutputStream.write(Unknown Source)
	at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
	at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
	at java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.io.ObjectOutputStream.writeObject(Unknown Source)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Root Cause:

  • The volume of data in production is significantly higher than in staging or QA environments.
  • There might be a sudden increase in the daily data load due to an upstream error.

Solution:

  • Optimize memory usage: Increase the memory allocated to executors and drivers.
  • Use DataFrame API: Convert RDDs to DataFrames or Datasets to utilize Spark’s Catalyst optimizer.
  • Efficient Data Persistence: Choose storage levels like MEMORY_AND_DISK for most cases. Consider MEMORY_AND_DISK_2, which replicates each partition, only if your workload is extremely sensitive to node failures and you cannot afford any delays or recomputation; this is rare in managed environments like Serverless.
  • Dynamic Resource Allocation: Enable Spark's dynamic resource allocation to scale executors based on workload.
  • Correct Spark Configuration: Ensure the pre-production environment mirrors the production environment in terms of data volume.
  • Enable Adaptive Query Execution (AQE): Set spark.sql.adaptive.enabled to true so Spark optimizes the execution plan dynamically based on runtime statistics.
  • Adjust Memory Storage Fraction: Reducing spark.memory.storageFraction can improve performance for shuffle-heavy jobs whose stages fail with out-of-memory errors. (A minimal configuration sketch follows this list.)
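To make these knobs concrete, here is a minimal Scala sketch. The application name, input path, and all sizes are placeholder assumptions, not recommendations; driver memory in particular usually has to be passed at submit time (for example via spark-submit's --driver-memory) because it cannot be changed once the driver JVM is running.

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Illustrative values only; size them against your real production volumes.
val spark = SparkSession.builder()
  .appName("oom-tuning-sketch")
  .config("spark.executor.memory", "8g")                // executor heap
  .config("spark.executor.memoryOverhead", "2g")        // off-heap / native overhead
  .config("spark.sql.adaptive.enabled", "true")         // AQE: re-plan using runtime stats
  .config("spark.dynamicAllocation.enabled", "true")    // usually paired with the external shuffle service
  .config("spark.memory.storageFraction", "0.3")        // leave more room for execution/shuffle memory
  .getOrCreate()

// Persist reused data so it spills to disk instead of failing with OOM.
val events = spark.read.parquet("s3://bucket/events/")  // hypothetical input path
events.persist(StorageLevel.MEMORY_AND_DISK)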


2. Spark Shuffle Errors

Shuffle operations in Spark involve redistributing data across nodes, which can be resource-intensive. Errors such as FetchFailedException, BlockManagerException, ShuffleFileNotFoundException, ShuffleBlockException, and MetadataFetchFailedException might occur:

org.apache.spark.shuffle.FetchFailedException: Failed to connect to host:port ShuffleMapStage has failed the maximum allowable number of times        
ERROR BlockManager: Error in BlockManager: BlockManagerException: Block not found        
ERROR BlockManager: ShuffleFileNotFoundException: Shuffle file not found at /path/to/shuffle/file
ERROR ShuffleBlockFetcherIterator: Exception while fetching shuffle block
java.io.FileNotFoundException: /path/to/shuffle/file (No such file or directory)
	at java.io.RandomAccessFile.open0(Native Method)
	at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator$.getShuffleFile(ShuffleBlockFetcherIterator.scala:158)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:120)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:72)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at org.apache.spark.scheduler.Task.run(Task.scala:119)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:407)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)        
ERROR BlockManager: ShuffleBlockException: Exception while fetching shuffle block
ERROR ShuffleBlockFetcherIterator: Exception while fetching shuffle block
java.lang.Exception: Exception while fetching shuffle block
	at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1.fetch(ShuffleBlockFetcherIterator.scala:105)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1.get(ShuffleBlockFetcherIterator.scala:107)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:119)
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:72)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at org.apache.spark.scheduler.Task.run(Task.scala:119)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:407)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Error fetching shuffle block
	at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1.fetch(ShuffleBlockFetcherIterator.scala:98)
	... 10 more        
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 14 partition 0        

Root Cause:

  • Operations like joins or groupBy cause uneven distribution of data across partitions leading to high disk and network I/O.
  • Executor loss or worker decommission due to cluster scaling.
  • External shuffle service is unavailable on worker nodes.
  • Excessively large shuffle files can lead to high I/O and memory consumption.

Solution:

  • Increase Shuffle Partitions: Use a higher value for spark.sql.shuffle.partitions to handle larger partitions.
  • Compress shuffle data: Set spark.io.compression.codec to one of the following:

1. For Most Cases: Use Snappy for a good balance of speed and compression efficiency.

2. For Speed: Use lz4 (default) if extremely fast compression and decompression are needed.

3. For High Compression Ratio: Use gzip if minimizing data size is more critical than speed.

  • Optimize Joins: Use broadcast joins for smaller datasets (see the sketch after this list).
  • Check Shuffle Service: Ensure spark.shuffle.service.enabled is set to true, and the external shuffle service is properly configured.
  • Adjust the shuffle file buffer size: Set larger buffer sizes spark.shuffle.file.buffer to improve I/O performance during shuffle operations.
  • Enable Adaptive Query Execution (AQE): Ensure spark.sql.adaptive.enabled is set to true to optimize plans based on runtime statistics, thus reducing the amount of data shuffled and processed, leading to faster job execution.
  • Increase Executor Memory and Overhead: Raise spark.executor.memory and spark.executor.memoryOverhead to reduce the risk of out-of-memory errors and improve shuffle performance.
  • Adjust Memory Storage Fraction: Reducing spark.memory.storageFraction can improve performance for shuffle-heavy jobs that hit frequent shuffle fetch failures or disk spills.
  • Increase Shuffle I/O Connections Per Peer: Raising spark.shuffle.io.numConnectionsPerPeer allows more parallel connections, reducing the time required for shuffle read/write operations.
  • Consolidate Shuffle Files: Setting spark.shuffle.consolidateFiles to true merges multiple shuffle output files from the same executor into fewer, larger files.
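As an illustration of the broadcast-join advice above, here is a small Scala sketch. The orders and countries DataFrames, the join key, and the partition count are made-up examples, not tuning guidance.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("broadcast-join-sketch").getOrCreate()
import spark.implicits._

// Hypothetical data: a large fact table joined to a small dimension table.
val orders    = spark.range(0, 10000000L).selectExpr("id as order_id", "id % 100 as country_id")
val countries = (0 until 100).map(i => (i, s"country_$i")).toDF("country_id", "name")

// Broadcasting the small side ships it to every executor once,
// so the large side is never shuffled for this join.
val joined = orders.join(broadcast(countries), Seq("country_id"))
joined.explain()   // the physical plan should show a BroadcastHashJoin

// Runtime-settable shuffle knobs from the list above (values are illustrative):
spark.conf.set("spark.sql.shuffle.partitions", "400")
spark.conf.set("spark.sql.adaptive.enabled", "true")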


3. Executor Lost

Occasionally, one or more executors might be lost during job execution, resulting in the following error:

Lost executor 1 on host: Executor heartbeat timed out after 128083 ms        

Root Cause:

  • Executors are lost due to hardware failures, network issues, or container eviction in YARN/Kubernetes clusters.

Solution:

  • Increase Retry Attempts: Set spark.task.maxFailures to a higher value to allow more retries.
  • Ensure Cluster Stability: Verify that cluster resources are stable and not over-allocated.
  • Enable Speculative Execution: Set spark.speculation to true to mitigate the impact of “straggler” tasks that may cause executor loss due to long execution times. Speculative execution detects slow-running tasks and starts duplicates on other nodes; the result from the faster task is used, and the slower one is stopped. A minimal configuration sketch follows this list.
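A minimal sketch of those settings, assuming they are applied before the application starts; the values are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Application-level settings; they must be in place before the SparkContext starts.
val conf = new SparkConf()
  .set("spark.task.maxFailures", "8")   // default is 4 retries per task
  .set("spark.speculation", "true")     // re-launch straggler tasks on other executors

val spark = SparkSession.builder()
  .config(conf)
  .appName("executor-loss-sketch")
  .getOrCreate()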


4. Skewed Data

Skewed data can lead to errors like:

Stage X contains a task with very large runtime: 1024.0s        

Root Cause:

  • Uneven distribution of data among partitions causes workload imbalance.

Solution:

  • Adjust partitions to distribute data evenly using repartition() or coalesce() depending on the context.
  • Salting Technique: Introduce a random key to distribute data more evenly.

import org.apache.spark.sql.functions._

val saltedDF = df.withColumn("salt", expr("rand() * 10").cast("int"))
val partitionedDF = saltedDF.repartition(10, col("salt"))        

  • Custom Partitioning: Use custom partitioning to balance data distribution.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Repartition by a chosen key so rows are spread more evenly across partitions.
def customPartitioning(df: DataFrame): DataFrame = {
  df.repartition(10, col("partition_key"))
}

val partitionedDF = customPartitioning(df)

  • Test Data Variations: Validate pipelines on a pre-production environment that mimics production data.
  • Enable Adaptive Query Execution (AQE): Set spark.sql.adaptive.enabled to true so Spark can optimize join strategies and repartition skewed data dynamically.
  • Enable Dynamic Partition Pruning: Setting spark.sql.optimizer.dynamicPartitionPruning.enabled to true may improve query performance by dynamically pruning irrelevant partitions based on runtime statistics, reducing the data scanned and improving overall job efficiency. It comes at the cost of extra planning overhead and resource usage, so it is not recommended for small datasets.


5. Serialization Errors

Serialization errors occur when Spark fails to serialize user-defined objects, which is necessary for data transfer between executors. Example:

org.apache.spark.SparkException: Task not serializable        

Root Cause:

  • Non-serializable classes.
  • Anonymous functions or inner classes capturing non-serializable variables.
  • Incorrect use of the transient keyword.

Solution:

  • Implement Serializable: Ensure custom classes implement the Serializable trait.
  • Check Closures: Make sure all variables in transformations are serializable.
  • Use @transient Carefully: Avoid using it on fields required in transformations.
  • Test Serializability: Test classes outside Spark to ensure they are serializable.

import java.io.{ObjectOutputStream, ByteArrayOutputStream}

def isSerializable(obj: Any): Boolean = {
  try {
    val byteArrayOutputStream = new ByteArrayOutputStream()
    val objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)
    objectOutputStream.writeObject(obj)
    objectOutputStream.close()
    true
  } catch {
    case e: Exception => false
  }
}

println(isSerializable(new CustomClass("example")))  // Should print true if serializable        


6. Long Garbage Collection (GC) Times

High GC times can slow down performance, leading to warnings such as:

WARN TaskSetManager: Lost task X.X in stage Y.Y (TID Z, host, executor Y): ExecutorLostFailure (executor Y exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 128083 ms        

Root Cause:

  • Inefficient memory use leads to prolonged GC times.

Solution:

  • Minimize GC cost by reducing the number of objects created and the size of those objects.
  • Tune JVM Parameters: Optimize GC using flags like spark.executor.extraJavaOptions.

Executor:

spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'"        

Driver:

spark.driver.extraJavaOptions="-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'"        

Details of Parameters:

-XX:+UseG1GC: Use the G1 Garbage Collector (default is -XX:+UseParallelGC).

-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps: Log GC frequency and details.

-XX:InitiatingHeapOccupancyPercent: Set threshold for triggering GC.


7. Incorrect Results Due to DataFrame API Misuse

Misuse of DataFrame APIs can cause logical errors and unexpected results:

WARN QueryExecution: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to xxx() after end()        

Root Cause:

  • Misunderstanding of DataFrame API behaviors, especially with lazy evaluation.

Solution:

  • Debug Transformations: Use explain to inspect execution plans.
  • Understand Lazy Evaluation: Remember that transformations are lazily executed and only triggered by actions. If you expect to use the DataFrame multiple times, consider caching it to avoid redundant computation:

df_filtered.cache()  # Cache the DataFrame
df_filtered.show()
df_filtered_count = df_filtered.count()        

However, the order can impact performance and debugging.

1. Call show() First

Use When: Debugging or validating data.

Advantage: Immediate feedback on data content.

Disadvantage: May be slower with large DataFrames; triggers full execution.

2. Call count() First

Use When: Performance is a concern, or you need to confirm the row count.

Advantage: Faster for just counting rows; minimizes data processing.

Disadvantage: No immediate data content feedback until show() is called.


8. Slow Job Execution

Poorly optimized jobs can lead to long execution times:

WARN TaskSetManager: Stage Y contains a task with very large runtime: 1024.0s        

Root Cause:

  • Inefficient query design or excessive computation.

Solution:

  • Optimize Queries: Simplify and reduce the number of stages.
  • Use Caching: Cache intermediate data to avoid recomputation.
  • Enable Adaptive Query Execution (AQE): Set spark.sql.adaptive.enabled to true so Spark can optimize join strategies and repartition data dynamically.
  • Enable Speculative Execution: Set spark.speculation to true to mitigate the impact of “straggler” tasks that may cause executor loss due to long execution times.


9. Java Heap Space Error

Error:

java.lang.OutOfMemoryError: Java heap space        

Root Cause:

  • The JVM heap space allocated to Spark is not enough to handle the data volume or transformations.
  • Processing a large dataset without sufficient memory.

Solution:

  • Increase the heap size by setting spark.driver.memory and spark.executor.memory to higher values (a spark-submit sketch follows this list).
  • Optimize your transformations to reduce memory consumption.
  • Use DataFrames or Datasets instead of RDDs for better memory management.
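Heap sizes can only be fixed before the JVMs start, so they are normally passed at submit time. A sketch of a spark-submit invocation, where the class name, jar, and sizes are placeholders:

spark-submit \
  --driver-memory 8g \
  --executor-memory 8g \
  --conf spark.executor.memoryOverhead=2g \
  --class com.example.MyJob \
  my-job.jar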


10. Insufficient Number of Partitions

Error:

Job performance issues or data imbalance warnings.

WARN TaskSetManager: Stage 5 contains a task with very large runtime: 1800.0s (Task 12.0 in stage 5.0, TID 70, host, executor 1)
INFO TaskSetManager: Task 12.0 in stage 5.0 (TID 70) has a skewed data partition. Time to process partition: 1800 seconds, compared to average: 200 seconds
WARN SparkContext: Skewed Data Detected: Data skew in partition 12.0 causes significant imbalance in job execution time
INFO BlockManager: Block 12 of stage 5 has a large amount of data (120 GB) that is causing imbalance in execution
WARN Executor: Task 12.0 in stage 5.0 took significantly longer to complete compared to other tasks
ERROR JobScheduler: Job 30 failed due to extreme variance in task execution times. Stage 5 task 12.0 has excessive runtime compared to others        

Root Cause:

  • The number of partitions is too low, causing uneven data distribution and resource underutilization.
  • High skewness or large single partitions causing task imbalances.

Solution:

  • Increase the number of partitions using repartition() or coalesce() depending on the context (see the sketch after this list).
  • Monitor Spark UI to identify skewness and adjust partitions accordingly.
  • Set spark.sql.shuffle.partitions to a higher value (e.g., 200).
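A rough illustration of adjusting partitioning; the input path, partitioning key, and partition counts are hypothetical:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("partitioning-sketch").getOrCreate()

val df = spark.read.parquet("s3://bucket/events/")          // hypothetical input
println(s"current partitions: ${df.rdd.getNumPartitions}")  // check before tuning

// Spread the data across more partitions, keyed by a hypothetical column.
val repartitioned = df.repartition(200, col("customer_id"))

// Raise the default number of post-shuffle partitions for later joins/aggregations.
spark.conf.set("spark.sql.shuffle.partitions", "200")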


11. Network Timeouts and Connection Errors

Error:

org.apache.spark.network.netty.NettyRpcEnv: Ignored failure: java.io.IOException: Connection timed out        

Root Cause:

  • Network latency or failures between driver and executors.
  • Poor network configurations or transient network issues.

Solution:

  • Increase spark.network.timeout and spark.executor.heartbeatInterval to accommodate longer delays.
  • Ensure network stability and optimize cluster node placement.
  • Increase the shuffle retry parameters spark.shuffle.io.maxRetries & spark.shuffle.io.retryWait, which helps when network instability or disk I/O issues cause executor failures during shuffle operations; a longer wait time gives the system enough time to recover from transient issues (see the sketch after this list).
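A sketch of those settings; the values are illustrative, and spark.executor.heartbeatInterval must remain well below spark.network.timeout:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.network.timeout", "600s")              // default is 120s
  .set("spark.executor.heartbeatInterval", "60s")    // default is 10s
  .set("spark.shuffle.io.maxRetries", "10")          // default is 3
  .set("spark.shuffle.io.retryWait", "30s")          // default is 5s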


12. Disk Spilling Issues

Error:

WARN MemoryStore: Not enough space to cache map at /tmp/spark-abc123/blocks/part-0.0
INFO BlockManager: Block storage is running low on space. Initiating disk spill.
INFO MemoryStore: Disk spilling enabled to free up memory. Attempting to spill data to disk.
WARN MemoryStore: Memory limit exceeded. Spilling to disk. Used memory: 80% of limit.
INFO MemoryStore: Spilled data to disk at /tmp/spark-abc123/blocks/part-0.0
ERROR TaskSetManager: Task 10.0 in stage 20.0 (TID 150) failed due to disk spilling issues.
ERROR BlockManager: Unable to store block in memory and disk spilling failed. Possible out-of-disk-space issue.
WARN SparkContext: Job 30 failed due to insufficient memory and disk space for caching.        

Root Cause:

  • Insufficient memory causing Spark to spill data to disk, leading to high I/O wait times.
  • Inefficient memory usage or large shuffle operations.

Solution:

  • Increase executor memory or memory overhead (spark.executor.memory and spark.executor.memoryOverhead).
  • Use more partitions to distribute data evenly and reduce the size of shuffled data.


13. Job Stuck in Pending State

Error:

Jobs remain in the pending state without progressing.

INFO KubernetesClient: Kubernetes client is configured and connected to the cluster.
INFO SparkSubmit: Submitting job with the following configuration: ...
INFO SparkKubernetesClient: Spark application has been started on Kubernetes
INFO KubernetesSchedulerBackend: Executor request sent for 50 executors
INFO KubernetesSchedulerBackend: Waiting for executor pods to be scheduled
INFO KubernetesPodScheduler: Scheduling executor pods for job 12
INFO KubernetesPodScheduler: Executor pod spark-executor-0 is pending
WARN KubernetesPodScheduler: Pod spark-executor-0 is in pending state for more than 10 minutes
WARN KubernetesPodScheduler: Insufficient resources available for pod spark-executor-1
ERROR KubernetesPodScheduler: Pod scheduling failed for job 12 due to lack of resources
INFO SparkContext: Job 12 is pending due to insufficient executor resources
ERROR SparkContext: Job 12 has not progressed; all executor pods remain in the pending state
INFO SparkContext: Monitoring job status. No progress observed for 30 minutes.
WARN KubernetesSchedulerBackend: Executors could not be allocated. Resource request could not be fulfilled.
ERROR SparkContext: Job 12 failed due to timeout. Executors were not scheduled in the expected time frame.        

Root Cause:

  • Cluster resource constraints such as lack of available executors or cores.
  • Misconfigured cluster settings (e.g., dynamic allocation settings).

Solution:

  • Check cluster resource availability and increase if necessary.
  • Review and adjust spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors configurations.
  • Verify there are no deadlocks or waiting stages in the job.


14. Checkpointing Issues

Error:

java.io.IOException: Checkpoint directory is not set.        

Root Cause:

  • No checkpoint directory specified for operations that require checkpointing.
  • Incorrect or inaccessible checkpoint directory path.

Solution:

  • Set the checkpoint directory using sc.setCheckpointDir("s3://path/to/checkpoint/") (see the sketch after this list).
  • Ensure the directory is accessible and has write permissions.
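For example (the S3 paths are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("checkpoint-sketch").getOrCreate()

// The directory must be writable by the job; the path below is a placeholder.
spark.sparkContext.setCheckpointDir("s3://bucket/checkpoints/")

val df = spark.read.parquet("s3://bucket/events/")   // hypothetical input
val checkpointed = df.checkpoint()                   // eager by default; truncates the lineage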


15. Poor Performance Due to Catalyst Optimization

Error:

Slow query execution due to suboptimal execution plans.

INFO QueryExecution: Spark SQL Context running in SQL execution mode.
WARN TaskSetManager: Stage 5 contains a task with very large runtime: 2100.0s
ERROR DAGScheduler: Job 17 failed: Task 5.0 in stage 5.0 failed 4 times, most recent failure: Lost task 5.0 in stage 5.0 (TID 248) (executor 4): Task took too long to execute. Likely due to an inefficient execution plan.
 
INFO QueryExecution: Analysis of query took 10s, but the execution took 50m. 
WARN SparkSession: Plan Description: Project [id#109, name#110]
 - Filter is not efficiently pruning data partitions
 - Full Scan on a large dataset 
 - Cartesian Product with large number of rows

INFO SparkPlan: Physical Plan:
  *Project [id#109, name#110]
  *Filter (isnull(name#110) || (name#110 = "ABC"))
    +- *FileScan parquet [id#109, name#110] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/path/to/data], PartitionFilters: [], PushedFilters: [IsNull(name)]
 
INFO SparkPlan: Execution plan may benefit from optimizations or additional filtering

WARN SparkContext: Task execution time exceeds expected bounds due to a suboptimal execution plan.        

Root Cause:

  • Catalyst optimizer generates inefficient query plans.
  • Joins, filters, or aggregations are not executed in the most optimal order.

Solution:

  • Use explain() to analyze and debug query plans (see the sketch after this list).
  • Reorder join operations or use broadcast joins for smaller datasets.
  • Avoid unnecessary transformations and filters after join operations.
  • Enable Adaptive Query Execution (AQE): Setting spark.sql.adaptive.enabled to true dynamically optimizes query execution plans based on runtime statistics, which can enhance performance by adapting to data characteristics.
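A small sketch of this workflow, with made-up table paths and a made-up join key:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("plan-debugging-sketch").getOrCreate()

val facts = spark.read.parquet("s3://bucket/facts/")   // hypothetical large table
val dims  = spark.read.parquet("s3://bucket/dims/")    // hypothetical small table

// Inspect the plan Catalyst produced before running the full job.
facts.join(dims, Seq("dim_id")).explain(true)

// Hint the optimizer when you know the right side is small enough to broadcast.
val tuned = facts.join(broadcast(dims), Seq("dim_id"))

// Let AQE re-optimize join strategies and partition counts from runtime statistics.
spark.conf.set("spark.sql.adaptive.enabled", "true")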


16. Kryo Serialization Errors

Error:

java.io.IOException: Kryo serialization failed        

Root Cause:

  • Kryo serialization issues when complex objects or unsupported data types are serialized.
  • Objects larger than the default buffer size.

Solution:

  • Register frequently used classes with Kryo using spark.kryo.classesToRegister.
  • For complex or non-standard objects, define custom serializers by extending Kryo's Serializer class and registering them via a KryoRegistrator:

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.spark.SparkConf
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}

case class MyClass(name: String, value: Int)   // example domain class

class MyClassSerializer extends Serializer[MyClass] {
  override def write(kryo: Kryo, output: Output, obj: MyClass): Unit = {
    // Write each field explicitly.
    output.writeString(obj.name)
    output.writeInt(obj.value)
  }

  override def read(kryo: Kryo, input: Input, `type`: Class[MyClass]): MyClass = {
    // Read the fields back in the same order they were written.
    MyClass(input.readString(), input.readInt())
  }
}

// Register the class together with its custom serializer.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[MyClass], new MyClassSerializer)
  }
}

val conf = new SparkConf()
  .set("spark.serializer", classOf[KryoSerializer].getName)
  .set("spark.kryo.registrator", classOf[MyRegistrator].getName)

  • Ensure all classes used in transformations are serializable by Kryo.
  • Tune Kryo Settings: Control the buffer size with the spark.kryoserializer.buffer and spark.kryoserializer.buffer.max settings.
  • Ensure you are using an up-to-date version of Kryo.


17. Unoptimized Wide Transformations

Error:

Job execution is slow with long stages.

WARN TaskSetManager: Stage X contains a task with very large runtime: 2048.0s        

Root Cause:

  • Wide transformations like groupBy, join, or repartition that cause heavy shuffles.
  • Unoptimized repartitioning or sorting.

Solution:

  • Use narrow transformations wherever possible (map, filter, etc.).
  • Cache intermediate DataFrames to minimize recomputation and shuffle data.
  • Use coalesce instead of repartition when reducing the number of partitions, since coalesce avoids a full shuffle (see the sketch after this list).
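A short sketch contrasting the two; the input path and grouping key are hypothetical:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("coalesce-sketch").getOrCreate()

val df = spark.read.parquet("s3://bucket/events/")   // hypothetical input

// repartition(50) would trigger a full shuffle; coalesce(50) merges existing
// partitions locally, which is cheaper when you only need fewer partitions.
val reduced = df.coalesce(50)

// Cache a reused intermediate result so the wide transformation runs only once.
val aggregated = reduced.groupBy("customer_id").count().cache()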


18. Cluster Resource Overutilization

Error:

Executors get killed or stuck due to high resource usage.

INFO KubernetesClient: Kubernetes client connected. Submitting Spark application.
INFO SparkSubmit: Submitting Spark job with 50 executors and specified resource configurations.
INFO KubernetesSchedulerBackend: Requesting 50 executor pods from Kubernetes
INFO KubernetesPodScheduler: Executor pod spark-executor-0 is being scheduled
INFO KubernetesPodScheduler: Executor pod spark-executor-1 is being scheduled

ERROR Executor: Task 5.0 in stage 10.0 (TID 125) exceeded memory limits and was killed by Kubernetes.
ERROR Executor: Task 6.0 in stage 10.0 (TID 126) failed due to out-of-memory errors.
INFO KubernetesPodScheduler: Executor pod spark-executor-2 is in a stuck state due to high resource usage
WARN KubernetesPodScheduler: Executor pod spark-executor-2 has been in a pending state for 15 minutes due to resource constraints

ERROR KubernetesClient: Executor pod spark-executor-0 terminated unexpectedly with exit code 137
ERROR KubernetesClient: Pod spark-executor-0 was killed due to OOM (Out of Memory) by Kubernetes
INFO KubernetesSchedulerBackend: Executor pod spark-executor-1 failed to start, retrying
WARN SparkContext: Executors are being killed or stuck due to high resource usage. Job progress is affected.
ERROR SparkContext: Job 20 has stalled due to high resource consumption and executor failures.

INFO KubernetesPodScheduler: Monitoring pod status. Executors may need resource adjustments.
WARN KubernetesPodScheduler: Excessive resource usage detected. Review resource configurations and job demands.        

Root Cause:

  • Insufficient cluster resources for the job's demand.
  • Misconfigured resource settings, leading to resource contention.

Solution:

  • Adjust executor memory, core settings, and dynamic allocation to balance resource utilization.
  • Monitor and scale the cluster based on workload requirements.
  • Use spark.executor.memoryOverhead (spark.yarn.executor.memoryOverhead on older versions) and spark.executor.cores wisely to prevent over-utilization.


19. S3 I/O Timeout

Error:

Shuffle block fetches fail because reads from S3 time out:

ERROR BlockManager: Failed to fetch block rdd_3_7 from executor 1.0 due to S3 I/O timeout
WARN TaskSetManager: Task 3.0 in stage 5.0 failed: FetchFailure, result stage 5.0 is not retryable        

Root Cause:

  • Network Latency: High latency in network communication with S3.
  • S3 Throughput Limits: Exceeding the throughput limits of S3.

Solution:

  • Increase timeout settings using spark.network.timeout.
  • Increase spark.executor.heartbeatInterval so executors send heartbeats to the driver less frequently, reducing network load.
  • Add retry configurations (spark.hadoop.fs.s3a.retry.interval & spark.hadoop.fs.s3a.retry.limit); see the sketch after this list.
  • Optimize file sizes by using fewer, larger files instead of many small files.
  • Use Columnar Formats: Store data in efficient formats like Parquet or ORC.
  • Increase Parallelism: Set spark.sql.shuffle.partitions to a higher value.
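A sketch of the retry and timeout settings; the values are illustrative, and the spark.hadoop. prefix forwards the fs.s3a.* options to the Hadoop S3A connector:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.network.timeout", "600s")
  .set("spark.executor.heartbeatInterval", "60s")
  .set("spark.hadoop.fs.s3a.retry.limit", "10")
  .set("spark.hadoop.fs.s3a.retry.interval", "1s")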


20. Poor Job execution with Hive SerDe-based R/W

This occurs when the Hive SerDe (Serializer/Deserializer) is used for reading and writing Hive Parquet tables.

Root Cause:

  • When spark.sql.hive.convertMetastoreParquet is set to false, Spark uses the Hive SerDe (Serializer/Deserializer) to read Parquet tables, which is less efficient than using Spark's native Parquet reader.
  • This might be necessary in environments where compatibility with Hive's specific Parquet format or SerDe is required (for example, reading an int32 column as int64 across different files), but it gives up the performance advantages of Spark's Catalyst optimizer and Tungsten execution engine.

Result:

  • Performance Degradation: You may see too many tasks in the Spark Web UI, and the job may run slower due to the overhead of Hive's SerDe processing, which is not optimized for Spark's execution engine.

Solution:

  • Re-enable Native Parquet Reader: Set spark.sql.hive.convertMetastoreParquet to true (see the sketch after this list).
  • Schema Normalization: Instead of disabling the native reader, normalize your schemas to ensure consistent data types (e.g., converting all int32 columns to int64).
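A minimal sketch, assuming a Hive-enabled session:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-parquet-sketch")
  .enableHiveSupport()
  .getOrCreate()

// Use Spark's native (vectorized) Parquet reader instead of the Hive SerDe path.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")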


21. Broadcast Timeout Issues

Issue: Broadcast timeout errors occur when the broadcasted data cannot be sent to all executors within the allotted time, causing job failures or delays.

Error Example:

org.apache.spark.network.TimeoutException: Waiting for 1000 ms timed out        

Root Cause:

  • Large broadcast variables that exceed the default timeout limits.
  • Network issues or slow communication between nodes.

Solution:

  • Increase Broadcast Timeout: Set spark.sql.broadcastTimeout to a higher value (it is specified in seconds).
  • Optimize Broadcast Joins: Use broadcast joins only for datasets that fit comfortably in memory; check the size of the broadcasted data and ensure it is suitable for broadcasting.
  • Disable broadcast joins for very large datasets, or when the broadcasted data exceeds available memory, by setting spark.sql.autoBroadcastJoinThreshold to -1. (A sketch of both settings follows this list.)
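A sketch of the two settings; the values are illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("broadcast-timeout-sketch").getOrCreate()

spark.conf.set("spark.sql.broadcastTimeout", "600")           // default is 300 seconds
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")  // disable automatic broadcast joins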


22. Datetime Incompatibility Exception

Error:

java.lang.IllegalArgumentException: Cannot decode the Parquet datetime value 1582-10-10T00:00:00.000 to 1970-01-01T00:00:00.000 due to datetime rebasing.
Caused by: org.apache.spark.SparkException: Failed to decode the datetime value due to mismatched datetime format. Consider setting the spark.sql.legacy.parquet.datetimeRebaseModeInRead config to LEGACY to handle rebasing.        

Root Cause:

  • Mismatch between the date formats (Julian vs. Gregorian) used in different Spark versions.

Solution:

  • Set spark.sql.legacy.parquet.datetimeRebaseModeInRead and spark.sql.legacy.parquet.datetimeRebaseModeInWrite to LEGACY to handle the calendar rebase; this ensures correct date handling and compatibility with data written by older Spark versions (see the sketch below).
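A minimal sketch; the config names match the error message above:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("datetime-rebase-sketch").getOrCreate()

// LEGACY rebases dates between the Julian and proleptic Gregorian calendars
// so older Parquet data reads and writes consistently.
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "LEGACY")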



By understanding and addressing these common errors, you can ensure smoother operations and better performance in your Apache Spark production environment.

From tweaking memory settings to handling shuffle operations, there’s a lot to manage for smooth and efficient data processing.

Remember, mastering Spark isn’t just about understanding its capabilities—it’s also about recognising that the default settings are merely a starting point. So dive deep into the configuration settings, experiment with different approaches, and tailor Spark to fit your needs. With a bit of tuning and a dash of humor, you’ll turn those Spark challenges into opportunities for optimization. Happy Spark tuning!


