Performance Tuning for Joins in Spark 3.0
Indrajit S.
Senior Data Scientist @ Citi | GenAI | Kaggle Competition Expert | PhD research scholar in Data Science
When we perform a join in Spark and one of the datasets is small in size, Spark by default applies a broadcast join.
Broadcast join
This has 2 phases:
Broadcast -> the smaller dataset is broadcast to the executors in the cluster where the larger table is located.
Hash join -> a standard hash join is performed on each executor.
With this strategy there is no shuffling. Sounds interesting, right?
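As a minimal PySpark sketch (the orders and countries tables here are made up for illustration), you can also request a broadcast join explicitly with the broadcast() hint:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast, col

spark = SparkSession.builder.appName("broadcast-join-demo").getOrCreate()

# Hypothetical data: a large fact table and a small dimension table
orders = spark.range(1_000_000).select((col("id") % 3).alias("country_id"))
countries = spark.createDataFrame(
    [(0, "US"), (1, "IN"), (2, "UK")], ["country_id", "name"]
)

# Hint that the small side should be broadcast; Spark would normally
# pick this on its own for tables under the broadcast threshold.
joined = orders.join(broadcast(countries), "country_id")
joined.explain()  # the physical plan should show BroadcastHashJoin
```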
spark.sql.autoBroadcastJoinThreshold
This property sets the maximum size in bytes for a table to be broadcast.
Setting it to -1 disables broadcast joins.
The default is 10485760 bytes, i.e. 10 MB.
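For example, a quick sketch of tuning this property (the 50 MB value here is arbitrary):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Raise the broadcast threshold to 50 MB (the value is in bytes)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

# Or disable automatic broadcast joins entirely
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
```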
Shuffle Hash Join
When one side of the join is relatively small, we broadcast it to avoid a shuffle and improve performance. But because the broadcast table is first collected to the driver and then redundantly distributed to every executor, broadcasting a relatively large table puts considerable memory pressure on both the driver and the executors.
Because Spark is a distributed computing engine, a large dataset can be divided into n smaller partitions and computed in parallel. Applying this idea to joins gives us the Shuffle Hash Join: Spark SQL partitions both tables of the join into n partitions by the join keys, and then hash-joins the corresponding pairs of partitions. To a certain extent this reduces the broadcast pressure on the driver side, and it also spares each executor from holding the entire broadcast table in memory. The principle is as follows:
Shuffle Hash Join is divided into two steps:
1. Both tables are repartitioned (shuffled) by the join keys, so that records with the same join key value end up in corresponding partitions.
2. The corresponding partitions are then joined: the smaller table's partition is first built into a hash table, and the larger table's records are matched against it by join key (see the sketch after this list).
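To make step 2 concrete, here is a toy plain-Python sketch of the per-partition hash join (illustrative only, not Spark's actual implementation):

```python
# Toy per-partition hash join: both lists represent one partition
# after the shuffle, i.e. they share the same join-key space.
small_partition = [(1, "a"), (2, "b")]          # (join_key, payload)
large_partition = [(1, "x"), (1, "y"), (3, "z")]

# Step 1: build a hash table on the smaller side
hash_table = {}
for key, payload in small_partition:
    hash_table.setdefault(key, []).append(payload)

# Step 2: probe with each record from the larger side
joined = [
    (key, small_payload, large_payload)
    for key, large_payload in large_partition
    for small_payload in hash_table.get(key, [])
]
print(joined)  # [(1, 'a', 'x'), (1, 'a', 'y')]
```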
Shuffle Hash Join is chosen when the following conditions hold:
The average size of a partition of the smaller table does not exceed the value configured by spark.sql.autoBroadcastJoinThreshold (default 10 MB).
One side of the join is significantly smaller than the other; in the source code, "much smaller" is defined as at least 3 times smaller, an empirical value.
We can see that, for tables of a certain size, Spark SQL weighs time against space: it repartitions both tables and builds hash tables on the smaller table's partitions to complete the join. This keeps the implementation reasonably simple while minimizing memory pressure on the driver and executors, improving the stability of the computation.
The main thing to note here is that Shuffle Hash Join will be used as the join strategy only when spark.sql.join.preferSortMergeJoin is set to false and the cost of building a hash map is less than the cost of sorting the data. By default, sort merge join is preferred over Shuffle Hash Join.
ShuffledHashJoin is still useful when:
1) any single partition of the build side can fit in memory, and
2) one table is much smaller than the other, so the cost of building a hash table on the smaller table is lower than the cost of sorting the larger table.
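Putting this together, here is a minimal PySpark sketch (the table shapes are made up) of steering Spark toward this strategy; Spark 3.0 also accepts an explicit SHUFFLE_HASH join hint:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Prefer shuffle hash join over sort merge join when Spark plans the join
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
# Keep auto-broadcast out of the way so the chosen strategy is visible
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

big = spark.range(1_000_000).select((col("id") % 100).alias("k"), col("id").alias("v"))
small = spark.range(100).withColumnRenamed("id", "k")

# Spark 3.0 also supports an explicit join hint for this strategy
joined = big.join(small.hint("SHUFFLE_HASH"), "k")
joined.explain()  # the physical plan should show ShuffledHashJoin
```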
Sort Merge Join
The two implementations described above work well for tables up to a certain size, but when both tables are very large, either approach puts a lot of pressure on memory. This is because both perform a hash join, which requires one side's data to be loaded completely into memory so that records with equal join key values can be matched.
When both tables are very large, Spark SQL uses a different algorithm to join them: Sort Merge Join. This approach does not load all the data and then start a hash join; instead, it sorts the data before joining.
First, the two tables are shuffled by the join keys, guaranteeing that rows with the same join key values land in corresponding partitions. The data within each partition is then sorted, and the records in the corresponding partitions are connected by merging.
It has 3 phases:
1) Shuffle Phase: the two large tables are repartitioned by the join keys across the partitions in the cluster.
2) Sort Phase: the data within each partition is sorted in parallel.
3) Merge Phase: the two sorted, partitioned datasets are joined by iterating over the elements and joining rows that have the same value for the join key.
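A minimal sketch (hypothetical tables) to see this in action: with broadcast ruled out and both sides large, the physical plan falls back to sort merge join.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)  # rule out broadcast

t1 = spark.range(10_000_000).select((col("id") % 1000).alias("k"), col("id").alias("v1"))
t2 = spark.range(10_000_000).select((col("id") % 1000).alias("k"), col("id").alias("v2"))

t1.join(t2, "k").explain()  # the physical plan should show SortMergeJoin
```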
Also note that Spark chooses Sort Merge Join by default, which may not always be ideal.
The property spark.sql.join.preferSortMergeJoin controls this behavior.
Looks very familiar, right? It is also very simple: because the two sequences are sorted, we traverse both from the beginning; when the keys match, we output the joined row, and when they differ, we advance the side with the smaller key (see the sketch below).
As a result, no matter how large the partitions are, Sort Merge Join never needs to load one entire side into memory; rows are consumed as they are read. This greatly improves the stability of SQL joins over large amounts of data.
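Here is a toy Python sketch of that merge traversal over two sorted sequences (unique keys on each side for simplicity; illustrative, not Spark's actual code):

```python
# Two partitions already sorted by join key (unique keys for simplicity)
left = [(1, "a"), (3, "b"), (5, "c")]
right = [(1, "x"), (2, "y"), (5, "z")]

i = j = 0
joined = []
while i < len(left) and j < len(right):
    lk, rk = left[i][0], right[j][0]
    if lk == rk:                  # keys match: output the joined row
        joined.append((lk, left[i][1], right[j][1]))
        i += 1
        j += 1
    elif lk < rk:                 # advance the side with the smaller key
        i += 1
    else:
        j += 1

print(joined)  # [(1, 'a', 'x'), (5, 'c', 'z')]
```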