Wide Vs. Narrow Transformations in Spark/Distributed Compute
Don Hilborn
Seasoned Solutions Architect with over 20 years of industry experience. Focused on unlocking decades of experience in enterprise data architecture - offering an unparalleled ability to harness data to improve decisions.
Overview
In this blog I will provide examples of Spark and other distributed systems performing what are known as Narrow versus Wide Transformations, or Dependencies. Dependencies better describes the activity, but Transformations seems to have caught on more. We will explore Narrow Transformations and how they can make a big impact on performance. I will give you examples of when Narrow Transformations should be used, as well as patterns to look for where you may be able to turn a Wide Transformation into a Narrow Transformation. To better understand these topics we will use RDDs (Resilient Distributed Datasets).
Spark still uses RDDs as a fundamental abstraction for distributed data processing. Since the introduction of DataFrames and Datasets, however, RDDs have become a lower-level API that is mainly used for low-level transformation and processing of unstructured data. I will use RDDs in this blog because the concepts are clearer at that level.
DataFrames and Datasets are higher-level abstractions built on top of RDDs that provide a more efficient and expressive API for working with structured and semi-structured data. DataFrames and Datasets provide a more SQL-like programming interface and can leverage Spark's Catalyst optimizer to perform advanced query optimization and code generation.
That being said, RDDs are still a fundamental building block of Spark, and they provide a flexible and resilient way to handle distributed data processing. RDDs are particularly useful for handling unstructured data or performing custom operations that are not supported by the higher-level abstractions.
There are several white papers and technical documents available on RDDs in Spark, but one of the best resources is the original paper that introduced RDDs, titled "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing." The paper was authored by Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, Ankur Dave, Justin Ma, Murphy McCauley, Michael J. Franklin, Scott Shenker, and Ion Stoica, and was published in the Proceedings of the 9th USENIX Conference on Networked Systems Design and Implementation (NSDI'12).
The paper provides a comprehensive overview of RDDs, including their design, implementation, and evaluation. It also includes examples of how to use RDDs in Spark, and discusses the advantages and limitations of the RDD abstraction. The paper has been widely cited and is considered a seminal work in the field of distributed computing.
If you are interested in learning more about RDDs in Spark, I highly recommend reading this paper. It provides a thorough and detailed explanation of the underlying concepts and algorithms, and is a great resource for anyone interested in distributed data processing with Spark.
In summary, while Spark has introduced higher-level abstractions like DataFrames and Datasets, RDDs remain an important part of the Spark ecosystem and are still used for low-level processing and handling unstructured data. I use RDDs in this blog because it helps understand the concept of Wide versus Narrow transformations.
Wide Vs Narrow Transformations in Spark
Actions
Actions in Spark are operations that trigger the computation of a result by executing one or more transformations on a DataFrame. Actions can be performed on a DataFrame to generate a result, such as counting the number of rows or aggregating the data.
Narrow transformations are operations that can be performed on a single partition of a DataFrame without needing to shuffle the data across multiple partitions. These types of transformations are often faster and more efficient than wide transformations because they can be executed in parallel on each partition of the data. Actions such as count(), first(), and take(n) can be satisfied with this kind of narrow, per-partition work: each partition does its share locally, and only small partial results are sent back to the driver to be combined.
Note that the efficiency of these operations as narrow transformations depends on the data distribution and on the computation required for each row. If the per-row computation is too complex, or the operation inherently needs data from other partitions, it may not be possible to perform the operation as a narrow transformation. The sketch below illustrates the idea with count().
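Here is a minimal, self-contained PySpark sketch of why count() only needs narrow, per-partition work (the numbers and partition count are made up for illustration): each partition counts its own rows, and only the small per-partition counts are combined at the driver.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("narrow-actions").getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize(range(100), numSlices=4)

# Roughly what count() does under the hood: count within each partition...
per_partition_counts = rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
print(per_partition_counts)        # e.g. [25, 25, 25, 25]

# ...then combine the small per-partition results at the driver.
print(sum(per_partition_counts))   # 100
print(rdd.count())                 # 100, no shuffle required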
Transformations In Spark and Distributed Compute
Transformations are operations that transform a Spark DataFrame into a new DataFrame without altering the original data. Operations like select() and filter() are examples of transformations in Spark. These operations will return transformed results as a new DataFrame instead of changing the original DataFrame.
In Spark, a DataFrame is an immutable distributed collection of data that is organized into named columns. The data in a DataFrame is stored across multiple machines in a cluster, and Spark provides a set of operations to manipulate the data in the DataFrame.
Transformations are a type of operation that creates a new DataFrame by applying a function to each element of the original DataFrame without altering the original data. Transformations are executed lazily, which means that the new DataFrame is not actually computed until an action is performed.
Examples of transformations in Spark include select(), filter(), groupBy(), join(), and many others. When you apply a transformation to a DataFrame, you are creating a new DataFrame with a new set of columns or a new set of rows based on the operation you perform. The original DataFrame remains unchanged.
For example, if you have a DataFrame df with columns name, age, and gender, and you apply a select() transformation to select only the name and age columns, you get a new DataFrame containing just those two columns while df itself is left untouched: new_df = df.select(["name", "age"])
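A minimal PySpark sketch of this (the rows and column values are made up for illustration):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("select-example").getOrCreate()

df = spark.createDataFrame(
    [("Alice", 34, "F"), ("Bob", 41, "M")],
    ["name", "age", "gender"],
)

# select() returns a new DataFrame; the original df is unchanged.
new_df = df.select("name", "age")

df.show()      # still has name, age and gender
new_df.show()  # only name and age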
Scala
// Example data for demonstration purposes
val data = sc.parallelize(Seq(
??("sensor1", 1),
??("sensor2", 2),
??("sensor3", 3),
??("sensor4", 4),
??("sensor5", 5)
))
// Wide transformation: groupByKey
val groupedData = data.groupByKey()
// Narrow transformation: mapValues
val sumData = groupedData.mapValues(_.sum)
// Show the result
sumData.foreach(println)
In this example, we have a dataset of sensor readings and corresponding values. The wide transformation is groupByKey, which groups the data by the sensor, creating a key-value pair where the key is the sensor and the value is an iterable of all the corresponding values. This is a wide transformation because it shuffles the data across the cluster, which can be expensive for large datasets.
The narrow transformation is mapValues, which applies a function to each value in the iterable created by groupByKey. In this case, the function is _.sum, which computes the sum of all the values for each sensor. This is a narrow transformation because it does not require shuffling the data across the cluster. Finally, we print out the resulting dataset using foreach.
Python
# Create an RDD of key-value pairs
rdd = sc.parallelize([(1, 2), (3, 4), (5, 6)])
# Narrow transformation: mapValues() - only depends on one partition
narrow_rdd = rdd.mapValues(lambda x: x * 2)
print("Narrow transformation output:", narrow_rdd.collect())
# Wide transformation: reduceByKey() - depends on multiple partitions
wide_rdd = rdd.reduceByKey(lambda x, y: x + y)
print("Wide transformation output:", wide_rdd.collect())
In the example above, mapValues() is a narrow transformation because it only operates on one partition at a time, whereas reduceByKey() is a wide transformation because it requires shuffling data across multiple partitions.
The mapValues() transformation applies a function to the values of each key-value pair in the RDD, but only within the same partition, so it doesn't require shuffling the data across the network. The output is a new RDD with the same partitioning scheme as the original RDD.
The reduceByKey() transformation applies a function to the values of each key across all the partitions of the RDD, and combines the results to produce a new RDD with each unique key and its corresponding reduced value. This requires shuffling data across the network to ensure that all values for a given key end up on the same partition before being reduced.
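This also points to a pattern worth looking for when tuning a job: if a groupByKey() is immediately followed by a per-key aggregation, reduceByKey() usually does the same work with far less shuffling, because it pre-aggregates within each partition (narrow, map-side combining) before the shuffle. A hedged PySpark sketch, using sensor-style data similar to the earlier example:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("combine-before-shuffle").getOrCreate()
sc = spark.sparkContext

data = sc.parallelize([("sensor1", 1), ("sensor1", 4), ("sensor2", 2), ("sensor2", 5)])

# Shuffles every individual value across the cluster, then sums after the shuffle.
sums_via_group = data.groupByKey().mapValues(sum)

# Sums within each partition first, then shuffles only the partial sums.
sums_via_reduce = data.reduceByKey(lambda a, b: a + b)

print(sorted(sums_via_group.collect()))   # [('sensor1', 5), ('sensor2', 7)]
print(sorted(sums_via_reduce.collect()))  # [('sensor1', 5), ('sensor2', 7)]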
SQL
-- Assume a DataFrame df with two columns, id and value, has been registered
-- as a temporary view, e.g. with df.createOrReplaceTempView("my_table")
-- Narrow transformation: SELECT and WHERE - operates on a single partition
SELECT id, value * 2
FROM my_table
WHERE id < 3;
-- Wide transformation: GROUP BY and SUM - requires shuffling data across partitions
SELECT id, SUM(value)
FROM my_table
GROUP BY id;
In the example above, the first SQL query performs a narrow transformation because the SELECT and WHERE clauses can be applied to each partition of the DataFrame independently; no rows need to move between partitions. The output is a new DataFrame with the same partitioning scheme as the original DataFrame.
The second SQL query performs a wide transformation because it requires shuffling data across multiple partitions to group values by key. The output is a new DataFrame with a different partitioning scheme, where each partition contains the results for a different key.
Note that Spark SQL queries can be optimized and executed using either a narrow or wide transformation depending on the query plan and the underlying data distribution. It's important to understand the query execution plan and the characteristics of the data to choose the most efficient transformation.
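One practical way to check is to look at the physical plan: a shuffle shows up as an Exchange operator. A minimal PySpark sketch (the table name, columns, and data are the illustrative ones used above):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("plan-inspection").getOrCreate()

df = spark.createDataFrame([(1, 10), (2, 20), (3, 30)], ["id", "value"])
df.createOrReplaceTempView("my_table")

# Narrow: no Exchange expected in the physical plan.
spark.sql("SELECT id, value * 2 FROM my_table WHERE id < 3").explain()

# Wide: the plan should contain an Exchange (hashpartitioning) step for the GROUP BY.
spark.sql("SELECT id, SUM(value) FROM my_table GROUP BY id").explain()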
Fault Tolerance Given Immutable DataFrames
In Spark, "fault tolerance" refers to the ability of the system to continue working correctly in the event of failures, such as node failures, network failures, or disk failures. Fault tolerance is an important feature of distributed systems like Spark, where failures are common and can cause data loss and application downtime.
Immutable DataFrames are another important concept in Spark. In an immutable DataFrame, the data cannot be modified after it has been created. Instead, any operation on the DataFrame creates a new DataFrame that contains the results of the operation. This property is essential for fault tolerance in Spark because it allows the system to recover from failures by simply recreating the lost DataFrame from its lineage (i.e., the sequence of operations that created it).
By using immutable DataFrames, Spark can ensure that the data is consistent and recoverable, even in the event of failures. If a node fails during a computation, the system can simply recreate the lost DataFrame from its lineage on another node, without having to worry about data corruption or inconsistencies.
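You can see this lineage directly. In a minimal PySpark sketch (the data and operations are arbitrary), toDebugString() prints the chain of transformations Spark would replay to rebuild lost partitions:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("lineage-demo").getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize(range(10)) \
        .map(lambda x: (x % 2, x)) \
        .reduceByKey(lambda a, b: a + b)

# In PySpark, toDebugString() returns bytes; decode it for readability.
print(rdd.toDebugString().decode("utf-8"))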
In Spark, transformations and actions are two types of operations that can be performed on distributed datasets called RDDs (Resilient Distributed Datasets). Lazy transformations are operations that do not execute immediately when called. Instead, they create a new RDD that represents the result of the transformation, without actually computing the transformation. Examples of lazy transformations include map(), filter(), and groupBy().
Lazy transformations are not executed until an eager action is called on the RDD. An eager action is an operation that triggers the computation of the RDD and returns a result to the driver program or writes the output to disk. Examples of eager actions include reduce(), count(), and saveAsTextFile().
The benefit of lazy transformations is that they allow Spark to optimize the execution plan and minimize the amount of data movement required for the computation. By deferring the execution of transformations until the last possible moment, Spark can combine multiple transformations into a single stage and reduce the number of shuffles required.
In contrast, eager actions force Spark to execute all the preceding transformations and compute the entire RDD before returning the result. This can be inefficient if you only need a small part of the data, or if you are performing exploratory data analysis and need to experiment with different transformations.
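A minimal PySpark sketch of this lazy/eager split (the numbers are arbitrary): the map() and filter() calls only build up lineage, and nothing runs until the count() action is called.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("lazy-vs-eager").getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize(range(1_000_000))

# Transformations: nothing is computed yet.
doubled = rdd.map(lambda x: x * 2)
small = doubled.filter(lambda x: x < 100)

# Action: triggers execution of the whole chain above.
print(small.count())  # 50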
In summary, fault tolerance and immutable DataFrames are two key features of Spark that work together to provide a reliable and fault-tolerant distributed computing platform. Lazy transformations are operations that create a new RDD without actually computing the transformation, while eager actions trigger the computation of the RDD and return a result to the driver program. Using lazy transformations can help optimize the execution plan and minimize data movement, while eager actions force Spark to compute the entire RDD before returning a result.
Summary
The goal of this blog is to help you understand Wide versus Narrow Transformations and patterns to look for where you may be able to convert a Wide to a Narrow transformation. Actions in Spark are operations that trigger the computation of a result by executing one or more transformations on a DataFrame.
We reviewed examples of actions in Spark whose work can be done as narrow, per-partition operations: count(), for instance, returns the number of rows in the DataFrame, and it can be computed without a shuffle because each partition counts its own rows and the per-partition counts are then combined at the driver. Examples of transformations in Spark include select(), filter(), groupBy(), join(), and many others; in every case the original DataFrame remains unchanged.
Fault tolerance and immutable DataFrames are two key features of Spark that work together to provide a reliable and fault-tolerant distributed computing platform. Lazy transformations are operations that create a new RDD without actually computing the transformation, while eager actions trigger the computation of the RDD and return a result to the driver program. Using lazy transformations can help optimize the execution plan and minimize data movement, while eager actions force Spark to compute the entire RDD before returning a result.