Spark Optimization Strategies
Chetesh Bhagat
Big Data Developer at EPAM Systems Specializing in Spark, Scala, Hive, Hadoop, and AWS Solutions
Introduction
Apache Spark is a really useful tool for handling big data, but to get the best results, you need to know how to make it work efficiently. That's where optimization comes in. In this article, we'll talk about some easy tips to make your Spark code run faster and use fewer resources. Some of these tips involve changing inefficient code you might not even realize you're using, while others are just small adjustments to make your code shine. Let's get started!
Table of Contents
1. Use DataFrame/Dataset over RDD
2. Don’t Collect Data
3. Persisting & Caching data
4. Use coalesce() over repartition()
5. Broadcast Large Variables
6. Aggregate with Accumulators
7. Avoid groupByKey()
8. Leverage mapPartitions() over map()
9. Use Serialized Data Formats
10. Avoid User-Defined Functions (UDFs)
11. Adaptive Query Execution (AQE)
12. Disable DEBUG & INFO Logging
13. Use Broadcast Hash Join
1. Use DataFrame/Dataset over RDD:
Spark DataFrames act like database tables, preserving data structure and types for efficient handling. They utilize off-heap storage in binary format and generate encoder code on-the-fly, thanks to Project Tungsten. This eliminates the need for serialization and deserialization when distributing data across a cluster, resulting in performance improvements.
Using RDDs directly in Spark can cause performance issues because RDDs lack optimization techniques and involve serialization and deserialization during distribution, leading to inefficiencies like repartitioning and shuffling.
Project Tungsten
Project Tungsten enhances memory and CPU efficiency in Spark Dataset/DataFrame by rewriting operations in bytecode at runtime. It optimizes jobs for CPU and memory performance, utilizing DataFrame's columnar format with metadata to optimize queries.
Catalyst Optimizer
Catalyst Optimizer further improves code execution speed by logically enhancing queries, refactoring complex ones, and determining their execution order using rule-based and code-based optimization. Datasets ensure type safety at compile time, catching errors like referencing non-existent fields, leading to clearer code and fewer runtime errors.
2. Don’t Collect Data
When we're new to handling data, we often stick to familiar commands, like collect() in Spark, even as we tackle bigger datasets. But using collect() sends all the data back to the driver node, which can overload its memory, especially with large datasets.
# Load data
df = spark.read.parquet("/FilePath/student.parquet")
df.collect()
So, what should we use instead?
A better alternative is using the take() action. It grabs data from the first partition it comes across and returns the result. For instance, if you only need a glimpse of the data, you can use take(1) to get just one row.
df.take(1)
3. Persisting and Caching data
Persisting
When you start using Spark, you'll find it's a "lazy evaluator," meaning it waits to do tasks until necessary, usually a good thing. But if not handled well, it can lead to issues.
When you've applied transformations to an RDD, each time you perform an action on it, Spark recalculates the RDD and its dependencies, which can be expensive.
test = [1, 2, 3, 4, 5, 6]
rdd1 = sc.parallelize(test)
rdd2 = rdd1.map(lambda x: x*x)
rdd3 = rdd2.map(lambda x: x+2)
print(rdd3.count())
print(rdd3.collect())
When I use count(), all transformations are executed, taking 0.1 seconds to finish.
When I use collect(), once more, all transformations are triggered, and it still takes 0.1 seconds to finish the task.
To break this cycle, we persist the data.
# Persist to memory, spilling to disk if memory is insufficient
from pyspark import StorageLevel

rdd3.persist(StorageLevel.MEMORY_AND_DISK)
# first action: computes rdd3 and persists the result
print(rdd3.count())
# subsequent actions: reuse the persisted data
print(rdd3.collect())
In our previous code, we just need to persist the final RDD. This means that when we perform the first action on the RDD, the resulting data will be saved in the cluster. After that, any further actions on the same RDD will be much quicker because the previous result is stored already.
Exploring Further Data Persistence Options
Caching:
Utilize caching techniques to store frequently accessed data in memory, thus reducing repetitive disk reads. Caching for RDDs and DataFrames is lazy, meaning data is cached only when an action is executed. Optimize performance by caching medium-sized DataFrames, as very small ones may not significantly impact speed. When caching, assign the cached data to a separate DataFrame for efficient query execution—for instance, using cached_df = sample_df.cache(). This initial caching run acts as a rehearsal, significantly reducing processing time in subsequent runs. However, be cautious with massive data sets, as overloading the cluster with excessive cached data can lead to performance issues.
from pyspark.sql.functions import col
df.where(col("state") == "PR").cache()
Spark caches DataFrames using an in-memory columnar format. You can further improve performance by tuning the batchSize property:
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "10000")
4. Use coalesce() over repartition()
Spark's strength lies in its ability to process data in parallel, achieved by partitioning data into subsets across the cluster's cores, managed by the driver node. Adjusting the number of partitions is crucial for optimal performance: too few can leave resources idle, while too many can lead to inefficient data shuffling. Spark suggests aiming for around 128 MB per partition, but this can vary based on specific requirements.
When reducing the number of partitions, prefer using coalesce() over repartition(). coalesce() is optimized and minimizes data movement across partitions, making it ideal for larger datasets. However, if you need to increase the number of partitions, use repartition() instead.
Note: Use repartition() to increase the number of partitions.
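As a rough illustration of the ~128 MB guideline above, here is a tiny back-of-the-envelope helper. The function name and the default are ours for illustration, not part of any Spark API:

```python
# Back-of-the-envelope partition count from total input size.
# target_bytes defaults to the ~128 MB per-partition guideline.
def suggest_partitions(total_bytes, target_bytes=128 * 1024 * 1024):
    # Round up so a trailing partial chunk still gets its own partition.
    return max(1, -(-total_bytes // target_bytes))

# e.g. a 10 GB dataset:
print(suggest_partitions(10 * 1024**3))  # 80
```

Treat the result as a starting point only; skewed data or wide rows can justify a very different number.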
Example repartition()
val rdd1 = spark.sparkContext.parallelize(Range(0,25), 6)
println("parallelize : "+rdd1.partitions.size)
val rdd2 = rdd1.repartition(4)
println("Repartition size : "+rdd2.partitions.size)
rdd2.saveAsTextFile("/tmp/re-partition")
This yields the output Repartition size : 4. repartition() redistributes the data (as shown below) from all partitions, which is a full shuffle and therefore a very expensive operation when dealing with billions of records. By tuning the partition size to an optimal value, you can improve the performance of the Spark application.
Partition 1 : 1 6 10 15 19
Partition 2 : 2 3 7 11 16
Partition 3 : 4 8 12 13 17
Partition 4 : 0 5 9 14 18
Example Coalesce()
val rdd3 = rdd1.coalesce(4)
println("Coalesce size : "+rdd3.partitions.size)
rdd3.saveAsTextFile("/tmp/coalesce")
If you compare this output with the repartition example, you will notice that Partition 3 has been merged into Partition 2 and Partition 6 into Partition 5, so data moved from just two partitions.
Partition 1 : 0 1 2
Partition 2 : 3 4 5 6 7 8 9
Partition 4 : 10 11 12
Partition 5 : 13 14 15 16 17 18 19
Note: coalesce() can only decrease the number of partitions. For an even distribution of data among partitions, use repartition() instead.
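To build intuition for why coalesce() is cheaper, here is a plain-Python sketch (not Spark code) of the merge: whole input partitions are appended to fewer target partitions, so individual records are never reshuffled. The round-robin target choice here is a simplification; Spark actually prefers to merge partitions that live on the same executor.

```python
# Plain-Python sketch of coalesce(): merge existing partitions into
# fewer ones without redistributing individual records (no full shuffle).
# Round-robin assignment is a simplification of Spark's locality-aware merge.
def coalesce_sim(partitions, n):
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        # Each input partition is appended wholesale to one target partition.
        merged[i % n].extend(part)
    return merged

parts = [[0, 1, 2], [3, 4], [5, 6], [7, 8], [9], [10, 11]]  # 6 partitions
print(coalesce_sim(parts, 4))
```

Note that only whole partitions move; repartition(), by contrast, would reassign every record individually.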
5. Broadcast Large Variables
Spark also has something called a Broadcast variable. These are used for reading only and are stored in all the worker nodes in the cluster. They're really helpful when you need to share a big lookup table with all the nodes.
For example, if you have a file with country codes like "USA" for the United States, and you need to turn them into country names, you can use Broadcast variables to save this lookup table on all the worker nodes. It makes things faster and easier.
# lookup
country = {"IND":"India","USA":"United States of America","SA":"South Africa"}
# broadcast
broadcaster = sc.broadcast(country)
# data
userData = [("Johnny","USA"),("Faf","SA"),("Sachin","IND")]
# create rdd
rdd_data = sc.parallelize(userData)
# use broadcast variable
def convert(code):
    return broadcaster.value[code]
# transformation
output = rdd_data.map(lambda x: (x[0], convert(x[1])))
# action
output.collect()
6. Aggregate with Accumulators
Suppose you want to aggregate some value. This can be done with simple programming using a variable for a counter.
file = sc.textFile("/FileStore/tables/log.txt")
# variable counter
warningCount = 0
def extractWarning(line):
    global warningCount
    if "WARNING" in line:
        warningCount += 1
lines = file.flatMap(lambda x: x.split(","))
lines.foreach(extractWarning)
# output variable
warningCount
However, there's a catch.
When we try to view the result on the driver node, then we get a 0 value. This is because when the code is implemented on the worker nodes, the variable becomes local to the node. This means that the updated value is not sent back to the driver node. To overcome this problem, we use accumulators.
Accumulators in Spark are shared variables used for operations that are both associative and commutative. They come in handy for tasks like counting blank lines in a text file or identifying corrupted records.
file = sc.textFile("/FileStore/tables/log.txt")
# accumulator
warningCount = sc.accumulator(0)
def extractWarning(line):
    global warningCount
    if "WARNING" in line:
        warningCount += 1
lines = file.flatMap(lambda x: x.split(","))
lines.foreach(extractWarning)
# accumulator value
warningCount.value # output 4
One thing to remember when working with accumulators: worker nodes can only write (add) to an accumulator; only the driver node can read its value.
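The accumulator mechanics can be sketched in plain Python (no Spark required): each worker builds a local partial count, and the driver merges the partials. This only gives a well-defined answer because addition is associative and commutative, which is exactly the requirement for accumulator operations. The partition data below is illustrative.

```python
# Plain-Python sketch of accumulator behavior: each "worker" accumulates
# into a local partial value, and the driver merges the partials at the end.
partitions = [
    ["WARNING: disk", "ok", "WARNING: net"],
    ["ok", "ok"],
    ["WARNING: mem", "ok"],
]

def count_warnings(lines):          # runs "on a worker"
    return sum(1 for line in lines if "WARNING" in line)

local_counts = [count_warnings(p) for p in partitions]
total = sum(local_counts)           # merge step "on the driver"
print(total)  # 3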
7. Avoid groupByKey()
When you started your data engineering journey, you would certainly have come across the word count example.
# List of sample sentences
text_list = ["this is a sample sentence", "this is another sample sentence", "sample for a sample test"]
# Create an RDD
rdd = sc.parallelize(text_list)
# Split sentences into words using flatMap
rdd_word = rdd.flatMap(lambda x: x.split(" "))
# Create a paired-rdd
rdd_pair = rdd_word.map(lambda x: (x, 1))
# Count occurrences per word using groupByKey()
rdd_group = rdd_pair.groupByKey()
rdd_group_count = rdd_group.map(lambda x:(x[0], len(x[1])))
rdd_group_count.collect()
But why bring it up here? Well, it is the best way to highlight the inefficiency of the groupByKey() transformation when working with pair RDDs.
groupByKey() shuffles all the key-value pairs across the network and only then combines them. With much larger data, the shuffling becomes far more expensive. So, how do we deal with this? reduceByKey()!
reduceByKey(), on the other hand, first combines values for the same key within each partition and only then shuffles the partially combined data.
Here is how to count the words using reduceByKey():
# Count occurrences per word using reduceByKey()
rdd_reduce = rdd_pair.reduceByKey(lambda x,y: x+y)
rdd_reduce.collect()
This leads to a much lower amount of data being shuffled across the network with reduceByKey() than with groupByKey().
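A plain-Python sketch (not Spark code) makes the difference concrete by counting how many records would cross the network under each strategy. The partition contents are illustrative:

```python
from collections import Counter

# Two partitions of word records, as in the word-count example.
partitions = [
    ["spark", "spark", "spark", "rdd"],
    ["spark", "rdd", "rdd", "rdd"],
]

# groupByKey-style: every single (word, 1) pair crosses the network.
group_shuffled = sum(len(p) for p in partitions)

# reduceByKey-style: each partition pre-combines its own counts, so only
# one record per distinct word per partition is shuffled.
combined = [Counter(p) for p in partitions]
reduce_shuffled = sum(len(c) for c in combined)

# Final merge (same word counts either way).
totals = Counter()
for c in combined:
    totals.update(c)

print(group_shuffled, reduce_shuffled)  # 8 4
```

The gap widens with repetition: the more duplicate keys each partition holds, the more the map-side combine saves.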
8. Leverage mapPartitions() over map():
When you're working with Spark and need to apply a function to each part of your data, you should use mapPartitions(). It's better because it processes bigger chunks of data at once, reducing the work overhead.
In simpler terms, mapPartitions() is like a faster version of map() when you have to do big setup tasks, like connecting to a database, for each part of your data. It's handy for speeding up your Spark jobs, especially with large datasets.
Example map()
import spark.implicits._
val df3 = df2.map(row=>{
val util = new Util() // Initialization happens for every record
val fullName = util.combine(row.getString(0),row.getString(1),row.getString(2))
(fullName, row.getString(3),row.getInt(5))
})
val df3Map = df3.toDF("fullName","id","salary")
Example mapPartitions()
val df4 = df2.mapPartitions(iterator => {
val util = new Util()
val res = iterator.map(row=>{
val fullName = util.combine(row.getString(0),row.getString(1),row.getString(2))
(fullName, row.getString(3),row.getInt(5))
})
res
})
val df4part = df4.toDF("fullName","id","salary")
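The same idea can be sketched in plain Python by counting how often the expensive setup runs under each approach. The data and the helper are illustrative stand-ins, not Spark API:

```python
# Plain-Python sketch of why mapPartitions() beats map() when there is
# expensive per-call setup (e.g. opening a database connection).
setup_calls = 0

def expensive_setup():
    """Stand-in for building a costly helper object."""
    global setup_calls
    setup_calls += 1
    return str.upper

partitions = [["a", "b"], ["c", "d", "e"], ["f"]]

# map()-style: the helper is built once per record.
setup_calls = 0
mapped = [expensive_setup()(x) for part in partitions for x in part]
per_record_setups = setup_calls

# mapPartitions()-style: the helper is built once per partition and
# reused for every record in that partition's iterator.
setup_calls = 0
mapped2 = []
for part in partitions:
    fn = expensive_setup()
    mapped2.extend(fn(x) for x in part)
per_partition_setups = setup_calls

print(per_record_setups, per_partition_setups)  # 6 3
```

Both approaches produce identical output; only the setup count differs, and that gap grows with records per partition.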
9. Use Serialized Data Formats
In Spark pipelines, data often moves from one job to another. It's best to use optimized formats like Parquet or Avro for intermediate files. These formats are better for performance and efficiency compared to text, CSV, or JSON.
Parquet: It's a columnar format that speeds up queries and is more efficient than CSV or JSON. It's compatible with most Hadoop frameworks and offers compression and encoding for better performance. Use spark.read.parquet() to read Parquet files and df.write.parquet() to write them.
Avro: This row-based format is widely used in Spark, especially for Kafka-based pipelines. It's open-source, compact, and includes schema information, making it easy to process later. Use spark.read.format("avro").load() to read Avro files and df.write.format("avro").save() to write them.
Spark is optimized for Parquet and ORC formats due to their efficient read throughput. It supports vectorization, reducing disk I/O, and works well with columnar formats. Use compression and avoid too many small files for better performance.
10. Avoid User-Defined Functions (UDFs):
Avoid using UDFs in Spark when possible due to serialization overhead. Prioritize built-in Spark functions for better performance. UDFs limit Spark's optimization ability and may decrease DataFrame/Dataset performance. Opt for Spark SQL built-in functions for optimization benefits. Research existing Spark SQL Functions before creating a UDF to avoid redundancy. Spark SQL regularly adds new functions, so check for existing ones first.
11. Adaptive Query Execution(AQE):
AQE allows Spark to re-optimize and adjust query plans based on runtime statistics collected during query execution, that is, while the Spark job is actually running.
When AQE is turned on, Spark feeds statistics about the size of the data in the shuffle files back into the planner, so that when it works out the logical plan for the next stage it can do three main things.
Switch Join Strategies:
Consider an example plan where Spark has selected a Sort-Merge Join. Before any data has actually been read, Spark estimates that the input is over 15 MB, which is above the default threshold for using a broadcast join (10 MB by default). In Spark 2.x, this is the strategy that will be used to carry out the query.
With AQE, the plan is re-optimized at runtime. When the query runs, the actual size of the data read in is only 8 MB, because Spark also pushes filters down to the data source and collects only the necessary data from the file. 8 MB is below the default broadcast-join threshold, so Spark can dynamically switch join strategies and use the more performant Broadcast Hash Join instead of the Sort-Merge Join.
Coalesce the number of shuffle partitions:
Tuning shuffle partitions is a common pain point for Spark users. The best number of partitions depends on data size, but data sizes may differ vastly from stage to stage, so this number can be hard to tune.
To solve this problem, we can set a relatively large number of shuffle partitions at the beginning, then combine adjacent small partitions into bigger ones at runtime by looking at the shuffle file statistics.
For example, let’s say we are running the query SELECT max(i) FROM tbl GROUP BY j. The input table is rather small, so there are only two partitions before grouping. The initial shuffle partition number is set to five, so after local grouping the partially grouped data is shuffled into five partitions. Without AQE, Spark would start five tasks to do the final aggregation. However, three of these partitions are very small, and it would be a waste to start a separate task for each of them.
Instead, AQE coalesces these three small partitions into one, so the final aggregation only needs to perform three tasks rather than five.
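AQE's coalescing step can be sketched in plain Python: walk the post-shuffle partition sizes and merge adjacent small ones until each group reaches a target size. The sizes and target below are illustrative numbers, not real Spark statistics:

```python
# Plain-Python sketch of AQE's shuffle-partition coalescing: merge
# adjacent small partitions until each group reaches a target size.
def coalesce_small_partitions(sizes, target):
    groups, current = [], []
    for size in sizes:
        current.append(size)
        if sum(current) >= target:   # group is big enough, close it
            groups.append(current)
            current = []
    if current:                      # flush any small leftover group
        groups.append(current)
    return groups

# Five shuffle partitions, three of them tiny (as in the GROUP BY
# example above): the three small ones collapse into a single task.
print(coalesce_small_partitions([64, 1, 2, 1, 80], target=4))
```

The result is three task groups instead of five, mirroring the five-to-three reduction described above.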
Optimize Skew Joins:
Data skew occurs when data is unevenly distributed among partitions in the cluster. Severe skew can significantly degrade query performance, especially with joins. AQE's skew-join optimization detects such skew automatically from shuffle file statistics. It then splits the skewed partitions into smaller subpartitions, each of which is joined to the corresponding partition from the other side.
For example, suppose table A has a partition A0 significantly bigger than its other partitions. The skew-join optimization will split partition A0 into two subpartitions and join each of them to the corresponding partition B0 of table B.
Without AQE, there are four tasks running the sort-merge join, with one task taking much longer than the others. With AQE, there are five tasks running the join, but each task takes roughly the same amount of time, resulting in better overall performance.
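The planning step can be sketched in plain Python: any left-side partition above a size threshold is split into chunks, and each chunk becomes its own join task against the matching right-side partition. All the sizes and names below are illustrative:

```python
# Plain-Python sketch of AQE skew-join planning: partitions of the left
# table larger than `threshold` are split into chunks, and each chunk
# joins the matching right-side partition as a separate task.
def plan_skew_tasks(left_sizes, threshold):
    tasks = []
    for i, size in enumerate(left_sizes):
        if size <= threshold:
            tasks.append((f"A{i}", f"B{i}"))
        else:
            chunks = -(-size // threshold)  # ceiling division
            for c in range(chunks):
                tasks.append((f"A{i}-{c}", f"B{i}"))
    return tasks

# A0 is heavily skewed at 200 units against a threshold of 100.
tasks = plan_skew_tasks([200, 40, 30, 50], threshold=100)
print(len(tasks))  # 5 tasks instead of 4
```

The skewed A0 becomes two tasks that each read the same B0, which is the trade AQE makes: slightly more tasks in exchange for no single straggler.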
Using Adaptive Query Execution can dramatically speed up your queries. It improves the query plan as the query runs, eliminating the need to collect statistics up front or worry about inaccurate estimates. AQE is disabled by default in Spark 3.0 and 3.1 (and enabled by default since Spark 3.2); you can control it by setting spark.sql.adaptive.enabled to true.
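As a sketch, turning AQE and its sub-features on from PySpark looks like this; `spark` is assumed to be an existing SparkSession on Spark 3.x:

```python
# Assumes `spark` is an existing SparkSession (Spark 3.x).
spark.conf.set("spark.sql.adaptive.enabled", "true")
# The sub-features discussed above; both are on by default once AQE is enabled:
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
```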
12. Disable DEBUG & INFO Logging:
To improve Spark job performance, set logging levels to WARN or ERROR to reduce unnecessary log output. Logging can slow down jobs, especially in large-scale operations. During development, debug/info messages are often written to the console or log files using println() or logging frameworks like log4j.
To optimize, replace println() with log4j's info/debug methods:
logger.debug("Debug logging messages")
logger.info("Info logging messages")
Disable DEBUG/INFO levels to minimize logging output:
log4j.rootLogger=warn, stdout
I experienced this firsthand when our team's job had 5 log statements in a map() transformation, resulting in excessive I/O operations during processing. After disabling DEBUG & INFO logging, job runtimes reduced significantly, from hours to minutes. While Spark workloads are increasingly CPU and memory-bound, minimizing I/O operations remains a good practice.
13. Use Broadcast Hash Join
To optimize joins in your Spark application:
● Pay attention to join operations, as they can be costly.
● Consider using BroadcastHashJoin when one dataset is small enough to be broadcast.
● Start with the most selective join, and use SQL hints if necessary to force a specific join type.
● For example, when joining a small dataset with a large one, consider forcing a broadcast join.
● Check whether Spark is picking the broadcast hash join; if not, force it with a SQL hint.
● Avoid cross-joins.
● Collect statistics on tables so Spark can compute an optimal plan.
Broadcast Hash Join is efficient when one dataset is small enough to be duplicated in memory on all nodes. It eliminates significant data exchange in the cluster by broadcasting the smaller table beforehand, speeding up the join process. Adjust the Spark configuration parameter spark.sql.autoBroadcastJoinThreshold to control this behavior.
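The mechanics can be sketched in plain Python: hash the small table once (the "broadcast"), then let each partition of the large table join locally against that copy, so the large side is never shuffled. The tables below reuse the country-lookup data from section 5 for illustration:

```python
# Plain-Python sketch of a broadcast hash join: build a hash map of the
# small table once, then stream each partition of the large table
# through it locally, with no shuffle of the large side.
small_table = [("IND", "India"), ("USA", "United States of America"),
               ("SA", "South Africa")]
lookup = dict(small_table)  # the copy every worker would receive

large_partitions = [
    [("Johnny", "USA"), ("Faf", "SA")],
    [("Sachin", "IND")],
]

joined = [
    (name, lookup[code])
    for part in large_partitions  # each partition joins independently
    for name, code in part
    if code in lookup             # inner-join semantics
]
print(joined)
```

This is why the strategy only works when the small side fits in each executor's memory; past that point Spark has to fall back to a shuffle-based join.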
Streamlining Success: Unveiling Optimization Techniques with Side-by-Side Analysis
References
● https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html