How does Spark distribute the load across multiple machines?

Today, I would like to take a deep dive into how Spark distributes the load across multiple machines.

While Spark provides default configurations to help us get started, they are often not sufficient for optimal performance. It is our responsibility to instruct Spark on how we want to distribute the load across the cluster. To achieve this, we must learn and understand the concept of partitions.

In Spark, computations on datasets are translated into jobs, stages, and tasks, where each task runs on exactly one core. Each task processes a single partition, so the number of tasks equals the number of partitions.
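
As a quick illustration (a minimal sketch assuming a local SparkSession and a made-up Parquet path), we can check how many partitions, and therefore tasks, a read will produce:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("partitioning-demo")
  .master("local[4]")                  // 4 cores, so up to 4 tasks run in parallel
  .getOrCreate()

val df = spark.read.parquet("/data/events")   // hypothetical input path

// The number of partitions equals the number of tasks in the stage that scans this data
println(df.rdd.getNumPartitions)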

Wide transformations require an operation called a “shuffle,” which is essentially the transfer of data between partitions across the cluster. A shuffle is an expensive operation, and we should avoid it where possible. It also produces a new set of partitions.

To increase efficiency, for an aggregation such as groupBy, Spark first aggregates within each initial partition, and only then shuffles the data, partitions it by key, and aggregates again within each shuffled partition. This partial aggregation reduces the number of rows that have to be transferred during the shuffle.
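
A minimal sketch of such an aggregation, continuing with the df from above (the country column is made up for illustration):

import org.apache.spark.sql.functions.count

// Each task first aggregates inside its own partition (partial aggregation),
// then only the partially aggregated rows are shuffled by "country" and merged.
val counts = df.groupBy("country").agg(count("*").as("cnt"))

// The physical plan shows a HashAggregate before and after the Exchange (the shuffle)
counts.explain()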

Controlling the Number of Partitions in Each Stage:

Adaptive Query Execution (AQE): AQE is an optimization technique in Spark SQL that uses runtime statistics to choose the most efficient query execution plan. We will take a deep dive into this feature another time; for now, let's stick with the configuration properties.

spark.sql.files.maxPartitionBytes: The maximum number of bytes to pack into a single partition when reading files. This configuration is effective only when using file-based sources such as Parquet, JSON, and ORC. The default is 128 MB.

spark.sql.files.minPartitionNum: The suggested (not guaranteed) minimum number of partitions when reading files. If not set, the default value is spark.sql.leafNodeDefaultParallelism. This configuration is effective only when using file-based sources such as Parquet, JSON, and ORC.
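
Both properties can be set on the session before reading. A short sketch, reusing the spark session and hypothetical path from above; the values are just examples, not recommendations:

// Pack at most 64 MB into a single partition when scanning files
spark.conf.set("spark.sql.files.maxPartitionBytes", "64MB")

// Suggest (but do not guarantee) at least 16 partitions for the scan
spark.conf.set("spark.sql.files.minPartitionNum", "16")

val events = spark.read.parquet("/data/events")   // hypothetical path
println(events.rdd.getNumPartitions)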

In addition to these static configuration values, we often need to dynamically repartition our dataset.

For example, when we filter our dataset, we might end up with uneven partitions, causing skewed data and unbalanced processing.

Another example is when we want to write our data into different folders, partitioned by a certain key. Repartitioning the dataset by that key in memory beforehand means each task writes to only a few folders, instead of every task scattering small files across all of them.
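
A sketch of that pattern (the column name and output path are made up):

import org.apache.spark.sql.functions.col

// Co-locate all rows with the same "country" in the same in-memory partition,
// so each task writes into one folder instead of a little into every folder.
df.repartition(col("country"))
  .write
  .partitionBy("country")
  .mode("overwrite")
  .parquet("/output/events_by_country")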

Different Types of Spark Partitioning:

Hash Partitioning: Splits our data in such a way that elements with the same hash (which can be computed from a key, several keys, or a function) end up in the same partition. We can also specify the desired number of partitions, so that the final partition is determined by hash % numPartitions. Note that if numPartitions is greater than the number of distinct hash values, some partitions will be empty.
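
For example, a sketch of hash partitioning with the DataFrame API (the column name is hypothetical):

import org.apache.spark.sql.functions.col

// Hash-partition into 8 partitions by "user_id":
// the target partition is hash(user_id) % 8, so rows with the same key land together.
val byUser = df.repartition(8, col("user_id"))
println(byUser.rdd.getNumPartitions)   // 8, possibly including some empty partitions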

Range Partitioning: Similar to hash partitioning, but based on ranges of values. For performance reasons, this method uses sampling to estimate the ranges. As a result, the output may be inconsistent, since the sampling can return different values. The sample size can be controlled by the configuration value spark.sql.execution.rangeExchange.sampleSizePerPartition.
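
A sketch of range partitioning (again with a made-up column):

// Range-partition into 8 partitions by "event_time".
// Spark samples the column to estimate the range boundaries, so the exact
// boundaries (and partition sizes) can differ between runs.
val byTime = df.repartitionByRange(8, col("event_time"))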

Round Robin Partitioning: Distributes the data from the source number of partitions to the target number of partitions in a round-robin manner, to keep the resulting partitions evenly sized. Since repartitioning is a shuffle operation, if we don’t specify a value, the final number of partitions comes from the configuration (spark.sql.shuffle.partitions).
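
A sketch of round-robin repartitioning:

// No key: rows are dealt out evenly across 16 partitions.
val balanced = df.repartition(16)
println(balanced.rdd.getNumPartitions)   // 16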

Choosing the Optimal Number of Partitions:

There is no one-line, straightforward answer to this question. It depends on the available resources in your cluster, as well as dataset properties such as size, cardinality, and skewness.

Personally, I decide based on the number of cores (which guides what to set for spark.sql.shuffle.partitions; the default is 200, and I prefer about 3x partitions per core) and on the target output partition size (say, 100 to 200 MB per file) or the input data size.
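
A back-of-the-envelope sketch of that reasoning; the cluster size, data size, and target file size below are assumptions, not recommendations:

// Assume 20 executors x 4 cores = 80 cores in total
val totalCores = 80

// ~3 shuffle partitions per core (instead of the default 200)
spark.conf.set("spark.sql.shuffle.partitions", (totalCores * 3).toString)   // 240

// Size the output by target file size: e.g. ~60 GB of data at ~150 MB per file
val inputSizeMb    = 60L * 1024
val targetFileMb   = 150L
val numOutputParts = (inputSizeMb / targetFileMb).toInt                     // ~409

df.repartition(numOutputParts)
  .write
  .mode("overwrite")
  .parquet("/output/events")   // hypothetical path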

To get complete clarity on this topic, I strongly suggest watching Daniel Tomes's talk at the 2019 Spark+AI Summit, "Apache Spark Core—Deep Dive—Proper Optimization".

Watch here: https://www.youtube.com/watch?v=daXEp4HmS-E

References: Spark+AI Summit, Salesforce Tech blogs
