ShuffleHashJoin - The what, why and when

A bit of history

Prior to 1.2.0, ShuffleHashJoin (SHJ) was the default join in Spark. From 1.2.0 onwards, SortMergeJoin (SMJ) became the default. SHJ was then removed in 1.6.0 and brought back again in 2.0.0.

Today it is available and is chosen on the basis of multiple checks, which we will discuss here.

What is it

Typically a hash join consists of a build side and a probe side. The build side is converted into an in-memory hash table, and the probe side queries this hash table with the join key to perform the join row by row. This implies that matching build-side and probe-side rows must be brought to the same machine, which requires a redistribution (shuffle) of the data. This is different from Broadcast Hash Join because the entire build dataset is not broadcast here; instead, each partition of the build side is converted into a hash map, and the corresponding probe-side partition (created with the same partitioner) queries that hash map row by row using the join key. So at any given time, only the hash map built from a single partition needs to fit in memory.
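As a rough, Spark-free illustration of that build/probe idea (the DemoRow type and its key field are made up for this sketch; Spark's real implementation works on InternalRow with its own hashed relations):

// Toy row type used only for these sketches.
case class DemoRow(key: Int, value: String)

// Build a hash table from the build-side partition, then stream the probe side through it.
def hashJoinPartition(build: Iterator[DemoRow], probe: Iterator[DemoRow]): Iterator[(DemoRow, DemoRow)] = {
  // Build phase: the whole build-side partition must fit in memory.
  val hashTable: Map[Int, Seq[DemoRow]] = build.toSeq.groupBy(_.key)

  // Probe phase: look up each probe row by join key and emit the matches.
  probe.flatMap { probeRow =>
    hashTable.getOrElse(probeRow.key, Seq.empty).map(buildRow => (buildRow, probeRow))
  }
}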

Usually, if we perform an SHJ between two DataFrames read from input files and then compute a count, the job runs in 4 stages (3 ShuffleMapStages and 1 ResultStage).

In the Spark UI for this example, stage 20 is the ResultStage and the rest are ShuffleMapStages. A ShuffleMapStage is a stage whose ShuffleMapTasks perform a shuffle write, while the ResultStage executes the actual action.
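A minimal sketch of such a job is below; the input paths, file format and join column ("id") are hypothetical, and whether SHJ is actually selected still depends on the conditions discussed later in this article:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("shj-demo").getOrCreate()

// Hypothetical inputs: two DataFrames read from files, joined on a key column.
val left  = spark.read.parquet("/data/left")
val right = spark.read.parquet("/data/right")

// Equi-join followed by a count action; with SHJ this runs as
// 2 file-reading ShuffleMapStages + 1 join/partial-count ShuffleMapStage + 1 ResultStage.
val joined = left.join(right, Seq("id"))
println(joined.count())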



  • 2 stages, one per input DataFrame, read the files in parallel first (2 ShuffleMapStages) and end with an exchange that uses a hash partitioner. Each input partition initially writes one shuffle file per shuffle partition (with 2 input partitions and 200 shuffle partitions, 400 files are written in that stage). These files are later consolidated into a single file per input partition, ordered by partition ID, together with an index file that records each partition block's location within the file, so a reducer can fetch just the block it needs (reducer 0 only needs to read partition block 0). The total number of output blocks in a consolidated shuffle file equals spark.sql.shuffle.partitions.



  • The next stage reads these shuffle files and creates one ShuffledRowRDD per upstream ShuffleMapStage, so here we have two ShuffledRowRDDs. The classic hash join is then performed per partition ID, using one ShuffledRowRDD partition as the hash table (build side) and the other as the streaming side (probe side): for example, partition 0 of the build side is converted into a hash map, and each row of partition 0 of the probe side looks up that hash map using the join key to produce output rows. Finally, an RDD (ZippedPartitionsRDD) with a number of partitions equal to the shuffle partitions is created (a rough sketch of this per-partition pairing follows below).
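Here is a rough emulation of that per-partition pairing using the public RDD API and the DemoRow/hashJoinPartition sketch from above; it is only an analogy for what ShuffledHashJoinExec does over ZippedPartitionsRDD, not the real code path:

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

def shuffledHashJoin(buildRows: RDD[DemoRow], probeRows: RDD[DemoRow], numPartitions: Int): RDD[(DemoRow, DemoRow)] = {
  // Repartition both sides with the same partitioner so that equal keys land
  // in partitions with the same partition ID.
  val partitioner = new HashPartitioner(numPartitions)
  val build = buildRows.keyBy(_.key).partitionBy(partitioner).values
  val probe = probeRows.keyBy(_.key).partitionBy(partitioner).values

  // zipPartitions pairs partition i of the build side with partition i of the
  // probe side and applies the per-partition hash join from the earlier sketch.
  build.zipPartitions(probe) { (buildIter, probeIter) =>
    hashJoinPartition(buildIter, probeIter)
  }
}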



  • The remaining stages depend on which action you perform on the joined DataFrame. Since we are calculating a count here:

  1. Each partition of the ZippedPartitionsRDD is read and a count is calculated within that partition. The resulting RDD holds one count per partition, and this is written to disk as shuffle files (note the 1 record written per task, which is simply the per-partition count).


  2. The next stage reads these shuffle files, applies the combiner logic to aggregate the per-partition counts into a total count, and hands the result over to the driver.
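Conceptually, these last two stages behave like the simplified RDD sketch below; the real DataFrame count uses partial and final HashAggregate operators, so this is only an analogy:

import org.apache.spark.rdd.RDD

def countInTwoStages[T](joinedRows: RDD[T]): Long = {
  // Stage 3: one partial count per partition (the "1 record written per task").
  val partialCounts: RDD[Long] = joinedRows.mapPartitions(it => Iterator(it.size.toLong))

  // Stage 4 (ResultStage): shuffle the partial counts to a single partition
  // and sum them, handing the total back to the driver.
  partialCounts.coalesce(1, shuffle = true).reduce(_ + _)
}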

Cons of SHJ:

When people talk about the cons of SHJ, the majority point to "the hash table for each partition should fit into memory" as the major one. But that is not the main drawback that led to sort merge join being introduced.


The major drawback of SHJ is that it produces one shuffle file per reducer (each shuffle partition) from each mapper (each input partition). This means that if your DataFrame has 1000 partitions and spark.sql.shuffle.partitions=2000, it can produce 1000 * 2000 = 2M shuffle files in each ShuffleMapStage. It then has to re-access these files to form one consolidated file per input partition, with one block representing one shuffle partition (this consolidation was an enhancement introduced when SHJ was brought back).

Each mapper opens a file and maintains a BufferedOutputStream for every reduce partition, each requiring a memory buffer of size spark.shuffle.file.buffer (default 32k). This results in R file streams and R * bufferSize of buffer memory per task, where R is the number of reduce tasks (the number of partitions after the shuffle). So on an executor with C cores there can be C * R open BufferedOutputStreams at a time. Furthermore, after writing the R files, they must be re-accessed and read (eventually into memory) to create the consolidated output file.
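To make those numbers concrete, here is a back-of-the-envelope calculation; the partition counts and core count are illustrative assumptions, not measurements:

val mapPartitions    = 1000L        // input partitions of one side
val reducePartitions = 2000L        // spark.sql.shuffle.partitions (R)
val cores            = 8L           // C: tasks running concurrently per executor
val bufferSize       = 32L * 1024   // spark.shuffle.file.buffer, default 32k

val shuffleFilesPerStage = mapPartitions * reducePartitions  // 2,000,000 files before consolidation
val openStreamsPerExec   = cores * reducePartitions          // 16,000 open BufferedOutputStreams
val bufferMemoryPerExec  = openStreamsPerExec * bufferSize   // 524,288,000 bytes, about 500 MiB just for buffers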

Obviously, BroadcastHashJoin (BHJ) works better than this if one side of the join is small enough to be broadcast, since SHJ involves a shuffle while BHJ does not. That is why BHJ is the most preferred join during Spark plan creation.

Pros of SHJ:

  • Shuffled hash join avoids the sort step required by sort merge join. This advantage is most visible when joining large tables, in terms of saving CPU and IO.

The conditions that must be satisfied for the Spark planner to pick this strategy:

Spark has kept SHJ behind a number of conditional checks. At the same time, it is evaluated right after checking whether the join can be a broadcast join and before SortMergeJoin, because if a join cannot be a broadcast join but the build-side partitions are small enough to build a hash map, it can perform better than SMJ.

The SHJ strategy selection conditions are as below:

Code reference: https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L260

(!conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right) && muchSmaller(right, left)) || !RowOrdering.isOrderable(leftKeys)

OR

(!conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left) && muchSmaller(left, right)) || !RowOrdering.isOrderable(leftKeys)
        

Condition 1: !conf.preferSortMergeJoin --> sort merge join should not be the preferred join.

This means spark.sql.join.preferSortMergeJoin must be set to false.
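For example (spark.sql.join.preferSortMergeJoin is an internal configuration, so verify it against your Spark version):

// Programmatically, on an existing SparkSession:
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")

// Or at submit time:
//   spark-submit --conf spark.sql.join.preferSortMergeJoin=false ...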


Condition 2: canBuildRight(joinType) OR canBuildLeft(joinType) --> Spark should be able to use the left or the right side as the build side for the given join type.


Code reference : https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L260:~:text=%7D-,private%20def%20canBuildRight(joinType%3A%20JoinType)%3A%20Boolean%20%3D,%7D,-private%20def%20broadcastSide


  • canBuildRight or canBuildLeft --> these simply check whether the right or left table is allowed to be built as the hash table for the given join type.
  • canBuildRight returns true when the join type allows building on the right: inner / left outer / left semi / left anti (and existence joins).
  • canBuildLeft returns true when the join type allows building on the left: inner / right outer.
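The checks themselves are plain pattern matches on the join type; the following is a close paraphrase of the branch-2.4 code (the JoinType classes are Catalyst internals, shown here only to illustrate the logic):

import org.apache.spark.sql.catalyst.plans._

// Paraphrase of JoinSelection.canBuildRight / canBuildLeft (branch-2.4):
// only the join type decides which side may serve as the hash (build) side.
def canBuildRight(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
  case _ => false
}

def canBuildLeft(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | RightOuter => true
  case _ => false
}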


Condition 3: canBuildLocalHashMap(right) OR canBuildLocalHashMap(left) --> the left or right side dataset should be small enough to build a local hash map.

Code reference : https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L260:~:text=*%20Matches%20a%20plan%20whose%20single,%7D


  • Once the left or right side is chosen for building the hash table, canBuildLocalHashMap() must be true for that side. This is validated based on the condition below:

Data set size in bytes < spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions        

Here we can adjust two parameters: either spark.sql.autoBroadcastJoinThreshold or spark.sql.shuffle.partitions, so that spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions becomes greater than the build-side dataset's size in bytes.

Note that if you set autoBroadcastJoinThreshold higher than the dataset size in bytes, Spark will choose a broadcast join instead.

This is essentially Spark validating that a single partition of the build side should be small enough to fit into memory.
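A quick worked example with the default values (the build-side size is an illustrative assumption):

val autoBroadcastJoinThreshold = 10L * 1024 * 1024        // 10 MB, the default
val shufflePartitions          = 200L                     // spark.sql.shuffle.partitions default
val buildSideSizeInBytes       = 1L * 1024 * 1024 * 1024  // hypothetical 1 GB build side

val limit = autoBroadcastJoinThreshold * shufflePartitions   // roughly 2 GB
val canBuildLocalHashMap = buildSideSizeInBytes < limit      // true: 1 GB < ~2 GB, i.e. roughly 10 MB per partition

// If buildSideSizeInBytes were below the 10 MB threshold itself, the planner
// would have picked BroadcastHashJoin before this check was ever reached.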


The next condition has two parts: either 4a holds (together with conditions 1-3), or 4b holds on its own:


Condition 4a: muchSmaller(right, left) OR muchSmaller(left, right) --> checks whether the build-side dataset is at least 3 times smaller than the probe (streaming) side.

Code reference: https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L260:~:text=private%20def%20muchSmaller,%7D

This condition checks whether the build dataset is at least 3 times smaller than the probe dataset. The check exists because building a hash map is costlier than sorting, so Spark makes sure the build side is much smaller.


Build side Data set size in bytes * 3 <= probe side data set size in bytes        
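A paraphrase of the check with illustrative sizes (the real implementation compares plan statistics, i.e. plan.stats.sizeInBytes):

// Build side must be at least 3x smaller than the probe side.
def muchSmaller(buildSizeInBytes: Long, probeSizeInBytes: Long): Boolean =
  buildSizeInBytes * 3 <= probeSizeInBytes

muchSmaller(300L * 1024 * 1024, 1024L * 1024 * 1024)   // true: 900 MB <= 1 GB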

Condition 4b: !RowOrdering.isOrderable(leftKeys) --> checks whether the join keys are not orderable.


Code reference :

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala#:~:text=def-,isOrderable,-(dataType%3A

This checks whether the join keys are not orderable. RowOrdering.isOrderable returns true if the join keys are of the types below (in which case sorting, and hence SMJ, is possible); if the keys are not orderable, SHJ is used instead:

  • NullType
  • AtomicType (BinaryType,BooleanType,CharType,DateType,NumericType,StringType,TimestampType,VarcharType)
  • StructType with all fields orderable (recursive)
  • ArrayType with orderable type of the elements
  • UserDefinedType
