What is a DAG?
A DAG, or Directed Acyclic Graph, is a conceptual representation of the series of computations performed by Spark. In Spark, DAGs are used to represent both the sequence of transformations applied to the data and the execution plan to be carried out across the distributed data processing framework.
Key Components of a DAG
- Vertices: Represent RDDs or DataFrames resulting from transformations.
- Edges: Represent the operations (transformations or actions) applied to the data.
How DAG Works in PySpark
- Transformations and Actions: When you define transformations (e.g., map, filter, groupBy) on RDDs or DataFrames, Spark does not execute them immediately. Instead, it builds a logical execution plan in the form of a DAG.
- Lazy Evaluation: Spark evaluates the DAG lazily. This means transformations are not executed until an action (e.g., collect, count, saveAsTextFile) is called. This allows Spark to optimize the execution plan.
- Jobs and Stages: When an action is called, Spark’s scheduler divides the DAG into a series of stages. Each stage consists of tasks that can be executed in parallel. Stages are separated by shuffle operations (data redistributions across nodes), as shown in the sketch after this list.
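To see a stage boundary in practice, here is a minimal sketch (the app name, data, and column names are illustrative): groupBy forces a shuffle, and explain() reveals it as an Exchange node in the physical plan.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StageBoundarySketch").getOrCreate()

df = spark.createDataFrame([("a", 1), ("b", 2), ("a", 3)], ["key", "value"])

# groupBy must redistribute rows by key, so Spark inserts a shuffle
# (an Exchange node) between two stages of the job:
aggregated = df.groupBy("key").sum("value")

# explain() prints the physical plan; the Exchange marks the stage
# boundary (exact output varies by Spark version):
aggregated.explain()

spark.stop()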
Example of DAG in PySpark
Let's consider an example to illustrate how a DAG is built and executed in PySpark.
from pyspark.sql import SparkSession
# Initialize SparkSession
spark = SparkSession.builder.appName("DAG Example").getOrCreate()
# Create a DataFrame
data = [("Alice", 1), ("Bob", 2), ("Cathy", 3), ("David", 4)]
df = spark.createDataFrame(data, ["name", "value"])
# Define transformations
df_filtered = df.filter(df["value"] > 1)
df_squared = df_filtered.withColumn("squared", df_filtered["value"] * df_filtered["value"])
# Define an action: collect() triggers execution of the whole DAG
result = df_squared.collect()
print(result)
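Because collect() returns the rows to the driver as a list of Row objects, the print shows something like [Row(name='Bob', value=2, squared=4), ...] for the three rows that pass the filter.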
DAG Construction in the Example
- Initial DataFrame (df): The starting vertex of the DAG, created from the local data.
- Filter Transformation (df_filtered): A narrow transformation that keeps rows where value > 1.
- withColumn Transformation (df_squared): A narrow transformation that adds a new column squared.
When you call collect(), Spark builds a DAG representing these transformations. The DAG for the above example might look like this:
Initial DataFrame
|
filter (value > 1)
|
withColumn (squared)
|
collect
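You can ask Spark for the physical plan it compiles from this DAG. A quick check, reusing df_squared from the example above:

# Print the physical plan; the filter and the new column are
# pipelined into a single step (exact output varies by Spark version):
df_squared.explain()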
Execution Plan
- Job: Calling collect() triggers a single Spark job.
- Stage: Because filter and withColumn are narrow transformations, no shuffle is needed and Spark pipelines both into a single stage.
- Tasks: Within that stage, each task applies both transformations to one partition and returns its rows to the driver.
Each stage consists of tasks that are distributed and executed across the cluster nodes; a wide transformation such as groupBy would introduce a shuffle and split the DAG into additional stages.
Benefits of DAG in PySpark
- Optimization: Spark can optimize the execution plan by understanding the entire sequence of transformations. It can, for instance, minimize data shuffling or combine transformations.
- Fault Tolerance: Since the DAG records each dataset’s lineage, Spark can recompute lost data from that lineage information if a node fails (see the sketch after this list).
- Parallel Execution: The DAG allows Spark to execute tasks in parallel, improving performance and resource utilization.
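To see the lineage information that fault tolerance relies on, here is a minimal sketch using the RDD API (it assumes the SparkSession named spark from the earlier example):

# Build a small lineage chain of narrow transformations:
rdd = spark.sparkContext.parallelize(range(10))
doubled = rdd.map(lambda x: x * 2)
filtered = doubled.filter(lambda x: x > 5)

# toDebugString() returns the lineage Spark would replay to
# recompute lost partitions after a node failure:
print(filtered.toDebugString().decode("utf-8"))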