Mastering ETL Processes Using PySpark
Dr. Fatma Ben Mesmia Chaabouni
Assistant Professor in Computer Science @ CU Ulster University, Qatar | Ph.D. in CS | MSc & B.Sc. in CS | NLP, AI, Data Analytics, and Blockchain researcher | MBA mentor | Tunisian AI Society Member
Today's tutorial uses the Titanic dataset to demonstrate ETL (Extract, Transform, Load) processes using PySpark. This guide will help you set up your environment, extract data from a CSV file, transform the data, and load the transformed data back into a file. Each step is explained with well-commented code.
Step 1: Environment Setup and SparkSession Creation
Install PySpark
First, you need to install PySpark if you haven't already:
pip install pyspark
Create a SparkSession
The SparkSession is the entry point to programming Spark with the Dataset and DataFrame API. We start by creating a SparkSession:
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName('Titanic ETL Process').getOrCreate()
Step 2: Data Extraction
Read Data from CSV
We will read the Titanic dataset from a CSV file. Ensure the dataset is in your working directory or provide the correct path to the file.
# Read Titanic dataset from CSV
df = spark.read.csv('titanic.csv', inferSchema=True, header=True)
# Show the schema to verify the data has been loaded correctly
df.printSchema()
# Show the first few rows of the data frame
df.show(5)
Step 3: Data Transformation
Selecting Columns
Since our dataset only contains PassengerId and Survived, we can skip this step.
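For reference, this is what column selection looks like on the full Titanic CSV. It is a minimal sketch; the extra column names ('Pclass', 'Sex', 'Age') are assumptions about your copy of the file and may not exist in a trimmed version.
# Minimal sketch: keep only the columns of interest
# ('Pclass', 'Sex', and 'Age' are assumed to exist in the full dataset)
df_selected = df.select('PassengerId', 'Survived', 'Pclass', 'Sex', 'Age')
df_selected.show(5)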
Filtering Data
We will filter the data to include only passengers who survived.
# Filter passengers who survived
df_survived = df.filter(df['Survived'] == 1)
# Show the first few rows of the dataframe
df_survived.show(5)
Adding New Columns
We can add a new column that expresses whether the passenger survived in a more descriptive way.
from pyspark.sql.functions import when
# Create a new column 'SurvivalStatus' based on the 'Survived' column
df = df.withColumn('SurvivalStatus', when(df['Survived'] == 1, 'Survived')
.otherwise('Not Survived'))
# Show the first few rows of the dataframe
df.show(5)
Renaming Columns
We can rename columns with withColumnRenamed if needed. In the previous step we added a new, more descriptive 'SurvivalStatus' column rather than renaming 'Survived', so no rename is strictly required here.
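For completeness, here is a minimal sketch of a rename, applied to a separate DataFrame so the original 'Survived' column stays available for the grouping step below.
# Minimal sketch: rename 'Survived' to 'SurvivedFlag' on a copy
# so the original column names remain untouched for later steps
df_renamed = df.withColumnRenamed('Survived', 'SurvivedFlag')
df_renamed.printSchema()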
Grouping and Aggregating Data
We group the data by Survived and count the number of passengers in each group.
# Group by 'Survived' and count the number of passengers in each group
df_grouped = df.groupBy('Survived').count()
# Show the grouped data
df_grouped.show()
Step 4: Handling Missing Values
Since we have a limited dataset, we will assume there are no missing values to handle.
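If your copy of the dataset does contain nulls (the full Titanic data typically has missing Age and Cabin values), the sketch below shows how to count them and two common ways to handle them. The 'Age' and 'Cabin' names are assumptions about the full file.
from pyspark.sql.functions import col, sum as spark_sum
# Count null values per column to see what actually needs handling
null_counts = df.select([spark_sum(col(c).isNull().cast('int')).alias(c) for c in df.columns])
null_counts.show()
# Option 1: drop rows containing any missing values
df_no_nulls = df.dropna()
# Option 2: fill missing values with defaults ('Age' and 'Cabin' are assumed columns)
# df_filled = df.fillna({'Age': 0, 'Cabin': 'Unknown'})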
Step 5: Data Type Conversion
No specific data type conversion is needed for this limited dataset.
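Should you need a conversion later, cast does the work. A minimal sketch, assuming you want 'Survived' as a string (the commented 'Age' cast assumes that column exists in your file):
from pyspark.sql.functions import col
# Minimal sketch: cast 'Survived' from integer to string
df_casted = df.withColumn('Survived', col('Survived').cast('string'))
# df_casted = df_casted.withColumn('Age', col('Age').cast('int'))  # assumed column
df_casted.printSchema()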
Step 6: Advanced Data Manipulations
Using SQL Queries
We can run SQL queries on the DataFrame by creating a temporary view.
# Create a temporary view
df.createOrReplaceTempView('titanic')
# Run an SQL query to select passengers who survived
sql_df = spark.sql('SELECT * FROM titanic WHERE Survived = 1')
# Show the result of the SQL query
sql_df.show(5)
Step 7: Data Loading
Finally, we write the transformed data out to a CSV file.
Writing to CSV
# Write the transformed data to a new CSV file
df.write.csv('titanic_transformed.csv', header=True)
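Keep in mind that Spark writes the output as a directory of part files and raises an error if the path already exists. A minimal sketch of a more forgiving variant, coalescing to a single part file, overwriting any previous run, and stopping the session when done:
# Coalesce to one partition so the output directory holds a single part file,
# and overwrite the target path if it already exists
df.coalesce(1).write.mode('overwrite').csv('titanic_transformed.csv', header=True)
# Stop the SparkSession once the ETL job is finished
spark.stop()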
Feel free to connect with me on LinkedIn for more insights into big data and PySpark!
Happy ETL-ing!