Mastering Spark SQL Functions: A Comprehensive Guide
Krishna Yogi Kolluru
Data Science Architect | ML | GenAI | Speaker | ex-Microsoft | ex-Credit Suisse | IIT - NUS Alumni | AWS & Databricks Certified Data Engineer | T2 Skilled worker
Apache Spark SQL provides a rich set of functions to handle various data operations. This guide covers essential Spark SQL functions with code examples and explanations, making it easier to understand and apply them in your data processing tasks.
1. Aggregation Functions
Aggregation functions perform calculations on a set of values and return a single result. They are commonly used for summarizing data.
count()
Purpose: Counts the number of items in a group.
Example: Counting the number of orders processed in a day.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AggregationExample").getOrCreate()
data = [(1, "Order1"), (2, "Order2"), (3, "Order3")]
columns = ["id", "order_name"]
df = spark.createDataFrame(data, columns)
df_count = df.selectExpr("count(order_name) as total_orders")
df_count.show()
Sample Output:
+------------+
|total_orders|
+------------+
| 3|
+------------+
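The example above counts every row in the DataFrame; for the stated use case of counting orders processed per day, count() is typically combined with groupBy(). A minimal sketch, assuming a hypothetical order_date column (illustrative data only):
from pyspark.sql.functions import count
# Hypothetical orders with an order_date column
df_orders = spark.createDataFrame(
    [("2024-09-01", "Order1"), ("2024-09-01", "Order2"), ("2024-09-02", "Order3")],
    ["order_date", "order_name"])
# Count orders per day
df_orders.groupBy("order_date").agg(count("order_name").alias("total_orders")).show()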
sum()
Purpose: Computes the sum of all values in a group.
Example: Calculating the total sales revenue for a given period.
data = [(1, 100), (2, 150), (3, 200)]
columns = ["id", "sales"]
df = spark.createDataFrame(data, columns)
total_sales = df.selectExpr("sum(sales) as total_sales")
total_sales.show()
Sample Output:
+-----------+
|total_sales|
+-----------+
| 450|
+-----------+
avg()
Purpose: Returns the average of all values in a group.
Example: Finding the average customer satisfaction score.
average_sales = df.selectExpr("avg(sales) as average_sales")
average_sales.show()
Sample Output:
+-------------+
|average_sales|
+-------------+
| 150.0|
+-------------+
max()
Purpose: Returns the maximum value in a group.
Example: Determining the highest sales in a month.
max_sales = df.selectExpr("max(sales) as max_sales")
max_sales.show()
Sample Output:
+---------+
|max_sales|
+---------+
| 200|
+---------+
min()
Purpose: Returns the minimum value in a group.
Example: Identifying the lowest temperature recorded in a city.
min_sales = df.selectExpr("min(sales) as min_sales")
min_sales.show()
Sample Output:
+---------+
|min_sales|
+---------+
| 100|
+---------+
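All of these aggregates can also be computed in a single pass over the same sales DataFrame; a quick sketch using selectExpr:
# Several aggregations in one query over the sales DataFrame above
df.selectExpr(
    "count(sales) as num_records",
    "sum(sales) as total_sales",
    "avg(sales) as average_sales",
    "min(sales) as min_sales",
    "max(sales) as max_sales").show()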
2. Window Functions
Window functions operate over a specified range of data and return a value for each row within that range.
row_number()
Purpose: Assigns a unique number to each row within a window partition.
Example: Ranking employees based on their sales performance.
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec = Window.orderBy("sales")
df.withColumn("row_num", row_number().over(windowSpec)).show()
Sample Output:
+---+-----+-------+
| id|sales|row_num|
+---+-----+-------+
| 1| 100| 1|
| 2| 150| 2|
| 3| 200| 3|
+---+-----+-------+
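Note that Window.orderBy("sales") alone defines a single, unpartitioned window, so Spark pulls all rows into one partition; in practice a window is usually partitioned as well. A minimal sketch, assuming a hypothetical dept column (illustrative data only):
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
# Hypothetical per-department sales
df_emp = spark.createDataFrame(
    [("Sales", "Alice", 300), ("Sales", "Bob", 200), ("HR", "Carol", 150)],
    ["dept", "name", "sales"])
# Row numbering restarts within each department
dept_window = Window.partitionBy("dept").orderBy("sales")
df_emp.withColumn("row_num", row_number().over(dept_window)).show()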
rank()
Purpose: Assigns a rank to each row within a partition of a result set.
Example: Ranking students in a class based on their exam scores.
from pyspark.sql.functions import rank
df.withColumn("rank", rank().over(windowSpec)).show()
Sample Output:
+---+-----+----+
| id|sales|rank|
+---+-----+----+
| 1| 100| 1|
| 2| 150| 2|
| 3| 200| 3|
+---+-----+----+
dense_rank()
Purpose: Similar to rank(), but with no gaps in ranking.
Example: Ranking players in a game where ties are possible.
from pyspark.sql.functions import dense_rank
df.withColumn("dense_rank", dense_rank().over(windowSpec)).show()
Sample Output:
+---+-----+----------+
| id|sales|dense_rank|
+---+-----+----------+
| 1| 100| 1|
| 2| 150| 2|
| 3| 200| 3|
+---+-----+----------+
lag()
Purpose: Returns the value from a previous row in the window.
Example: Comparing sales between the current and previous months.
from pyspark.sql.functions import lag
df.withColumn("previous_sales", lag("sales", 1).over(windowSpec)).show()
Sample Output:
+---+-----+--------------+
| id|sales|previous_sales|
+---+-----+--------------+
|  1|  100|          null|
|  2|  150|           100|
|  3|  200|           150|
+---+-----+--------------+
lead()
Purpose: Returns the value from a subsequent row in the window.
Example: Forecasting the next quarter’s sales based on current data.
from pyspark.sql.functions import lead
df.withColumn("next_sales", lead("sales", 1).over(windowSpec)).show()
Sample Output:
+---+-----+----------+
| id|sales|next_sales|
+---+-----+----------+
| 1| 100| 150|
| 2| 150| 200|
| 3| 200| null|
+---+-----+----------+
3. String Functions
String functions are used to manipulate and analyze text data.
concat()
Purpose: Concatenates multiple strings into one.
Example: Merging first and last names into a full name.
from pyspark.sql.functions import concat, lit
df_names = spark.createDataFrame([("John", "Doe"), ("Jane", "Smith")], ["first_name", "last_name"])
df_names.withColumn("full_name", concat(df_names.first_name, lit(" "), df_names.last_name)).show()
Sample Output:
+----------+---------+----------+
|first_name|last_name| full_name|
+----------+---------+----------+
| John| Doe| John Doe|
| Jane| Smith|Jane Smith|
+----------+---------+----------+
substring()
Purpose: Extracts a substring from a string.
Example: Extracting the area code from a phone number.
from pyspark.sql.functions import substring
df_names.withColumn("initials", substring(df_names.first_name, 1, 3)).show()
Sample Output:
+----------+---------+--------+
|first_name|last_name|initials|
+----------+---------+--------+
| John| Doe| Joh|
| Jane| Smith| Jan|
+----------+---------+--------+
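For the stated use case of extracting an area code from a phone number, the same call is applied to the leading digits. A minimal sketch, assuming phone numbers stored as plain 10-digit strings (hypothetical data):
from pyspark.sql.functions import substring
df_phones = spark.createDataFrame([("4155551234",), ("2125559876",)], ["phone"])
# Area code = first three characters of the phone string
df_phones.withColumn("area_code", substring("phone", 1, 3)).show()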
trim()
Purpose: Removes leading and trailing spaces from a string.
Example: Cleaning up user input data by trimming unnecessary spaces.
from pyspark.sql.functions import trim
df_text = spark.createDataFrame([(" hello ",)], ["text"])
df_text.withColumn("trimmed_text", trim(df_text.text)).show()
Sample Output:
+-----------+------------+
| text|trimmed_text|
+-----------+------------+
| hello | hello|
+-----------+------------+
upper()
Purpose: Converts a string to uppercase.
Example: Standardizing email addresses to uppercase for consistency.
from pyspark.sql.functions import upper
df_names.withColumn("upper_name", upper(df_names.first_name)).show()
Sample Output:
+----------+---------+----------+
|first_name|last_name|upper_name|
+----------+---------+----------+
|      John|      Doe|      JOHN|
|      Jane|    Smith|      JANE|
+----------+---------+----------+
lower()
Purpose: Converts a string to lowercase.
Example: Normalizing product codes for uniformity.
from pyspark.sql.functions import lower
df_names.withColumn("lower_name", lower(df_names.last_name)).show()
Sample Output:
+----------+---------+----------+
|first_name|last_name|lower_name|
+----------+---------+----------+
|      John|      Doe|       doe|
|      Jane|    Smith|     smith|
+----------+---------+----------+
4. Array Functions
Array functions allow for operations on array-type data structures within Spark SQL.
array()
Purpose: Creates an array from the given elements.
Example: Grouping multiple product IDs into a single array for an order.
from pyspark.sql.functions import array
df_items = spark.createDataFrame([(1, "apple", "banana"), (2, "orange", "grape")], ["id", "fruit1", "fruit2"])
df_items.withColumn("fruits", array("fruit1", "fruit2")).show()
Sample Output:
+---+------+------+---------------+
| id|fruit1|fruit2|         fruits|
+---+------+------+---------------+
|  1| apple|banana|[apple, banana]|
|  2|orange| grape|[orange, grape]|
+---+------+------+---------------+
explode()
Purpose: Expands an array into a set of rows.
Example: Splitting a list of tags into individual rows for analysis.
from pyspark.sql.functions import explode
df_items.withColumn("fruit", explode("fruits")).select("id", "fruit").show()
Sample Output:
+---+------+
| id| fruit|
+---+------+
| 1| apple|
| 1|banana|
| 2|orange|
| 2| grape|
+---+------+
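For the stated tag-splitting use case, explode() is commonly paired with split() to turn a delimited string into one row per tag. A hedged sketch, assuming comma-separated tags in a single string column (hypothetical data):
from pyspark.sql.functions import split, explode
df_tags = spark.createDataFrame([(1, "spark,sql,etl"), (2, "python,ml")], ["id", "tags"])
# Split the tag string into an array, then explode the array into rows
df_tags.select("id", explode(split("tags", ",")).alias("tag")).show()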
size()
Purpose: Returns the length of an array.
Example: Counting the number of items in a shopping cart.
from pyspark.sql.functions import size
df_items.withColumn("num_fruits", size("fruits")).show()
Sample Output:
+---+------+------+---------------+----------+
| id|fruit1|fruit2|         fruits|num_fruits|
+---+------+------+---------------+----------+
|  1| apple|banana|[apple, banana]|         2|
|  2|orange| grape|[orange, grape]|         2|
+---+------+------+---------------+----------+
array_contains()
Purpose: Checks if an array contains a specific element.
Example: Determining if a customer purchased a particular product.
from pyspark.sql.functions import array_contains
df_items.withColumn("contains_banana", array_contains("fruits", "banana")).show()
Sample Output:
+---+------+------+---------------+---------------+
| id|fruit1|fruit2|         fruits|contains_banana|
+---+------+------+---------------+---------------+
|  1| apple|banana|[apple, banana]|           true|
|  2|orange| grape|[orange, grape]|          false|
+---+------+------+---------------+---------------+
sort_array()
Purpose: Sorts the elements of an array.
Example: Sorting a list of grades in ascending order.
from pyspark.sql.functions import sort_array
df_grades = spark.createDataFrame([(1, [85, 92, 78]), (2, [76, 89, 95])], ["id", "grades"])
df_grades.withColumn("sorted_grades", sort_array("grades")).show()
Sample Output:
+---+------------+-------------+
| id| grades|sorted_grades|
+---+------------+-------------+
| 1|[85, 92, 78]| [78, 85, 92]|
| 2|[76, 89, 95]| [76, 89, 95]|
+---+------------+-------------+
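sort_array() also accepts an asc flag for descending order; a quick sketch on the same grades DataFrame:
# Sort each grades array from highest to lowest
df_grades.withColumn("grades_desc", sort_array("grades", asc=False)).show()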
5. Date and Time Functions
Date and time functions in Spark SQL are used to manipulate and extract information from date and time values.
current_date()
Purpose: Returns the current date.
Example: Adding a timestamp to user activity logs.
from pyspark.sql.functions import current_date
df_items.withColumn("current_date", current_date()).show()
Sample Output:
+---+------+------+---------------+------------+
| id|fruit1|fruit2|         fruits|current_date|
+---+------+------+---------------+------------+
|  1| apple|banana|[apple, banana]|  2024-09-02|
|  2|orange| grape|[orange, grape]|  2024-09-02|
+---+------+------+---------------+------------+
date_add()
Purpose: Adds a specified number of days to a date.
Example: Calculating the due date for an invoice.
from pyspark.sql.functions import date_add
df_dates = spark.createDataFrame([("2024-09-01",)], ["start_date"])
df_dates.withColumn("due_date", date_add("start_date", 10)).show()
Sample Output:
+----------+----------+
|start_date| due_date|
+----------+----------+
|2024-09-01|2024-09-11|
+----------+----------+
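Here start_date is a plain string that Spark implicitly casts to a date; converting it explicitly with to_date() makes the intent clearer. A minimal sketch on the same DataFrame:
from pyspark.sql.functions import to_date
# Convert the string column to DateType before doing date arithmetic
df_dates.withColumn("start_dt", to_date("start_date")) \
    .withColumn("due_date", date_add("start_dt", 10)).show()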
datediff()
Purpose: Returns the difference between two dates.
Example: Measuring the time taken to close a customer support ticket.
from pyspark.sql.functions import datediff
df_dates = spark.createDataFrame([("2024-09-01", "2024-09-05")], ["start_date", "end_date"])
df_dates.withColumn("date_diff", datediff("end_date", "start_date")).show()
Sample Output:
+----------+----------+---------+
|start_date| end_date|date_diff|
+----------+----------+---------+
|2024-09-01|2024-09-05| 4|
+----------+----------+---------+
year()
Purpose: Extracts the year from a date.
Example: Grouping sales data by year for trend analysis.
from pyspark.sql.functions import year
df_dates.withColumn("year", year("start_date")).show()
Sample Output:
+----------+----+
|start_date|year|
+----------+----+
|2024-09-01|2024|
+----------+----+
month()
Purpose: Extracts the month from a date.
Example: Analyzing monthly subscription data.
from pyspark.sql.functions import month
df_dates.withColumn("month", month("start_date")).show()
Sample Output:
+----------+-----+
|start_date|month|
+----------+-----+
|2024-09-01| 9|
+----------+-----+
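For the stated trend-analysis use case, year() and month() are usually combined with a grouped aggregation. A hedged sketch, assuming a hypothetical sales-by-date DataFrame (illustrative data only):
from pyspark.sql.functions import to_date, year, sum as sum_
df_trend = spark.createDataFrame(
    [("2023-05-10", 100), ("2024-03-15", 150), ("2024-09-01", 200)],
    ["sale_date", "amount"])
# Total sales per year; swap in month(to_date("sale_date")) for a monthly view
df_trend.groupBy(year(to_date("sale_date")).alias("year")).agg(sum_("amount").alias("total_sales")).show()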
6. Conditional Functions
Conditional functions allow for decision-making within your data transformations.
if()
Purpose: Returns a value if a condition is true; otherwise, returns another value.
Example: Applying discounts based on membership status.
from pyspark.sql.functions import expr
df = spark.createDataFrame([(1, "Gold", 100), (2, "Silver", 150), (3, "Bronze", 200)], ["id", "membership", "price"])
df.withColumn("discounted_price", expr("if(membership == 'Gold', price * 0.9, price)")).show()
Sample Output:
+---+----------+-----+----------------+
| id|membership|price|discounted_price|
+---+----------+-----+----------------+
|  1|      Gold|  100|            90.0|
|  2|    Silver|  150|           150.0|
|  3|    Bronze|  200|           200.0|
+---+----------+-----+----------------+
when()
Purpose: Evaluates a list of conditions and returns one of multiple possible result expressions.
Example: Categorizing employees based on their performance scores.
from pyspark.sql.functions import when
df = spark.createDataFrame([(1, 95), (2, 85), (3, 75)], ["id", "score"])
df.withColumn("category", when(df.score >= 90, "High").when(df.score >= 80, "Medium").otherwise("Low")).show()
Sample Output:
+---+-----+--------+
| id|score|category|
+---+-----+--------+
| 1| 95| High|
| 2| 85| Medium|
| 3| 75| Low|
+---+-----+--------+
case()
Purpose: Similar to when(), but allows for more complex branching logic.
Example: Assigning grades to students based on their scores.
df.withColumn("grade", expr("""
CASE
WHEN score >= 90 THEN 'A'
WHEN score >= 80 THEN 'B'
WHEN score >= 70 THEN 'C'
ELSE 'D'
END
""")).show()
Sample Output:
+---+-----+-----+
| id|score|grade|
+---+-----+-----+
|  1|   95|    A|
|  2|   85|    B|
|  3|   75|    C|
+---+-----+-----+
7. Null Handling Functions
Null handling functions help manage null or missing data in your DataFrame.
isnull()
Purpose: Checks if a value is null.
Example: Identifying incomplete customer profiles.
df = spark.createDataFrame([(1, None), (2, "Complete"), (3, None)], ["id", "profile_status"])
df.withColumn("is_incomplete", df.profile_status.isNull()).show()
Sample Output:
+---+--------------+-------------+
| id|profile_status|is_incomplete|
+---+--------------+-------------+
|  1|          null|         true|
|  2|      Complete|        false|
|  3|          null|         true|
+---+--------------+-------------+
coalesce()
Purpose: Returns the first non-null value in a list.
Example: Filling in missing data with default values.
df = spark.createDataFrame([(1, None), (2, "Active"), (3, None)], ["id", "status"])
df.withColumn("status_final", coalesce(df.status, lit("Unknown"))).show()
Sample Output:
+---+-------+------------+
| id| status|status_final|
+---+-------+------------+
| 1| null| Unknown|
| 2| Active| Active|
| 3| null| Unknown|
+---+-------+------------+
na.fill()
Purpose: Replaces null values with a specified value.
Example: Substituting missing sales data with an average value.
df_sales = spark.createDataFrame([(1, 100), (2, None), (3, 150)], ["id", "sales"])
df_sales.na.fill({"sales": 125}).show()
Sample Output:
+---+-----+
| id|sales|
+---+-----+
| 1| 100|
| 2| 125|
| 3| 150|
+---+-----+
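The stated example substitutes missing sales with an average value; that fill value can be computed from the data itself rather than hard-coded. A minimal sketch, assuming the mean of the non-null values is an acceptable substitute:
from pyspark.sql.functions import avg
# avg() ignores nulls, so this is the mean of the observed sales (125.0 here)
mean_sales = int(df_sales.select(avg("sales")).first()[0])
df_sales.na.fill({"sales": mean_sales}).show()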
Conclusion
Spark SQL provides a rich set of functions to handle various data processing tasks, making it easier to manipulate, analyze, and transform data. This guide covered key functions with practical examples to help you better understand how to use them in your data pipelines.
By mastering these functions, you can unlock the full potential of Spark SQL and efficiently manage your data.