A Practical PySpark Scenario
Charles Woodruff
For the past several weeks, I have been studying PySpark and several other data-related tools through live classes, Udemy, YouTube, and the official documentation. The best way for me to deepen my understanding is to put that knowledge to use.
So let's look at a real scenario and see how PySpark can be used to solve it.
Scenario
You were given the following nested JSON dataset and tasked with finding the total revenue per order and the payment method for each order. A single entry in this dataset looks like this:
{
  "order_id": 1,
  "customer_id": 101,
  "items": [
    {"product_id": 201, "quantity": 2, "price": 10.0},
    {"product_id": 202, "quantity": 1, "price": 20.0}
  ],
  "payment": {
    "method": "credit_card",
    "amount": 30.0
  }
}
Problem Breakdown
The goal is to sum the revenue of every item in the order (quantity × price) and identify the payment method.
The first step in any PySpark job is to initialize a Spark session. Start by importing the needed classes and functions from pyspark.sql:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, sum as spark_sum
Next, initialize the Spark session:
spark = SparkSession.builder.appName("CaptureOrderData").getOrCreate()
Assign the above dataset to a variable ('data').
data = [
    {
        "order_id": 1,
        "customer_id": 101,
        "items": [
            {"product_id": 201, "quantity": 2, "price": 10.0},
            {"product_id": 202, "quantity": 1, "price": 20.0}
        ],
        "payment": {
            "method": "credit_card",
            "amount": 30.0
        }
    }
]
Create a DataFrame (structured data, similar to an Excel spreadsheet or a SQL table).
df = spark.createDataFrame(data)
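One caveat: when PySpark infers a schema from Python dictionaries, nested dicts become map columns rather than structs, and the mixed value types here (strings and numbers) can confuse the inference. A minimal sketch of an explicit schema, using the standard pyspark.sql.types classes, keeps every nested field strongly typed:

from pyspark.sql.types import (
    StructType, StructField, ArrayType,
    IntegerType, DoubleType, StringType,
)

# Spelling out the structs keeps the item and payment fields typed
# instead of relying on schema inference over nested dicts.
schema = StructType([
    StructField("order_id", IntegerType()),
    StructField("customer_id", IntegerType()),
    StructField("items", ArrayType(StructType([
        StructField("product_id", IntegerType()),
        StructField("quantity", IntegerType()),
        StructField("price", DoubleType()),
    ]))),
    StructField("payment", StructType([
        StructField("method", StringType()),
        StructField("amount", DoubleType()),
    ])),
])

df = spark.createDataFrame(data, schema=schema)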
This is how the dataframe looks.
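You can print it with show(); the output should look roughly like this (struct formatting varies a little between Spark versions):

df.show(truncate=False)
# +--------+-----------+--------------------------------+-------------------+
# |order_id|customer_id|items                           |payment            |
# +--------+-----------+--------------------------------+-------------------+
# |1       |101        |[{201, 2, 10.0}, {202, 1, 20.0}]|{credit_card, 30.0}|
# +--------+-----------+--------------------------------+-------------------+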
Notice the "items" column holds two entries - one for product_id 201 and another for product_id 202 - in a single cell. These need to be split into two separate rows, which is done with the explode() function.
df_updated = df.withColumn("item", explode(col("items")))
Each product now has its own row.
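Selecting just the key columns confirms the split (output approximate):

df_updated.select("order_id", "item").show(truncate=False)
# +--------+--------------+
# |order_id|item          |
# +--------+--------------+
# |1       |{201, 2, 10.0}|
# |1       |{202, 1, 20.0}|
# +--------+--------------+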
A closer look at the table shows several columns still need to be cleaned up. Look at the new 'item' column: it is a struct holding the product_id, quantity, and price values, which need to be flattened into individual columns.
Let's create a new dataframe, and select the columns needed.
df_results = df_updated.select(
    col("order_id"),
    col("customer_id"),
    col("item.product_id"),
    col("item.quantity"),
    col("item.price"),
    col("payment.method").alias("payment_method"),
    col("payment.amount").alias("payment_amount")
)
The new dataframe is below.
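Printed with show(truncate=False), it should look roughly like this:

df_results.show(truncate=False)
# +--------+-----------+----------+--------+-----+--------------+--------------+
# |order_id|customer_id|product_id|quantity|price|payment_method|payment_amount|
# +--------+-----------+----------+--------+-----+--------------+--------------+
# |1       |101        |201       |2       |10.0 |credit_card   |30.0          |
# |1       |101        |202       |1       |20.0 |credit_card   |30.0          |
# +--------+-----------+----------+--------+-----+--------------+--------------+

Note that payment_amount now appears on every item row - keep that duplication in mind for the aggregation step.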
Now for the final step. The goal was to find the total revenue for the order and its payment method (counting the total quantity of items along the way is a useful bonus). This is done through grouping and aggregation.
# Revenue comes from quantity * price. Summing payment_amount would
# double-count it, because explode() repeated it on every item row.
revenue_df = df_results.groupBy("order_id", "payment_method").agg(
    spark_sum("quantity").alias("total_quantity"),
    spark_sum(col("quantity") * col("price")).alias("total_revenue")
)
The completed dataframe provides the final solution: the order generated $40.00 in revenue (2 × $10.00 + 1 × $20.00) across 3 items, and it was paid with a credit card. Be careful not to sum the payment_amount column here - explode() duplicated it onto both item rows, so summing it would report a misleading $60.