Spark Tidbits - Lesson 3

There are many implementations of Apache Spark across the three major cloud vendors. Over the course of these articles, I will be covering the Microsoft Fabric lakehouse, Apache Spark in Azure Synapse Analytics, and the Azure Databricks service.


Big data engineering revolves around folders and files. The two Microsoft products support a library called mssparkutils, while Databricks has its own library called dbutils. Regardless of implementation, the ls command lists the contents of a given directory.


#
#  1 - list folder contents
#

dbutils.fs.ls("/databricks-datasets/lending-club-loan-stats")        
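For comparison, the equivalent call in a Synapse or Fabric notebook uses the mssparkutils library. The snippet below is just a sketch; the path is illustrative and will depend on your lakehouse.

# list folder contents in Synapse or Fabric (path is illustrative)
mssparkutils.fs.ls("/lakehouse/default/Files/lending_club")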


The Databricks service comes with a number of sample datasets. Today, we are going to work with the Lending Club loan data. The folder below contains a readme text file and one comma separated values file.
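If you want to peek at the readme before loading the data, dbutils can print the first bytes of a file. The call below is a sketch; the file name is illustrative, so use the exact name returned by the ls call above.

# show the first 1000 bytes of the readme (file name is illustrative)
dbutils.fs.head("/databricks-datasets/lending-club-loan-stats/readme.txt", 1000)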

#
#  2 - Read in CSV file
#

# Read in file
raw = spark.read.csv("/rissug/bronze/lending_club/loan_stats_2018q2.csv", header=True)

# Show number of cols
cnt = len(raw.columns)
print(f"The loan dataset has {cnt} columns.")        

Output from executing the code snippet using the cluster named clsworkspace01.

This dataset has too many columns. Therefore, we want to pare down the dataset, refine existing columns and create new ones. The results of the processing will be saved as a table. We will explore how to use the DataFrame syntax today.


#
# 3 - create database named tidbits
#


# drop existing
stmt = "drop database if exists tidbits cascade"
spark.sql(stmt)

# create new
stmt = "create database tidbits"
spark.sql(stmt)        

The above statements drop and re-create the hive database. Please see the image below for results. Both Synapse and Databricks support the concept of hive databases. In the lakehouse architecture, a single hive database (Tables) is already paired with the storage (Files) once the object is created.
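To verify the database exists and to see where its files live, a quick metadata query can be run. This is my own check, not part of the original notebook, and the exact output will vary by workspace.

%sql
describe database tidbits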


The select method, also known as vertical filtering, allows the developer to choose which fields to bring into the new DataFrame variable named loans1.

#
#  4A - pick columns for use
#

# Choose subset of data
loans1 = raw.select(
  "loan_status", 
  "int_rate", 
  "revol_util", 
  "issue_d", 
  "earliest_cr_line", 
  "emp_length", 
  "verification_status", 
  "total_pymnt", 
  "loan_amnt", 
  "grade", 
  "annual_inc", 
  "dti", 
  "addr_state", 
  "term", 
  "home_ownership", 
  "purpose", 
  "application_type", 
  "delinq_2yrs", 
  "total_acc"
)        

The functions library needs to be imported into the Spark notebook before we can use these methods. I suggest looking up each function used below in the PySpark SQL functions documentation.

The code below refines the interest rate, revolving utilization and employee length of service fields. There are two very important methods to discuss. First, the withColumn method allows the developer to redefine or create a new column. Second, we use the cast method to convert between data types.


#
#  4B - refine existing columns
#

# Include functions
from pyspark.sql.functions import *

# Convert to number (int rate)
loans1 = loans1.withColumn('int_rate', regexp_replace('int_rate', '%', '').cast('float')) 

# Convert to number (revolving util)
loans1 = loans1.withColumn('revol_util', regexp_replace('revol_util', '%', '').cast('float')) 

# Use regex to clean up 
loans1 = loans1.withColumn('emp_length', trim(regexp_replace(loans1.emp_length, "([ ]*+[a-zA-Z].*)|(n/a)", "") ))
loans1 = loans1.withColumn('emp_length', trim(regexp_replace(loans1.emp_length, "< 1", "0") ))
loans1 = loans1.withColumn('emp_length', trim(regexp_replace(loans1.emp_length, "10\\+", "10") ).cast('float'))
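Before moving on, it is a good idea to confirm that the casts took effect. The quick check below is a sketch that simply prints the schema of the refined columns.

# confirm the refined columns are now floats
loans1.select("int_rate", "revol_util", "emp_length").printSchema()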
        

The following code snippet creates four new columns: bad loan, issue year, earliest year and credit length.


#
#  4C - create new columns
#

# Create bad loan flag
loans1 = loans1.withColumn("bad_loan", (~loans1.loan_status.isin(["Current", "Fully Paid"])).cast("string"))

# Convert to number (issue year)
loans1 = loans1.withColumn('issue_year',  substring(loans1.issue_d, 5, 4).cast('double') ) 

# Convert to number (earliest year)
loans1 = loans1.withColumn('earliest_year', substring(loans1.earliest_cr_line, 5, 4).cast('double'))

# Calculate len in yrs
loans1 = loans1.withColumn('credit_length_in_years', (loans1.issue_year - loans1.earliest_year))
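To sanity check the derived columns, we can display a few rows. The sketch below assumes the transformations above ran without errors.

# display a sample of the derived columns
loans1.select("loan_status", "bad_loan", "issue_year", "earliest_year", "credit_length_in_years").show(5)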
        

The last step is to write loans originated in New England to a Delta table. The filter (where) method can be used to slice the data horizontally.


#
#  4D - save as delta table
#

# just new england states
loans1 = loans1.filter("addr_state = 'CT' or addr_state = 'MA' or addr_state = 'ME' or addr_state = 'NH' or addr_state = 'RI' or addr_state = 'VT' ")

# create a new table
loans1.write.mode("overwrite").saveAsTable("tidbits.new_england_loans_01")        
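As a side note, the same horizontal slice can be expressed with the isin column method instead of a SQL-style predicate string. The sketch below is an equivalent alternative; the variable name loans_ne is my own.

# alternative filter using a column expression (equivalent to the predicate string above)
from pyspark.sql.functions import col

new_england = ["CT", "MA", "ME", "NH", "RI", "VT"]
loans_ne = loans1.filter(col("addr_state").isin(new_england))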

The show tables Spark SQL command allows us to list the tables in the tidbits database.

%sql
show tables in tidbits        

Once the dataset is stored as a table, we can start learning about our information using Spark SQL. For instance, how many loans has the Lending Club given out by state in New England?

%sql
select 
  addr_state, 
  count(*) as total 
from 
  tidbits.new_england_loans_01 
group by 
  addr_state        

Today, we talked about slicing the data both vertically and horizontally using the select and filter methods. There are many functions that can be used with the withColumn method to refine or create new columns. Next time, we will revisit this same example using Spark SQL.


