Window and windowing functions using DataFrame API in Spark

In this article, we will cover the following concepts:

  1. Window Class, and
  2. Window Functions

Such as:

  • rank
  • dense_rank
  • row_number

We will use the Spark DataFrame API to demonstrate these.

What is Window class?

The Window class in PySpark is used to define window specifications for window functions.

Specifications such as:

  1. partitionBy (to group the data and apply window functions on each partition)
  2. orderBy (to sort the rows within each partition)
  3. rowsBetween (to define the window frame size)

To demonstrate this, consider a DataFrame orders_df with columns:

country | weeknum | numinvoices | totalquantity | invoicevalue

Sample Data
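So that the snippets below are runnable, here is a minimal sketch that builds a hypothetical orders_df with the columns above (the values are illustrative, not the original dataset):

%python
# Hypothetical sample data (illustrative values only) matching the columns above
data = [
    ("India", 48, 25, 300, 3000.0),
    ("India", 49, 20, 250, 2500.0),
    ("India", 50, 22, 280, 2500.0),
    ("Germany", 48, 11, 150, 1800.0),
    ("Germany", 49, 12, 160, 1950.0),
]
columns = ["country", "weeknum", "numinvoices", "totalquantity", "invoicevalue"]
orders_df = spark.createDataFrame(data, columns)
orders_df.show()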

Window class Usage:

%python
from pyspark.sql import Window

myWindow = Window.partitionBy("country") \
    .orderBy("weeknum") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

Frame boundaries and available options:

A frame boundary defines the subset of rows within the partition to which the window function is applied. We have the following options (a running-total sketch follows the list):

  • Window.unboundedPreceding - refers to the first row in the partition.
  • Window.unboundedFollowing - refers to the last row in the partition.
  • Window.currentRow - refers to the current row.
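As a quick sketch of how the frame boundary matters: applying sum() over the window defined above (unboundedPreceding to currentRow) produces a running total of invoicevalue per country. The running_total column name is an illustrative choice:

%python
from pyspark.sql.functions import sum as sum_

# With the frame spanning the first row of the partition up to the current
# row, sum() accumulates into a running total within each country
running_df = orders_df.withColumn("running_total", sum_("invoicevalue").over(myWindow))
running_df.show()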

We have now covered the Window class, so we are ready to apply window functions.

What is a Window Function ?

Window functions in Apache Spark operate on a group of rows (referred to as a window) and calculate a return value for each row based on the group of rows.

Common Window Functions

  • rank()
  • dense_rank()
  • row_number()

rank() - skips ranks in case of a tie. Example usage: ranking candidates in a university entrance exam.

dense_rank() - doesn't skip ranks in case of a tie. Example usage: declaring winners at the Olympics.

row_number() - assigns a unique sequential number to each row within a partition.

NOTE: Apart from these ranking functions, you can also apply aggregate functions such as sum() and avg() over a window, based on the use case.
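For example, here is a minimal sketch of avg() over a window (avg_invoice is an illustrative column name):

%python
from pyspark.sql.functions import avg

# With only partitionBy (no ordering), the frame covers the whole partition,
# so every row in a country gets the same average invoicevalue
avgWindow = Window.partitionBy("country")
avg_df = orders_df.withColumn("avg_invoice", avg("invoicevalue").over(avgWindow))
avg_df.show()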

Usage of rank(), dense_rank() and row_number():

%python
from pyspark.sql import Window
from pyspark.sql.functions import desc, row_number, rank, dense_rank
# (alternatively: from pyspark.sql.functions import *)

myWindow = Window.partitionBy("country") \
    .orderBy(desc("invoicevalue"))

# rank() usage
results_df = orders_df.withColumn("rank", rank().over(myWindow))
results_df.show()

# Similar usage for dense_rank()
results_df = orders_df.withColumn("dense_rank", dense_rank().over(myWindow))
results_df.show()

# Similar usage for row_number()
results_df = orders_df.withColumn("row_number", row_number().over(myWindow))
results_df.show()

Table showing the difference between rank, dense_rank and row_number (the original table image is not reproduced here; an illustrative example follows):
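With hypothetical values, ordering by invoicevalue in descending order and a tie at 2500:

country | invoicevalue | rank | dense_rank | row_number
India | 3000 | 1 | 1 | 1
India | 2500 | 2 | 2 | 2
India | 2500 | 2 | 2 | 3
India | 1800 | 4 | 3 | 4

rank() skips to 4 after the tie, dense_rank() continues with 3, and row_number() stays unique regardless of ties.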

We have two more important window functions: lead() and lag().

lead() - returns the value from the next row within the partition, for comparison with the current row.

lag() - returns the value from the previous row within the partition, for comparison with the current row.

lag() Usage:

%python
from pyspark.sql import Window
from pyspark.sql.functions import lag, lead, expr

myWindow = Window.partitionBy("country") \
    .orderBy("weeknum")

# Pull the previous week's invoicevalue into the current row
results_df = orders_df.withColumn("previous_week", lag("invoicevalue").over(myWindow))

# Week-over-week difference
final_df = results_df.withColumn("invoice_diff", expr("invoicevalue - previous_week"))
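Similarly, a sketch of lead() on the same window (next_week is an illustrative column name):

%python
# Pull the following week's invoicevalue into the current row
results_df = orders_df.withColumn("next_week", lead("invoicevalue").over(myWindow))
results_df.show()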

Hope you now understand what a window is and how to apply window functions over one.



NOTE - This topic is very important if you are planning to appear for a data engineering interview, so understand it well.

All the best. : )
