Performance Tuning for Joins in Spark 3.0


When we perform a join in Spark and one side of the join is small, Spark by default applies a broadcast join. Let's look at what happens inside a Spark broadcast join.

Broadcast Join

It has two phases:

Broadcast -> the smaller dataset is broadcast to the executors across the cluster where the larger table's partitions are located.

Hash join -> a standard hash join is then performed locally on each executor.

With this strategy there is no shuffling of the large table. Sounds interesting, right?
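Here is a minimal sketch of a broadcast join in Scala, assuming a large orders table and a small customers dimension table (the names and paths are hypothetical):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("broadcast-join-demo").getOrCreate()

// Hypothetical inputs: a large fact table and a small dimension table.
val orders    = spark.read.parquet("/data/orders")
val customers = spark.read.parquet("/data/customers")

// Ask Spark to broadcast `customers`: every executor receives a full copy and
// performs a local hash join, so the large `orders` table is never shuffled.
val joined = orders.join(broadcast(customers), Seq("customer_id"))
joined.explain() // the physical plan should show BroadcastHashJoin
```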

spark.sql.autoBroadcastJoinThreshold

This property configures the maximum size in bytes for a table to be broadcast to all worker nodes when performing a join.

Setting it to -1 disables broadcast joins.

The default is 10485760 bytes, i.e. 10 MB. For more information, visit:

Broadcast join parameters
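As an illustration (the 50 MB value is just an example), the threshold can be adjusted, or broadcast joins disabled, at runtime:

```scala
// Raise the auto-broadcast threshold from the 10 MB default to 50 MB.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50L * 1024 * 1024)

// Or disable automatic broadcast joins entirely.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
```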


Shuffle Hash Join


When one side of a join is relatively small, we choose to broadcast it to avoid a shuffle and improve performance. But because the broadcast table is first collected to the driver and then redundantly distributed to every executor, broadcasting a relatively large table puts significant pressure on both the driver and the executor side.

Because Spark is a distributed computing engine, a large dataset can be divided into n smaller datasets and processed in parallel. Applying this idea to joins gives us Shuffle Hash Join: Spark SQL partitions both tables of the join by the same rule into n partitions, and then hash-joins the corresponding pairs of partitions from the two tables. To a certain extent this reduces the broadcast pressure on the driver side, and it also spares each executor from holding an entire broadcast table in memory. The principle is as follows:

Shuffle Hash Join is divided into two steps:

1. Both tables are repartitioned (shuffled) according to the join keys, so that records with the same join-key values are assigned to the same corresponding partitions.

2. The data in each pair of corresponding partitions is then joined: the smaller table's partition is first built into a hash table, and the larger table's records are matched against it by their join-key values (a sketch of this per-partition work follows).
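The per-partition work can be pictured with a small sketch in plain Scala; this is illustrative only, not Spark's actual implementation:

```scala
// One partition's hash join after the shuffle: build on the small side, probe with the large side.
def hashJoinPartition[K, A, B](small: Seq[(K, A)], large: Seq[(K, B)]): Seq[(K, A, B)] = {
  // Build phase: group the smaller partition's values by join key.
  val hashTable: Map[K, Seq[A]] =
    small.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
  // Probe phase: stream the larger partition and look up each record's key.
  large.flatMap { case (k, b) =>
    hashTable.getOrElse(k, Seq.empty).map(a => (k, a, b))
  }
}

// e.g. hashJoinPartition(Seq(1 -> "a"), Seq(1 -> "x", 2 -> "y")) == Seq((1, "a", "x"))
```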

Shuffle Hash Join is chosen when the following conditions hold:

The average size of a partition does not exceed the value configured by spark.sql.autoBroadcastJoinThreshold (the default is 10 MB). Source code GitHub link

One side of the join should be significantly smaller than the other; the source code uses a factor of 3 as the empirical threshold, i.e. the small side must be at least three times smaller than the large side, and the small side becomes the build side of the hash join. We can see that, for tables of a certain size, Spark SQL balances time against space: it repartitions both tables and hashes the small table's partitions to complete the join. While keeping the complexity within bounds, this minimizes the memory pressure on the driver and the executors and improves the stability of the computation.


The main thing to note here is that Shuffle Hash Join will be used as the join strategy only when spark.sql.join.preferSortMergeJoin is set to false and the cost of building a hash map is less than the cost of sorting the data. By default, Sort Merge Join is preferred over Shuffle Hash Join.

ShuffledHashJoin is still useful when:

1) each partition of the build side can fit in memory, and

2) one table is much smaller than the other one, so the cost of building a hash table on the smaller table is lower than the cost of sorting the larger table.

Both ways of opting in to this strategy are sketched below.
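A sketch of both approaches, assuming hypothetical events and users tables (the SHUFFLE_HASH join hint was added in Spark 3.0):

```scala
// Option 1: tell the planner not to prefer sort merge join globally.
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")

// Option 2: request the strategy per join with the Spark 3.0 SHUFFLE_HASH hint.
val events = spark.read.parquet("/data/events")
val users  = spark.read.parquet("/data/users")

val joined = events.join(users.hint("shuffle_hash"), Seq("user_id"))
joined.explain() // look for ShuffledHashJoin in the physical plan

// SQL equivalent:
// SELECT /*+ SHUFFLE_HASH(u) */ * FROM events e JOIN users u ON e.user_id = u.user_id
```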



Sort Merge Join


The two implementations described above are best suited to tables of a certain size; when both tables are very large, either of them clearly puts a lot of pressure on memory. This is because both joins above are hash joins, which load one side's data completely into memory and use the hash of the key to match records with equal key values.

When both tables are very large, Spark SQL uses a different algorithm to join them: Sort Merge Join. This approach does not load all the data into memory and then start a hash join; instead, it sorts the data before joining.

You can see that the two tables are first re-shuffled according to the join keys, ensuring that records with the same join-key values are assigned to corresponding partitions. The data within each partition is then sorted, and finally the records in the corresponding partitions are merged and joined.

It has 3 phases:

1) Shuffle phase: the two large tables are repartitioned as per the join keys across the partitions in the cluster.

2) Sort phase: the data within each partition is sorted, in parallel across partitions.

3) Merge phase: the two sorted, partitioned datasets are joined by iterating over the elements and merging rows that have the same value for the join key (an example of requesting this strategy follows the list).
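Spark 3.0 also lets you request this strategy explicitly with the MERGE join hint (the table names here are hypothetical):

```scala
val big1 = spark.read.parquet("/data/big_table_1")
val big2 = spark.read.parquet("/data/big_table_2")

// Request a sort merge join explicitly.
val joined = big1.hint("merge").join(big2, Seq("id"))
joined.explain() // expect Exchange + Sort on both sides, then SortMergeJoin
```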

Also, note that Spark by default chooses Sort Merge Join, which is not always ideal. The property spark.sql.join.preferSortMergeJoin controls the preference for this algorithm.

Looks very familiar, right? It is also very simple: because the two sequences are both sorted, we traverse them from the beginning; when the keys match, we output the joined record, and when they differ, we advance the pointer on the side with the smaller key.
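Here is a minimal sketch of that merge step in plain Scala; it is illustrative only, not Spark's actual code:

```scala
// Two-pointer walk over two key-sorted sequences, emitting matches as we go.
def mergeJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)])
                      (implicit ord: Ordering[K]): Seq[(K, A, B)] = {
  val out = scala.collection.mutable.ArrayBuffer.empty[(K, A, B)]
  var i = 0
  var j = 0
  while (i < left.length && j < right.length) {
    val c = ord.compare(left(i)._1, right(j)._1)
    if (c < 0) i += 1      // left key is smaller: advance the left pointer
    else if (c > 0) j += 1 // right key is smaller: advance the right pointer
    else {
      // Keys match: emit left(i) against the whole run of equal keys on the right.
      var j2 = j
      while (j2 < right.length && ord.equiv(left(i)._1, right(j2)._1)) {
        out += ((left(i)._1, left(i)._2, right(j2)._2))
        j2 += 1
      }
      i += 1 // keep j at the start of the run in case the next left key matches too
    }
  }
  out.toSeq
}

// e.g. mergeJoin(Seq(1 -> "a", 2 -> "b"), Seq(2 -> "x", 3 -> "y")) == Seq((2, "b", "x"))
```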

As we can see, no matter how large the partitions are, Sort Merge Join never has to load one whole side into memory; it consumes the records as it goes, which greatly improves the stability of SQL joins over large amounts of data.

SMJ Github link





Conclusion

Spark chooses among three join strategies: Broadcast Hash Join when one side is small enough to ship to every executor, Shuffle Hash Join when one side is much smaller than the other and each of its partitions fits in memory, and Sort Merge Join when both tables are large. Knowing the threshold (spark.sql.autoBroadcastJoinThreshold), the preference flag (spark.sql.join.preferSortMergeJoin), and the Spark 3.0 join hints that steer this choice is the heart of join performance tuning.

