ShuffleHashJoin - The What, Why and When
Akhil Pathirippilly mana
A bit of history
Prior to 1.2.0, ShuffleHashJoin (SHJ) was the default join in Spark. From 1.2.0, SortMergeJoin (SMJ) became the default. SHJ was then removed in 1.6.0 and brought back in 2.0.0.
Now it is available and is chosen on the basis of multiple checks, which we will discuss here.
What is it?
Typically, a hash join consists of a build side and a probe side. The build side is converted into an in-memory hash table, and the probe side queries this hash table with the join key to perform the join row by row. From this you can see that matching build-side and probe-side rows must be brought to the same machine, which requires redistribution (shuffling) of data. This is different from Broadcast Hash Join because the entire dataset is not broadcast here: each partition of the build side is converted into a hash map, and the corresponding probe-side partition, created using the same partitioner, queries that hash map row by row using the join key. So at any point in time, only the hash map created from a single partition has to fit into memory.
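To make the build/probe idea concrete, here is a minimal sketch in plain Scala (not Spark internals) of what happens within one pair of co-partitioned partitions; the Rec type and the sample data are made up for illustration:

// One pair of co-partitioned partitions: build a hash table from the
// (smaller) build side, then stream the probe side through it.
case class Rec(key: Int, value: String)

def hashJoinPartition(build: Seq[Rec], probe: Seq[Rec]): Seq[(Rec, Rec)] = {
  // Build phase: one in-memory hash table per build-side partition
  val table: Map[Int, Seq[Rec]] = build.groupBy(_.key)
  // Probe phase: look up each probe row's key in the hash table
  probe.flatMap(p => table.getOrElse(p.key, Nil).map(b => (b, p)))
}

val buildPart = Seq(Rec(1, "a"), Rec(2, "b"))
val probePart = Seq(Rec(1, "x"), Rec(1, "y"), Rec(3, "z"))
println(hashJoinPartition(buildPart, probePart)) // List((Rec(1,a),Rec(1,x)), (Rec(1,a),Rec(1,y)))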
Usually, if we perform an SHJ between two DataFrames read from input files and then run a count, the job executes in 4 stages (3 ShuffleMapStages and 1 ResultStage).
Here, stage 20 is the ResultStage and the rest are all ShuffleMapStages. A ShuffleMapStage is a stage that ends with a shuffle write performed by ShuffleMapTasks, while the ResultStage executes the actual action.
1. The stage after the join computes a partial count per partition and writes the results as shuffle files.
2. The next stage reads these shuffle files, applies combiner logic to aggregate the per-partition counts into a total count, and hands the result over to the driver.
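As a hedged end-to-end sketch (the paths, column name, and dataset sizes here are hypothetical), the following nudges Spark 2.x toward SHJ so you can verify the plan and the stage breakdown yourself:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("shj-demo").getOrCreate()

// Condition 1 below: do not prefer SortMergeJoin
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
// Keep the broadcast threshold small but positive: small enough that the build
// side is not broadcast, yet positive so canBuildLocalHashMap (condition 3
// below) can still pass. Setting it to -1 would rule out SHJ as well.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1024L)

val probe = spark.read.parquet("/data/big")    // hypothetical larger input
val build = spark.read.parquet("/data/small")  // hypothetical smaller input

val joined = probe.join(build, Seq("id"))      // assumes both sides have an "id" column
joined.explain()      // look for ShuffledHashJoin in the physical plan
println(joined.count()) // runs as 3 ShuffleMapStages + 1 ResultStage, as described above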
Cons of SHJ:
When people talk about the cons of SHJ, most say that "the hash table per partition should fit into memory" is the major one. But that is not the main drawback that led to the introduction of sort merge join.
The major drawback of SHJ is that, in each ShuffleMapStage, it produces one shuffle file per reducer (each shuffle partition) from each mapper (each input partition). This means that if your DataFrame has 1000 partitions and spark.sql.shuffle.partitions=2000, it can produce 1000*2000 = 2M shuffle files per ShuffleMapStage. It then re-accesses these files to form one consolidated file per input partition, where each block represents one shuffle partition (this consolidation was an enhancement introduced when SHJ was brought back).
It opens a file and maintains a BufferedOutputStream for every partition, each requiring a memory buffer of the size of spark.shuffle.file.buffer (default 32k). Resulting in R FileStreams and memory allocated for buffers of the size R * bufferSize — where R is the number of reduce tasks (equivalent to the number of partitions after the shuffle). So on an executor with C cores there exist C * R open BufferedOutputStreams at a time. Furthermore, after writing the R files, we must re-access and read them (eventually into memory) to create an integrated output file.
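To put numbers on that, here is a back-of-the-envelope calculation with hypothetical values (8 cores per executor, 2000 shuffle partitions, the default 32k buffer):

val cores            = 8          // C: concurrent tasks per executor (assumption)
val reducePartitions = 2000       // R: spark.sql.shuffle.partitions (assumption)
val bufferBytes      = 32 * 1024  // spark.shuffle.file.buffer default (32k)

val openStreams = cores * reducePartitions  // C * R = 16,000 open BufferedOutputStreams
val bufferMemMB = openStreams.toLong * bufferBytes / (1024 * 1024)
println(s"$openStreams streams, ~$bufferMemMB MB of buffer memory") // ~500 MB per executor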
Obviously, BroadcastHashJoin (BHJ) works better than this when one side of the join is small enough to be broadcast, since SHJ involves shuffling while BHJ does not. That's why BHJ is the most preferred join during Spark plan creation.
Pros of SHJ:
SHJ avoids sorting either side of the join, so when the build side is small enough to hash per partition but too big to broadcast, it can be cheaper than SMJ.
The conditions to be satisfied for the Spark planner to pick this strategy:
Spark has kept SHJ behind a number of conditional checks. At the same time, it is considered right after checking whether the join can be a broadcast join and before SortMergeJoin, because if a join cannot be a broadcast join but the build-side dataset's partitions are small enough to build a hash map, it can be more beneficial than SMJ.
The SHJ strategy selection conditions are as below:
Code reference: https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L260
(!conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right) && muchSmaller(right, left)) || !RowOrdering.isOrderable(leftKeys)

OR

(!conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left) && muchSmaller(left, right)) || !RowOrdering.isOrderable(leftKeys)

(Note the parentheses: && binds tighter than ||, so !RowOrdering.isOrderable(leftKeys) on its own is enough to select SHJ.)
Condition 1: !conf.preferSortMergeJoin --> sort merge join should not be preferred
This means spark.sql.join.preferSortMergeJoin=false (as set in the demo above).
Condition 2: canBuildRight(joinType) OR canBuildLeft(joinType) --> Spark should be able to use the right or left side as the build side for the given join type; the join types each side supports are shown in the sketch below.
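Paraphrasing these checks from the linked SparkStrategies.scala (branch-2.4; this is Spark-internal code, reproduced here only for reference):

import org.apache.spark.sql.catalyst.plans._  // JoinType and friends (Spark internals)

def canBuildRight(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
  case _ => false
}

def canBuildLeft(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | RightOuter => true
  case _ => false
}

In other words, the build side must never need to emit unmatched rows of its own: a right-side build works for inner, left outer, left semi, left anti and existence joins, while a left-side build works for inner and right outer joins.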
Condition 3: canBuildLocalHashMap(right) OR canBuildLocalHashMap(left) --> The left or right side dataset should be small enough to build a per-partition hash map:
Data set size in bytes < spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions
Here we can adjust two parameters: either spark.sql.autoBroadcastJoinThreshold or spark.sql.shuffle.partitions, so that spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions becomes greater than the build-side dataset's size in bytes.
Note that if you set autoBroadcastJoinThreshold higher than the dataset's size in bytes, Spark will choose broadcast join instead.
This is nothing but Spark validating that a single partition of the build side should be small enough to fit into memory; a worked example with the defaults follows.
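With the default values (10 MB threshold, 200 shuffle partitions; assuming you have not changed them), the limit works out to roughly 2 GB for the whole build side, i.e. about 10 MB per partition:

val thresholdBytes    = 10L * 1024 * 1024 // spark.sql.autoBroadcastJoinThreshold default (10 MB)
val shufflePartitions = 200               // spark.sql.shuffle.partitions default
val maxBuildSideBytes = thresholdBytes * shufflePartitions
println(f"${maxBuildSideBytes / math.pow(1024, 3)}%.2f GB") // ≈ 1.95 GB total build side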
The final part has two alternatives: either 4a holds (together with conditions 1-3), or 4b holds on its own (recall the operator precedence in the condition above: 4b alone is sufficient).
Condition 4a: muchSmaller(right, left) OR muchSmaller(left, right) --> check whether the build-side dataset is at least 3 times smaller than the probe (streaming) side.
This check exists because building a hash map is costlier than sorting, so Spark makes sure the build side is much smaller:
Build side Data set size in bytes * 3 <= probe side data set size in bytes
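As a trivial sketch of this check (using raw byte sizes as a stand-in for the planner's size statistics):

def muchSmaller(buildBytes: Long, probeBytes: Long): Boolean =
  buildBytes * 3 <= probeBytes

println(muchSmaller(100L << 20, 500L << 20)) // 100 MB build vs 500 MB probe -> true
println(muchSmaller(300L << 20, 500L << 20)) // 300 MB build vs 500 MB probe -> false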
Condition 4b: !RowOrdering.isOrderable(leftKeys) --> check whether the join keys are not orderable.
It checks whether the join keys can be sorted at all. RowOrdering.isOrderable returns true if the join keys are of the sortable types matched in the sketch below; if they are not, SMJ is impossible and SHJ can be picked regardless of the other conditions.
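Paraphrasing RowOrdering.isOrderable from Spark's catalyst expressions (branch-2.4; again Spark-internal code, shown only for reference):

import org.apache.spark.sql.types._

def isOrderable(dataType: DataType): Boolean = dataType match {
  case NullType                => true  // nulls sort trivially
  case _: AtomicType           => true  // numeric, string, binary, boolean, date, timestamp, ...
  case struct: StructType      => struct.fields.forall(f => isOrderable(f.dataType))
  case array: ArrayType        => isOrderable(array.elementType)
  case udt: UserDefinedType[_] => isOrderable(udt.sqlType)
  case _                       => false // e.g. MapType is not orderable
}

So, for example, joining on a MapType key rules out SMJ, and SHJ can be selected even when preferSortMergeJoin is left at its default.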