Spark Tidbits - Lesson 6

The spark.read and spark.write methods are the main ways to read and write files. One powerful feature of the read method is the ability to recursively read files from a given root directory. This functionality might be required for full loads that keep an audit history or incremental loads that keep growing daily.

Today, we are going to learn how to filter after reading so that only the most recent data is saved as a Hive table. Both the dataframe and Spark SQL syntaxes will be explored within this article.

The image below shows the Adventure Works Sales data files in pipe-delimited format. We can see that our Data Factory pipeline in Microsoft Fabric has been run on the various days that I teach. We want to read in all the files but filter for the newest date.



The PySpark code below should be familiar to the big data engineer. The focus should be on two areas: a recursive file lookup is being used, and the file schema is being supplied for a pipe-delimited file that has no header.


#
#  1 - read in date info
#

# include library
from pyspark.sql.functions import *

# path + schema
var_path = 'Files/raw/saleslt/dim_date'
var_schema = 'DateKey int, FullDateAlternateKey date, DayNumberOfWeek int, EnglishDayNameOfWeek string, SpanishDayNameOfWeek string, FrenchDayNameOfWeek string, DayNumberOfMonth int, DayNumberOfYear int, WeekNumberOfYear int, EnglishMonthName string, SpanishMonthName string, FrenchMonthName string, MonthNumberOfYear int, CalendarQuarter int, CalendarYear int, CalendarSemester int, FiscalQuarter int, FiscalYear int, FiscalSemester int'

# create dataframe
df_raw = spark.read.format("csv") \
    .schema(var_schema) \
    .option("header", 'false') \
    .option("delimiter", '|') \
    .option("recursiveFileLookup", "true") \
    .load(var_path)
        

If we display the dataframe, we can see the rows that we are familiar with. However, we currently have duplicate rows. In fact, we have five extra copies of each original row.
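If you want to confirm the duplication, a quick group-by count does the trick. The sketch below assumes that DateKey uniquely identifies a row within a single source file.

# optional sanity check - count how many copies of each DateKey were read
from pyspark.sql import functions as F

df_raw.groupBy("DateKey") \
    .agg(F.count("*").alias("copies")) \
    .filter("copies > 1") \
    .show(5)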


Three dataframes are shown below: df_raw - the raw file data without manipulation; df_date - the data with additional columns added; and filt_date - the data filtered by the most recent date.

It is important to know the time the data was read into the system as well as the source file of the information. The withColumn method allows the developer to define or redefine a column. The current_timestamp function records the load time and the input_file_name function returns the full path to the input file.

The split function can be used to parse the full path into a folder name and a file name. It is important to note that I am using a sortable date (YYYYMMDD) for the folder name.

#
#  2 - filter for recent files
#

# make copy
df_date = df_raw

# add load date
df_date = df_date.withColumn("_load_date", current_timestamp())

# add folder path
df_date = df_date.withColumn("_folder_path", input_file_name())

# add folder date
df_date = df_date.withColumn("_folder_date", split(input_file_name(), '/')[8])

# add file name
df_date = df_date.withColumn("_file_name", split(split(input_file_name(), '/')[9], '\?')[0])

# max date
max_date = df_date.select(max('_folder_date').alias('max_date')).collect()[0][0]

# just recent data
filt_date = df_date.filter(col("_folder_date") == lit(max_date))        

The select and filter functions are very important. The first one is used to capture the maximum folder date; these are the rows that we want to save to the table. We use the alias function to name the output and the collect function with indexing to save the result as a string. The second one is used to create a new dataframe containing only the most recent file. The col function returns a column object given a name, and the lit function tells Spark this is a literal value. In short, you have to learn a lot of methods to perform the same actions that can be easily described with Spark SQL.
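For reference, the same scalar can be pulled with the agg and first methods instead of select, collect, and indexing. This is just a sketch of an equivalent pattern, not the code used above.

# equivalent way (sketch) to grab the maximum folder date as a scalar
max_date = df_date.agg(max('_folder_date')).first()[0]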


The code compares record counts from the filtered and unfiltered dataframes.

# debugging
print(f"all files - {df_date.count()} \n")

# debugging
print(f"recent file - {filt_date.count()} \n")        

The image below shows the most recent data is 1/6 of the unfiltered size.


The image below shows the new system columns that we added: "_load_date", "_folder_path", "_folder_date", and "_file_name".

If we zoom in on the path column, we can see it contains the blob version number as well as the file size.
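To make the double split concrete, the sketch below runs the same expressions against a hypothetical OneLake path. The workspace, lakehouse, version, and size values are made up; only the structure matters, a folder named with a sortable date followed by a file name and a query string.

# demo (sketch) of the split logic against a hypothetical path
from pyspark.sql.functions import split

sample_path = "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse/Files/raw/saleslt/dim_date/20250106/DIM_DATE.csv?version=1&size=102400"

df_demo = spark.createDataFrame([(sample_path,)], ["_folder_path"])

df_demo.select(
    split("_folder_path", '/')[8].alias("_folder_date"),
    split(split("_folder_path", '/')[9], '\\?')[0].alias("_file_name")
).show(truncate=False)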



The last step is to save the results as a table.

#
# 3 - Save as table
#

# drop old
spark.sql("drop table if exists test_dataframes_dim_date")

# add new
filt_date.write.saveAsTable("test_dataframes_dim_date")
        

Let's retry this exercise by using Spark SQL to solve the same business problem.

#
#  4 - read in date into temp view
#

# convert df to view
df_raw.createOrReplaceTempView("tmp_dim_date")
        

The first step is to convert the raw dataframe into a temporary view. See the code above for details. The second step is to create the new columns, filter the dataset by the max date, and save the result as a table. Since Spark SQL supports common table expressions, we can do all this work in one cell with less code.

%%sql

--
-- 5 - create table
--

-- drop old
drop table if exists test_sparksql_dim_date;

-- add new
create table test_sparksql_dim_date as
with cte1 AS
(
  select 
    *, 
    current_timestamp() as _load_date,
    input_file_name() as _folder_path,
    split_part(input_file_name(), '/', 9) as _folder_name,
    split_part(split_part(input_file_name(), '/', 10), '?', 1) as _file_name
  from 
    tmp_dim_date
),
cte2 AS
(
  select * 
  from cte1 as t
  where t._folder_name = (select max(_folder_name) from cte1)
)
select * from cte2        

I will leave it up to the reader to confirm the final row count. What you might not know is that some methods, like spark.sql, return dataframes. All the existing dataframe methods can be used to manipulate the results. The code below gets a list of tables in the Microsoft Fabric Tables section and filters it by names starting with "test_".


#
#  Show new tables
#

# get list
df = spark.sql('show tables;').filter("tableName like 'test_%' ")
display(df)
        


The screen shot below shows two tables produced using two different methods with the same results.
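A quick count against both tables, a sketch assuming the two cells above were run, confirms that the row totals match.

# compare row counts of the two tables (sketch)
for tbl in ["test_dataframes_dim_date", "test_sparksql_dim_date"]:
    cnt = spark.sql(f"select count(*) as cnt from {tbl}").first()["cnt"]
    print(f"{tbl} - {cnt} rows")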

To recap, I really wanted to talk about the group by and having clauses; however, my Adventure Works dataset has an audit trail for full loads. Therefore, I thought it was worthwhile to explain how to recursively read in files and filter the data by load date.

One might point out that there is a dropDuplicates method for dataframes. But this method might not work if changes are allowed to the non-primary-key values. For instance, suppose I am a chocolate maker that produces a milk chocolate bar with id MB1. The recipe might change over time, but we want just the most recent recipe.
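The sketch below uses made-up data for that scenario. Notice that dropDuplicates on the product id keeps an arbitrary row, while filtering by the maximum folder date keeps the newest recipe.

# hypothetical example - dropDuplicates versus filtering by max folder date
from pyspark.sql import functions as F

df_bars = spark.createDataFrame(
    [("MB1", "milk chocolate - original recipe", "20240101"),
     ("MB1", "milk chocolate - revised recipe", "20250106")],
    ["product_id", "recipe", "_folder_date"])

# keeps one row per product_id, but not necessarily the newest one
df_bars.dropDuplicates(["product_id"]).show(truncate=False)

# keeps the most recent version of the recipe
max_dt = df_bars.agg(F.max("_folder_date")).first()[0]
df_bars.filter(F.col("_folder_date") == max_dt).show(truncate=False)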

