Spark Tidbits - Lesson 3
There are many implementations of Apache Spark within the three major cloud vendors. I will be covering Microsoft Fabric lakehouse, Apache Spark in Azure Synapse Analytics and Azure Databricks Service over the course of these articles.
Big data engineering revolves around folders and files. The two Microsoft products support a library called mssparkutils, while Databricks has its own library called dbutils. Regardless of implementation, the ls command lists the contents of a given directory.
#
# 1 - list folder contents
#
dbutils.fs.ls("/databricks-datasets/lending-club-loan-stats")
The Databricks service comes with a number of sample datasets. Today, we are going to work with the Lending Club loan data. This folder contains a readme text file and one comma-separated values (CSV) file.
#
# 2 - Read in CSV file
#
# Read in file
raw = spark.read.csv("/rissug/bronze/lending_club/loan_stats_2018q2.csv", header=True)
# Show number of cols
cnt = len(raw.columns)
print(f"The loan dataset has {cnt} columns.")
Output from executing code snippet using the cluster name clsworkspace01.
This dataset has too many columns. Therefore, we want to pare down the dataset, refine existing columns and create new ones. The results of the processing will be saved as a table. We will explore how to use the DataFrame syntax today.
#
# 3 - create database named tidbits
#
# drop existing
stmt = "drop database if exists tidbits cascade"
spark.sql(stmt)
# create new
stmt = "create database tidbits"
spark.sql(stmt)
The above statements drop and re-create the Hive database. Please see image below for results. Both Synapse and Databricks support the idea of Hive databases. In the lakehouse architecture, a single Hive database (Tables) is already paired with the storage (Files) once the object is created.
The select method, also known as vertical filtering, lets the developer choose which fields to bring into the new DataFrame variable named loans1.
#
# 4A - pick columns for use
#
# Choose subset of data
loans1 = raw.select(
"loan_status",
"int_rate",
"revol_util",
"issue_d",
"earliest_cr_line",
"emp_length",
"verification_status",
"total_pymnt",
"loan_amnt",
"grade",
"annual_inc",
"dti",
"addr_state",
"term",
"home_ownership",
"purpose",
"application_type",
"delinq_2yrs",
"total_acc"
)
The functions library needs to be imported into the Spark notebook before we can use its methods. I suggest you use this website to look up each function used below.
The code below refines the interest rate, revolving utilization and length of employment fields. There are two very important methods to discuss. First, the withColumn method allows the developer to redefine an existing column or create a new one. Second, we use the cast method to convert between data types.
#
# 4B - refine existing columns
#
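# Import only the functions used below (a wildcard import would shadow Python built-ins such as sum and max)
from pyspark.sql.functions import regexp_replace, substring, trim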
# Convert to number (int rate)
loans1 = loans1.withColumn('int_rate', regexp_replace('int_rate', '%', '').cast('float'))
# Convert to number (revolving util)
loans1 = loans1.withColumn('revol_util', regexp_replace('revol_util', '%', '').cast('float'))
# Use regex to clean up
loans1 = loans1.withColumn('emp_length', trim(regexp_replace(loans1.emp_length, "([ ]*+[a-zA-Z].*)|(n/a)", "") ))
loans1 = loans1.withColumn('emp_length', trim(regexp_replace(loans1.emp_length, "< 1", "0") ))
loans1 = loans1.withColumn('emp_length', trim(regexp_replace(loans1.emp_length, "10\\+", "10") ).cast('float'))
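To see what the three emp_length steps do without a cluster, the same rules can be tried on single strings in plain Python. This is only a sketch: the function name clean_emp_length and the sample values are made up for illustration, and the pattern uses a plain greedy `[ ]*` instead of the possessive `[ ]*+` because Python's re module only supports possessive quantifiers from version 3.11. For these inputs both forms match the same text.

```python
import re

def clean_emp_length(raw):
    """Mirror the three regexp_replace steps on a single string value."""
    if raw is None:
        return None
    # Step 1: drop the trailing unit text ("year", "years") and the "n/a" marker
    value = re.sub(r"([ ]*[a-zA-Z].*)|(n/a)", "", raw).strip()
    # Step 2: "< 1" means less than one year, so treat it as 0
    value = re.sub(r"< 1", "0", value).strip()
    # Step 3: "10+" means ten or more years, so store it as 10
    value = re.sub(r"10\+", "10", value).strip()
    # An empty string cannot be converted to a number; return None,
    # which stands in for the null a failed cast would leave behind
    return float(value) if value else None

# Hypothetical sample values in the style of the emp_length column
samples = ["10+ years", "< 1 year", "1 year", "3 years", "n/a"]
cleaned = {s: clean_emp_length(s) for s in samples}
print(cleaned)
```

The order of the steps matters: the unit text has to be removed first so that "< 1" and "10+" are the only non-numeric values left to handle.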
The following code snippet creates four new columns: bad loan, issue year, earliest year and credit length.
#
# 4C - create new columns
#
# Create bad loan flag
loans1 = loans1.withColumn("bad_loan", (~loans1.loan_status.isin(["Current", "Fully Paid"])).cast("string"))
# Convert to number (issue year)
loans1 = loans1.withColumn('issue_year', substring(loans1.issue_d, 5, 4).cast('double') )
# Convert to number (earliest year)
loans1 = loans1.withColumn('earliest_year', substring(loans1.earliest_cr_line, 5, 4).cast('double'))
# Calculate len in yrs
loans1 = loans1.withColumn('credit_length_in_years', (loans1.issue_year - loans1.earliest_year))
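The logic behind the four new columns can be checked on one record in plain Python. This is a rough sketch, not the Spark code itself: the function name derive_columns and the sample values are hypothetical, the date strings are assumed to look like "Jun-2018", and null handling is left out. The main thing it shows is that Spark's substring is 1-based, so substring(col, 5, 4) corresponds to the 0-based Python slice [4:8].

```python
def derive_columns(loan_status, issue_d, earliest_cr_line):
    """Compute the four derived values for a single loan record."""
    # Anything other than "Current" or "Fully Paid" counts as a bad loan;
    # the flag is kept as text, like the cast("string") in step 4C
    bad_loan = "false" if loan_status in ("Current", "Fully Paid") else "true"
    # Start at character 5 and take 4 characters (assumes a "Mon-YYYY" layout)
    issue_year = float(issue_d[4:8])
    earliest_year = float(earliest_cr_line[4:8])
    return {
        "bad_loan": bad_loan,
        "issue_year": issue_year,
        "earliest_year": earliest_year,
        "credit_length_in_years": issue_year - earliest_year,
    }

# Hypothetical record used only for illustration
result = derive_columns("Charged Off", "Jun-2018", "Mar-2004")
print(result)
```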
The last step is to write loans originated in New England to a delta table. The filter (where) method can be used to slice the data horizontally.
#
# 4D - save as delta table
#
# just new england states
loans1 = loans1.filter("addr_state = 'CT' or addr_state = 'MA' or addr_state = 'ME' or addr_state = 'NH' or addr_state = 'RI' or addr_state = 'VT' ")
# create a new table
loans1.write.mode("overwrite").saveAsTable("tidbits.new_england_loans_01")
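A long chain of or conditions gets hard to maintain as the list of states grows. One possible alternative, sketched below, is to build an equivalent SQL in predicate from a Python list. The variable names new_england and filter_expr are made up for this example.

```python
# State codes taken from the filter in step 4D
new_england = ["CT", "MA", "ME", "NH", "RI", "VT"]

# Quote each code and join them into a SQL IN list
quoted = ", ".join(f"'{state}'" for state in new_england)
filter_expr = f"addr_state in ({quoted})"

print(filter_expr)
# prints: addr_state in ('CT', 'MA', 'ME', 'NH', 'RI', 'VT')
```

In a notebook, the resulting string could be passed to loans1.filter(filter_expr) in place of the hand-written condition. The isin column method used for the bad loan flag in step 4C is another way to express the same test.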
The show tables Spark SQL command lets us list the tables in the tidbits database.
%sql
show tables in tidbits
Once the dataset is stored as a table, we can start learning about our information using Spark SQL. For instance, how many loans has Lending Club given out in each New England state?
%sql
select
addr_state,
count(*) as total
from
tidbits.new_england_loans_01
group by
addr_state
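For readers new to SQL, the group by with count(*) simply tallies the rows that share the same addr_state value. The plain Python sketch below does the same on a handful of made-up state codes; the list named rows is hypothetical and does not come from the loan table.

```python
from collections import Counter

# Hypothetical addr_state values standing in for rows of the table
rows = ["CT", "MA", "MA", "NH", "CT", "MA", "VT"]

# Counter groups identical values and counts them, like group by + count(*)
totals = Counter(rows)

for state, total in sorted(totals.items()):
    print(state, total)
```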
Today, we talked about slicing the data both vertically and horizontally using the select and filter methods. There are many functions that you can use with the withColumn method to refine or create new columns. Next time, we will revisit this same example using Spark SQL.