Wide Vs. Narrow Transformations in Spark/Distributed Compute


Overview

In this blog I will provide examples of Spark and other distributed systems performing what are known as Narrow versus Wide Transformations, also called Dependencies. Dependencies is the more accurate description of the activity, but Transformations seems to have caught on more. We will explore Narrow Transformations and how they can make a big impact on performance, look at when they should be used, and cover patterns where you may be able to turn a Wide Transformation into a Narrow Transformation. To better understand these topics we will use RDDs (Resilient Distributed Datasets).

Spark still uses RDDs as a fundamental abstraction for distributed data processing. Since the introduction of DataFrames and Datasets, however, RDDs have become a lower-level API that is mainly used for low-level transformations and for processing unstructured data. I will use RDDs in this blog because the concepts are clearer at that level.

DataFrames and Datasets are higher-level abstractions built on top of RDDs that provide a more efficient and expressive API for working with structured and semi-structured data. DataFrames and Datasets provide a more SQL-like programming interface and can leverage Spark's Catalyst optimizer to perform advanced query optimization and code generation.

That being said, RDDs are still a fundamental building block of Spark, and they provide a flexible and resilient way to handle distributed data processing. RDDs are particularly useful for handling unstructured data or performing custom operations that are not supported by the higher-level abstractions.

There are several white papers and technical documents available on RDDs in Spark, but one of the best resources is the original paper that introduced RDDs, titled "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing." The paper was authored by Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, Ankur Dave, Justin Ma, Murphy McCauley, Michael J. Franklin, Scott Shenker, and Ion Stoica, and was published in the Proceedings of the 9th USENIX Conference on Networked Systems Design and Implementation (NSDI'12).

The paper provides a comprehensive overview of RDDs, including their design, implementation, and evaluation. It also includes examples of how to use RDDs in Spark, and discusses the advantages and limitations of the RDD abstraction. The paper has been widely cited and is considered a seminal work in the field of distributed computing.

If you are interested in learning more about RDDs in Spark, I highly recommend reading this paper. It provides a thorough and detailed explanation of the underlying concepts and algorithms, and is a great resource for anyone interested in distributed data processing with Spark.

In summary, while Spark has introduced higher-level abstractions like DataFrames and Datasets, RDDs remain an important part of the Spark ecosystem and are still used for low-level processing and handling unstructured data. I use RDDs in this blog because it helps understand the concept of Wide versus Narrow transformations.


Wide Vs Narrow Transformations in Spark

Actions

Actions in Spark are operations that trigger actual computation by executing the chain of transformations behind a DataFrame and returning a result, such as a count of the rows or an aggregate of the data.

Narrow transformations are operations that can be performed on a single partition of a DataFrame without needing to shuffle the data across multiple partitions. These types of transformations are often faster and more efficient than wide transformations because they can be executed in parallel on each partition of the data. Here are some examples of actions in Spark that can be done as a narrow transformation:

  • count() - returns the number of rows in the DataFrame. This operation can be done as a narrow transformation because it only requires counting the rows in each partition and summing the results.
  • first() - returns the first row of the DataFrame. This operation can be done as a narrow transformation because it only requires reading the first row of the first non-empty partition; no shuffle is needed.
  • take(n) - returns the first n rows of the DataFrame. This operation can be done as a narrow transformation because it only requires scanning partitions in order, starting with the first, until n rows have been collected.
  • foreach() - applies a function to each row of the DataFrame. This operation can be done as a narrow transformation because it only requires applying the function to each row of each partition in parallel.
  • reduce() - applies a function to reduce the rows of the DataFrame to a single value. This operation can be done as a narrow transformation if the function is associative and commutative, which allows the computation to be performed in parallel on each partition and then combined across partitions.

Note that the efficiency of these operations as narrow transformations may depend on the data distribution and the computation required for each row. If the computation required for each row is too complex, it may not be possible to perform the operation as a narrow transformation.
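
To make this concrete, here is a minimal PySpark RDD sketch of the idea, assuming an already-running SparkContext named sc (as in the code examples later in this post); the data is invented purely for illustration:

rdd = sc.parallelize(range(1, 101), numSlices=4)

# count(): each partition can count its own rows, and only the small
# per-partition counts need to be combined
per_partition_counts = rdd.mapPartitions(lambda rows: [sum(1 for _ in rows)])
print(per_partition_counts.collect())  # [25, 25, 25, 25]
print(rdd.count())                     # 100, combined from per-partition counts

# reduce(): because addition is associative and commutative, each partition
# can be reduced locally before the partial results are combined
print(rdd.reduce(lambda a, b: a + b))  # 5050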

Transformations In Spark and Distributed Compute

Transformations are operations that transform a Spark DataFrame into a new DataFrame without altering the original data. Operations like select() and filter() are examples of transformations in Spark. These operations will return transformed results as a new DataFrame instead of changing the original DataFrame.


In Spark, a DataFrame is an immutable distributed collection of data that is organized into named columns. The data in a DataFrame is stored across multiple machines in a cluster, and Spark provides a set of operations to manipulate the data in the DataFrame.


Transformations are a type of operation that creates a new DataFrame by applying a function to each element of the original DataFrame without altering the original data. Transformations are executed lazily, which means that the new DataFrame is not actually computed until an action is performed.


Examples of transformations in Spark include select(), filter(), groupBy(), join(), and many others. When you apply a transformation to a DataFrame, you are creating a new DataFrame with a new set of columns or a new set of rows based on the operation you perform. The original DataFrame remains unchanged.


For example, if you have a DataFrame df with columns name, age, and gender, you can apply a select() transformation to keep only the name and age columns; the result is a new DataFrame, and df itself is left unchanged: new_df = df.select(["name", "age"])
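
A minimal PySpark sketch of this behaviour (the rows below are invented purely for illustration): each call returns a new DataFrame, df itself is never modified, and nothing actually runs until the show() action at the end.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("Ana", 34, "F"), ("Bo", 29, "M")],
    ["name", "age", "gender"])

new_df = df.select("name", "age")        # transformation: new DataFrame, df unchanged
adults = new_df.filter(new_df.age > 30)  # transformation: still nothing has executed
adults.show()                            # action: triggers the actual computation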

Scala

// Example data for demonstration purposes
val data = sc.parallelize(Seq(
  ("sensor1", 1),
  ("sensor2", 2),
  ("sensor3", 3),
  ("sensor4", 4),
  ("sensor5", 5)
))

// Wide transformation: groupByKey
val groupedData = data.groupByKey()

// Narrow transformation: mapValues
val sumData = groupedData.mapValues(_.sum)

// Show the result
sumData.foreach(println)

In this example, we have a dataset of sensor readings and corresponding values. The wide transformation is groupByKey, which groups the data by the sensor, creating a key-value pair where the key is the sensor and the value is an iterable of all the corresponding values. This is a wide transformation because it shuffles the data across the cluster, which can be expensive for large datasets.

The narrow transformation is mapValues, which applies a function to each value in the iterable created by groupByKey. In this case the function is _.sum, which computes the sum of all the values for each sensor. mapValues is narrow because it does not require shuffling data across the cluster, although the pipeline as a whole still pays for the groupByKey shuffle; when the goal is a per-key aggregate like this, the groupByKey plus mapValues(_.sum) pattern can usually be replaced with reduceByKey(_ + _), which pre-aggregates within each partition before anything is shuffled. Finally, we print out the resulting dataset using foreach.

Python

# Create an RDD of key-value pairs
rdd = sc.parallelize([(1, 2), (3, 4), (5, 6)])

# Narrow transformation: mapValues() - only depends on one partition
narrow_rdd = rdd.mapValues(lambda x: x * 2)
print("Narrow transformation output:", narrow_rdd.collect())

# Wide transformation: reduceByKey() - depends on multiple partitions
wide_rdd = rdd.reduceByKey(lambda x, y: x + y)
print("Wide transformation output:", wide_rdd.collect())

In the example above, mapValues() is a narrow transformation because it only operates on one partition at a time, whereas reduceByKey() is a wide transformation because it requires shuffling data across multiple partitions.

The mapValues() transformation applies a function to the values of each key-value pair in the RDD, but only within the same partition, so it doesn't require shuffling the data across the network. The output is a new RDD with the same partitioning scheme as the original RDD.

The reduceByKey() transformation applies a function to the values of each key across all the partitions of the RDD, and combines the results to produce a new RDD with each unique key and its corresponding reduced value. This requires shuffling data across the network to ensure that all values for a given key end up on the same partition before being reduced.
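
This is also the main pattern to look for when trying to cut the cost of a wide operation: prefer operators such as reduceByKey(), which pre-aggregate within each partition (a narrow, map-side step) before the shuffle, rather than groupByKey(), which ships every individual value across the network. A rough sketch of the comparison, again assuming the SparkContext sc from above and purely illustrative data:

pairs = sc.parallelize([("sensor1", 1), ("sensor1", 4), ("sensor2", 2), ("sensor2", 5)], 2)

# groupByKey shuffles every raw value, then sums after the shuffle
grouped_sums = pairs.groupByKey().mapValues(sum)

# reduceByKey sums inside each partition first, so only one partial sum
# per key per partition crosses the network
reduced_sums = pairs.reduceByKey(lambda a, b: a + b)

print(sorted(grouped_sums.collect()))  # [('sensor1', 5), ('sensor2', 7)]
print(sorted(reduced_sums.collect()))  # same result, far less data shuffled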

SQL

-- Create a temporary view of a DataFrame with two columns: id and value
df.createOrReplaceTempView("my_table")

-- Narrow transformation: SELECT and WHERE - operates on a single partition
SELECT id, value * 2
FROM my_table
WHERE id < 3;

-- Wide transformation: GROUP BY and SUM - requires shuffling data across partitions
SELECT id, SUM(value)
FROM my_table
GROUP BY id;

In the example above, the first SQL query performs a narrow transformation because the SELECT and WHERE clauses can be evaluated on each partition independently, without consulting any other partition. The output is a new DataFrame with the same partitioning scheme as the original DataFrame.

The second SQL query performs a wide transformation because it requires shuffling data across multiple partitions to group values by key. The output is a new DataFrame with a different partitioning scheme, in which all rows for a given key end up in the same partition.

Note that Spark SQL queries can be optimized and executed using either a narrow or wide transformation depending on the query plan and the underlying data distribution. It's important to understand the query execution plan and the characteristics of the data to choose the most efficient transformation.
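
One practical way to check what the optimizer decided is to print the physical plan and look for an Exchange step, which marks a shuffle. A minimal sketch, assuming a SparkSession named spark and a small DataFrame that mirrors the my_table view used above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 10), (2, 20), (1, 30)], ["id", "value"])
df.createOrReplaceTempView("my_table")

# Narrow query: no Exchange expected in the printed plan
spark.sql("SELECT id, value * 2 AS doubled FROM my_table WHERE id < 3").explain()

# Wide query: the GROUP BY introduces an Exchange (shuffle) step
spark.sql("SELECT id, SUM(value) AS total FROM my_table GROUP BY id").explain()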

Fault Tolerance Given Immutable DataFrames?

In Spark, "fault tolerance" refers to the ability of the system to continue working correctly in the event of failures, such as node failures, network failures, or disk failures. Fault tolerance is an important feature of distributed systems like Spark, where failures are common and can cause data loss and application downtime.

Immutable DataFrames are another important concept in Spark. In an immutable DataFrame, the data cannot be modified after it has been created. Instead, any operation on the DataFrame creates a new DataFrame that contains the results of the operation. This property is essential for fault tolerance in Spark because it allows the system to recover from failures by simply recreating the lost DataFrame from its lineage (i.e., the sequence of operations that created it).

By using immutable DataFrames, Spark can ensure that the data is consistent and recoverable, even in the event of failures. If a node fails during a computation, the system can simply recreate the lost DataFrame from its lineage on another node, without having to worry about data corruption or inconsistencies.
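
You can inspect this lineage yourself on an RDD with toDebugString(), which prints the chain of transformations Spark would replay to rebuild any lost partitions. A small sketch, again assuming the SparkContext sc (note that some PySpark versions return the string as bytes, hence the defensive decode):

rdd = sc.parallelize(range(10), 2)
doubled = rdd.map(lambda x: x * 2)
filtered = doubled.filter(lambda x: x > 5)

# Print the recursive dependency chain (the lineage) of the final RDD
lineage = filtered.toDebugString()
print(lineage.decode() if isinstance(lineage, bytes) else lineage)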

In Spark, transformations and actions are two types of operations that can be performed on distributed datasets called RDDs (Resilient Distributed Datasets). Lazy transformations are operations that do not execute immediately when called. Instead, they create a new RDD that represents the result of the transformation, without actually computing the transformation. Examples of lazy transformations include map(), filter(), and groupBy().

Lazy transformations are not executed until an eager action is called on the RDD. An eager action is an operation that triggers the computation of the RDD and returns a result to the driver program or writes the output to disk. Examples of eager actions include reduce(), count(), and saveAsTextFile().
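
A minimal sketch of this lazy-then-eager behaviour, with purely illustrative data: the filter() and map() calls below return immediately without touching the data, and only the count() at the end launches an actual Spark job.

numbers = sc.parallelize(range(1_000_000))

evens = numbers.filter(lambda x: x % 2 == 0)  # lazy: nothing runs yet
halved = evens.map(lambda x: x // 2)          # lazy: still nothing runs

print(halved.count())  # eager action: the job executes now and prints 500000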

The benefit of lazy transformations is that they allow Spark to optimize the execution plan and minimize the amount of data movement required for the computation. By deferring the execution of transformations until the last possible moment, Spark can combine multiple transformations into a single stage and reduce the number of shuffles required.

In contrast, eager actions force Spark to execute all the preceding transformations and compute the entire RDD before returning the result. This can be inefficient if you only need a small part of the data, or if you are performing exploratory data analysis and need to experiment with different transformations.

In summary, fault tolerance and immutable DataFrames are two key features of Spark that work together to provide a reliable and fault-tolerant distributed computing platform. Lazy transformations are operations that create a new RDD without actually computing the transformation, while eager actions trigger the computation of the RDD and return a result to the driver program. Using lazy transformations can help optimize the execution plan and minimize data movement, while eager actions force Spark to compute the entire RDD before returning a result.

Summary

The goal of this blog is to help you understand Wide versus Narrow Transformations and patterns to look for where you may be able to convert a Wide to a Narrow transformation. Actions in Spark are operations that trigger the computation of a result by executing one or more transformations on a DataFrame.

We reviewed examples of actions in Spark that can be carried out as narrow transformations, such as count(), which returns the number of rows in the DataFrame and can be computed narrowly because it only requires counting the rows in each partition and summing the results. Examples of transformations in Spark include select(), filter(), groupBy(), join(), and many others; in every case the original DataFrame remains unchanged.

Fault tolerance and immutable DataFrames are two key features of Spark that work together to provide a reliable and fault-tolerant distributed computing platform. Lazy transformations are operations that create a new RDD without actually computing the transformation, while eager actions trigger the computation of the RDD and return a result to the driver program.

Using lazy transformations can help optimize the execution plan and minimize data movement, while eager actions force Spark to compute the entire RDD before returning a result.

Hasan Kırmızı

Data Engineer at Huawei

10 months

This shows we should refactor our SQL scripts when converting to Spark SQL. Thank you so much :)

Sujoy Das

Data Engineer || IIT Delhi

1 year

Hi Don Hilborn .... Quite a helpful post. Could you please tell me whether Spark adds an I/O operation when a Wide Transformation is executed?

