Logical Data Warehouse With Azure Synapse Serverless SQL - Incremental Data Loading

I recently gave my colleagues a presentation introducing Azure Synapse Serverless, including a couple of demos. One part that triggered many questions was meant to present different use cases and scenarios for the Synapse Serverless SQL pool: What is a logical data warehouse? Why not use the data warehouse we already know (the dedicated SQL pool)? Are all data warehousing patterns (slowly changing dimensions, incremental loading, history loading...) reproducible with a serverless SQL pool? So I thought it would be useful to share with a larger audience the demo on how to load incremental data into a logical data warehouse. Dig in, and let me know in the comments if you have any questions!

One of the many architectures we can design with Azure Synapse Analytics as the compute and orchestration layer could be as follows:

[Image: reference architecture with Azure Synapse Analytics as the compute and orchestration layer]


ASSUMPTIONS

You are already familiar with the Azure Synapse Serverless SQL pool and logical data warehouse concepts, and you have started playing with serverless SQL pool syntax such as OPENROWSET, CETAS (CREATE EXTERNAL TABLE AS SELECT), and CET (CREATE EXTERNAL TABLE).

CHALLENGES

As of today (2021-08-23), the only way to write data into the lake using the Synapse serverless SQL pool is the famous CETAS syntax (CREATE EXTERNAL TABLE AS SELECT). However, this has a downside: we cannot repeat the execution of CETAS on the same location to overwrite existing data. Once data is written into a location, we cannot modify it :(

I thought that dropping the external table would resolve the issue. Bad surprise! It did not, because that instruction drops only the table from the logical data warehouse, while the underlying folder and files are not deleted.

The second attempt turned out to be successful, and here is how I did it! Rather than finding a way to delete the external table's folder and files, (1) I restructure the main folder (the external table's main location) into subfolders, so that each time data is written using CETAS it goes to a new subfolder; (2) I version the subfolders to avoid serving stale data to business users.

For the dimensions on the curated zone the pattern will look like this:

[Image: dimension folder layout on the curated zone with versioned subfolders (Ver=0, Ver=1, ...)]
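In concrete terms, after initialization and a couple of loads the dimension location would be laid out roughly like this (a sketch; folder names follow the scripts later in the article):

```
curatedzone/nyctaxi/dimension/dimtaxiratecode/
├── Ver=0/   <- initialization (dummy "Not Defined" row)
├── Ver=1/   <- first full load
└── Ver=2/   <- latest full load, the only one served to users
```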

For the facts, on the other hand, I will partition by the business YearMonth value (derived from one of the source date columns) to be able to reload data for a specific month or year without reloading the whole table. Furthermore, inside each partition folder, I'll add the version subfolder:

[Image: fact folder layout partitioned by PickupYear/PickupMonth, each partition containing versioned subfolders]
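Similarly, the fact location would look roughly like this (a sketch; one Ver=* level inside every PickupYear/PickupMonth partition, names following the scripts below):

```
curatedzone/nyctaxi/fact/taxiyellow/
└── PickupYear=2021/
    ├── PickupMonth=1/
    │   ├── Ver=0/   <- partition initialization (dummy row)
    │   └── Ver=1/   <- latest reload of January 2021
    └── PickupMonth=2/
        └── Ver=0/
```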

CAN WE USE DELTA TABLE FOR THIS USE CASE?

As of today (2021-08-23), we cannot update Delta Lake tables using the Azure Synapse Serverless SQL pool (the documentation describes this limitation). So I'll use plain parquet files.

INITIAL SETUP

The script below will create:

  • A new database (to avoid unexpected conversion errors and to boost query performance, it's recommended to use the Latin1_General_100_BIN2_UTF8 collation)
  • A database scoped credential (that uses the Synapse managed identity to access the external location)
  • 2 schemas (stg, dw) for staging and the logical data warehouse
  • 2 external data sources (pointing to the lake and to an open storage account holding the yellow taxi data)
  • 2 external file formats (csv, parquet)

-- Create database if it does not exist
USE [master]
GO

IF db_id('nyctaxidw') IS NULL
EXEC('CREATE DATABASE nyctaxidw COLLATE Latin1_General_100_BIN2_UTF8')
GO

USE [nyctaxidw]
GO

-- Create master key
CREATE MASTER KEY ENCRYPTION BY PASSWORD = 'xxxxxxxxx'
GO

-- Create database scoped credential that uses the workspace Managed Identity
CREATE DATABASE SCOPED CREDENTIAL WorkspaceIdentity
WITH IDENTITY = 'Managed Identity'
GO

-- Create new schemas (stg, dw)
IF NOT EXISTS ( SELECT * FROM sys.schemas WHERE name = N'stg' )
    EXEC('CREATE SCHEMA [stg] AUTHORIZATION [dbo]');
GO

IF NOT EXISTS ( SELECT * FROM sys.schemas WHERE name = N'dw' )
    EXEC('CREATE SCHEMA [dw] AUTHORIZATION [dbo]');
GO

-- Create external data source pointing to the data lake
IF NOT EXISTS (SELECT * FROM sys.external_data_sources WHERE name = 'eds_nyctaxi')
    CREATE EXTERNAL DATA SOURCE [eds_nyctaxi]
    WITH (
        LOCATION = 'https://synapselake.dfs.core.windows.net/data',
        CREDENTIAL = WorkspaceIdentity
    )
GO

-- Create external data source pointing to the public Azure storage account
IF NOT EXISTS (SELECT * FROM sys.external_data_sources WHERE name = 'eds_openstorage_nyctaxi')
    CREATE EXTERNAL DATA SOURCE [eds_openstorage_nyctaxi]
    WITH (
        LOCATION = 'https://azureopendatastorage.blob.core.windows.net/nyctlc'
    )
GO

-- Create external csv file format
IF NOT EXISTS (SELECT * FROM sys.external_file_formats WHERE name = 'eff_csv_nyctaxi')
    CREATE EXTERNAL FILE FORMAT eff_csv_nyctaxi
    WITH (
        FORMAT_TYPE = DELIMITEDTEXT,
        FORMAT_OPTIONS ( FIELD_TERMINATOR = ',',
                         STRING_DELIMITER = '"',
                         FIRST_ROW = 2,
                         PARSER_VERSION = '2.0')
    );
GO

-- Create external parquet file format
IF NOT EXISTS (SELECT * FROM sys.external_file_formats WHERE name = 'eff_pqt_nyctaxi')
    CREATE EXTERNAL FILE FORMAT [eff_pqt_nyctaxi]
    WITH ( FORMAT_TYPE = PARQUET)
GO

We will skip the standardized zone to keep this example simple.

LOADING DIMENSION TABLES

The dimension tables will be completely recomputed each time we trigger the loading workflow, so the landing zone data for dimensions contains both historical and newly added files. I'll start by creating the external tables below for the staging layer, on top of the csv files saved in the landing zone:

USE [nyctaxidw]
GO

--==========================================================
--                  TaxiRateCode
--==========================================================
IF NOT EXISTS ( SELECT * FROM sys.external_tables WHERE object_id = OBJECT_ID('stg.TaxiRateCode') )
CREATE EXTERNAL TABLE stg.TaxiRateCode
(
    RateCodeID INT,
    RateCode VARCHAR(100)
)
WITH (
    LOCATION = 'landingzone/nyctaxi/referencial/taxiratecode/*.csv',
    DATA_SOURCE = eds_nyctaxi,
    FILE_FORMAT = eff_csv_nyctaxi
)
GO

SELECT * FROM stg.TaxiRateCode

--==========================================================
--                  TaxiPaymentType
--==========================================================
IF NOT EXISTS ( SELECT * FROM sys.external_tables WHERE object_id = OBJECT_ID('stg.TaxiPaymentType') )
CREATE EXTERNAL TABLE stg.TaxiPaymentType
(
    PaymentTypeID INT,
    PaymentType VARCHAR(100)
)
WITH (
    LOCATION = 'landingzone/nyctaxi/referencial/taxipaymenttype/*.csv',
    DATA_SOURCE = eds_nyctaxi,
    FILE_FORMAT = eff_csv_nyctaxi
)
GO

SELECT * FROM stg.TaxiPaymentType

To create and initialize the dimension tables (in the "dw" schema) on top of the curated zone, we need to use a combination of CETAS and CET, in the following order:

  • Use CETAS to create an external table referencing the first version (Ver=0) with one dummy row. If we inspect the lake after executing this instruction, we will find a parquet file for each table in these locations [curatedzone/nyctaxi/dimension/dimtaxiratecode/Ver=0, curatedzone/nyctaxi/dimension/dimtaxipaymenttype/Ver=0].
  • Drop the external table created in the previous step (note: the data, files and folders remain untouched).
  • Create the final external table (representing the dimension) in the "dw" schema, referencing these locations [curatedzone/nyctaxi/dimension/dimtaxiratecode/Ver=*/*.parquet, curatedzone/nyctaxi/dimension/dimtaxipaymenttype/Ver=*/*.parquet].

USE [nyctaxidw]
GO

--==========================================================
--                  DimTaxiRateCode
--==========================================================

CREATE EXTERNAL TABLE DimTaxiRateCode#NA
WITH (
    LOCATION = 'curatedzone/nyctaxi/dimension/dimtaxiratecode/Ver=0',
    DATA_SOURCE = eds_nyctaxi,
    FILE_FORMAT = eff_pqt_nyctaxi
)
AS
SELECT CAST(-1 AS INT) AS RateCodeID,
       CAST('Not Defined' AS VARCHAR(100)) AS RateCodeLabel,
       CAST(0 AS INT) AS Ver
GO

DROP EXTERNAL TABLE DimTaxiRateCode#NA
GO

IF NOT EXISTS ( SELECT * FROM sys.external_tables WHERE object_id = OBJECT_ID('dw.DimTaxiRateCode') )
CREATE EXTERNAL TABLE dw.DimTaxiRateCode(
     RateCodeID INT,
     RateCodeLabel VARCHAR(100),
     Ver INT
)
WITH (
    LOCATION = 'curatedzone/nyctaxi/dimension/dimtaxiratecode/Ver=*/*.parquet',
    DATA_SOURCE = eds_nyctaxi,
    FILE_FORMAT = eff_pqt_nyctaxi
)
GO

SELECT TOP 100 * FROM dw.DimTaxiRateCode
GO

--==========================================================
--                  DimTaxiPaymentType
--==========================================================

CREATE EXTERNAL TABLE DimTaxiPaymentType#NA
WITH (
    LOCATION = 'curatedzone/nyctaxi/dimension/dimtaxipaymenttype/Ver=0',
    DATA_SOURCE = eds_nyctaxi,
    FILE_FORMAT = eff_pqt_nyctaxi
)
AS
SELECT CAST(-1 AS INT) AS PaymentTypeID,
       CAST('Not Defined' AS VARCHAR(100)) AS PaymentTypeLabel,
       CAST(0 AS INT) AS Ver
GO

DROP EXTERNAL TABLE DimTaxiPaymentType#NA
GO

IF NOT EXISTS ( SELECT * FROM sys.external_tables WHERE object_id = OBJECT_ID('dw.DimTaxiPaymentType') )
CREATE EXTERNAL TABLE dw.DimTaxiPaymentType(
     PaymentTypeID INT,
     PaymentTypeLabel VARCHAR(100),
     Ver INT
)
WITH (
    LOCATION = 'curatedzone/nyctaxi/dimension/dimtaxipaymenttype/Ver=*/*.parquet',
    DATA_SOURCE = eds_nyctaxi,
    FILE_FORMAT = eff_pqt_nyctaxi
)
GO

SELECT TOP 100 * FROM dw.DimTaxiPaymentType
GO

To load my dimensions, I'll create a stored procedure for each one. The steps described below will be executed each time we call the stored procedure:

  • Generate a new version by getting the last (MAX) existing one (using the OPENROWSET and filepath() functions) and incrementing the value by 1.

DECLARE @Version INT =
ISNULL(
       (SELECT
            MAX([result].filepath(1))
        FROM
            OPENROWSET(
                BULK 'curatedzone/nyctaxi/dimension/dimtaxiratecode/Ver=*/*.parquet',
                DATA_SOURCE='eds_nyctaxi',
                FORMAT='PARQUET'
            ) AS [result] )
,
0) + 1

  • Create an external table (using CETAS) referencing the new version calculated in the previous step. The SELECT phase of the CETAS extracts data from the staging layer and applies some business rules. This external table acts like a temp table. Because I have to generate a new external table name and location depending on the most recent version, I use dynamic SQL.

-- Store data to a new version using CETAS
SET @SQL =
'CREATE EXTERNAL TABLE DimTaxiRateCode#v' + CAST(@Version AS VARCHAR(10)) +
' WITH (
    LOCATION = ''curatedzone/nyctaxi/dimension/dimtaxiratecode/Ver=' + CAST(@Version as VARCHAR(10)) +''',
    DATA_SOURCE = eds_nyctaxi,
    FILE_FORMAT = eff_pqt_nyctaxi
)
AS
SELECT RateCodeID, MAX(RateCode) as RateCodeLabel,' + CAST(@Version AS VARCHAR(10)) + ' AS Ver' +
' FROM stg.TaxiRateCode STG
GROUP BY RateCodeID
ORDER BY RateCodeID
'
EXEC(@SQL)

  • Drop the previously created external table.

-- Drop external table for current @Version
SET @SQL = 'DROP EXTERNAL TABLE DimTaxiRateCode#v' + CAST(@Version AS VARCHAR(10))
IF EXISTS ( SELECT * FROM sys.external_tables WHERE object_id = OBJECT_ID('DimTaxiRateCode#v' + CAST(@Version as VARCHAR(10))))
EXEC(@SQL)

The stored procedure for the first dimension will be:

USE [nyctaxidw]
GO

CREATE OR ALTER PROCEDURE dw.LoadDimTaxiRateCode
AS
BEGIN

DECLARE @SQL VARCHAR(4000)

-- Load only when staging contains RateCodeIDs not yet present in the dimension
IF EXISTS (
    SELECT TOP 1 1 FROM stg.TaxiRateCode STG LEFT JOIN dw.DimTaxiRateCode DW
    ON STG.RateCodeID = DW.RateCodeID
    WHERE DW.RateCodeID IS NULL
)
BEGIN

-- Get the existing version and generate a new one by incrementing the value by 1
DECLARE @Version INT =
ISNULL(
       (SELECT
            MAX([result].filepath(1))
        FROM
            OPENROWSET(
                BULK 'curatedzone/nyctaxi/dimension/dimtaxiratecode/Ver=*/*.parquet',
                DATA_SOURCE='eds_nyctaxi',
                FORMAT='PARQUET'
            ) AS [result] )
,
0) + 1

-- Store data to a new version using CETAS
SET @SQL =
'CREATE EXTERNAL TABLE DimTaxiRateCode#v' + CAST(@Version AS VARCHAR(10)) +
' WITH (
    LOCATION = ''curatedzone/nyctaxi/dimension/dimtaxiratecode/Ver=' + CAST(@Version as VARCHAR(10)) +''',
    DATA_SOURCE = eds_nyctaxi,
    FILE_FORMAT = eff_pqt_nyctaxi
)
AS
SELECT RateCodeID, MAX(RateCode) as RateCodeLabel,' + CAST(@Version AS VARCHAR(10)) + ' AS Ver' +
' FROM stg.TaxiRateCode STG
GROUP BY RateCodeID
ORDER BY RateCodeID
'
EXEC(@SQL)

-- Drop external table for current @Version
SET @SQL = 'DROP EXTERNAL TABLE DimTaxiRateCode#v' + CAST(@Version AS VARCHAR(10))
IF EXISTS ( SELECT * FROM sys.external_tables WHERE object_id = OBJECT_ID('DimTaxiRateCode#v' + CAST(@Version as VARCHAR(10))))
EXEC(@SQL)

END
END

--EXEC dw.LoadDimTaxiRateCode

LOADING FACT TABLE

On the facts side, the staging external table references a data source pointing to an open storage account [https://azureopendatastorage.blob.core.windows.net/nyctlc] to extract NYC taxi data (using the parquet file format already created):

--==========================================================
--? ? ? ? ? ? ? ? ? TaxiYellow
--==========================================================
IF NOT EXISTS ( SELECT * FROM sys.external_tables WHERE object_id = OBJECT_ID('stg.TaxiYellow') )
CREATE EXTERNAL TABLE stg.TaxiYellow (
	[vendorID] varchar(8000),
	[tpepPickupDateTime] datetime2(7),
	[tpepDropoffDateTime] datetime2(7),
	[passengerCount] int,
	[tripDistance] float,
	[puLocationId] varchar(8000),
	[doLocationId] varchar(8000),
	[startLon] float,
	[startLat] float,
	[endLon] float,
	[endLat] float,
	[rateCodeId] int,
	[storeAndFwdFlag] varchar(8000),
	[paymentType] varchar(8000),
	[fareAmount] float,
	[extra] float,
	[mtaTax] float,
	[improvementSurcharge] varchar(8000),
	[tipAmount] float,
	[tollsAmount] float,
	[totalAmount] float
	)
	WITH (
	LOCATION = 'yellow/puYear=*/puMonth=*/*.parquet',
	DATA_SOURCE = eds_openstorage_nyctaxi,
	FILE_FORMAT = eff_pqt_nyctaxi
	)
GO

SELECT TOP 100 * FROM stg.TaxiYellow
GO        

Now we can query the staging fact table in order to create and initialize the fact table partitions. I'll use a stored procedure with 2 parameters: year and month. That way I can initialize all partitions by calling the stored procedure once per (Year, Month) pair.

Let's assume we execute the dw.InitFactTaxiYellow stored procedure (described below) with Year = 2021 and Month = 1:

  • The CETAS creates an external table pointing to the first version (Ver=0) of the 2021-01 partition, with one dummy row. If we inspect the lake after executing this instruction, we will find a parquet file here [curatedzone/nyctaxi/fact/taxiyellow/PickupYear=2021/PickupMonth=1/Ver=0].
  • After the first step, we can drop the external table just created (note: the data, files and folders remain untouched).

USE [nyctaxidw]
GO

CREATE PROCEDURE dw.InitFactTaxiYellow @Year INT, @Month INT
AS
BEGIN

DECLARE @Version INT = 0
DECLARE @SQL VARCHAR(4000)

-- Init Partition
SET @SQL =
'CREATE EXTERNAL TABLE TaxiYellow'+CAST(@Year AS VARCHAR(4))+CAST(@Month AS VARCHAR(2))+'#v' + CAST(@Version AS VARCHAR(10)) +
' WITH (
    LOCATION = ''curatedzone/nyctaxi/fact/taxiyellow/PickupYear='+CAST(@Year AS VARCHAR(4))+'/PickupMonth='+CAST(@Month AS VARCHAR(2))+'/Ver=' + CAST(@Version as VARCHAR(10)) +''',
    DATA_SOURCE = eds_nyctaxi,
    FILE_FORMAT = eff_pqt_nyctaxi
)
AS
SELECT
     CAST(NULL AS varchar(8000)) AS vendorID
    ,CAST(NULL AS datetime2(7)) AS PickupDateTime
    ,CAST(NULL AS datetime2(7)) AS DropoffDateTime
    ,CAST(NULL AS int) AS PickupLocationId
    ,CAST(NULL AS int) AS DropoffLocationId
    ,CAST(NULL AS int) AS RateCodeId
    ,CAST(NULL AS int) AS PaymentTypeId
    ,CAST(NULL AS float) AS StartLon
    ,CAST(NULL AS float) AS StartLat
    ,CAST(NULL AS float) AS EndLon
    ,CAST(NULL AS float) AS EndLat
    ,CAST(NULL AS varchar(8000)) AS StoreAndFwdFlag
    ,CAST(NULL AS int) AS PassengerCount
    ,CAST(NULL AS float) AS TripDistance
    ,CAST(NULL AS float) AS FareAmount
    ,CAST(NULL AS float) AS Extra
    ,CAST(NULL AS float) AS MtaTax
    ,CAST(NULL AS varchar(8000)) AS ImprovementSurcharge
    ,CAST(NULL AS float) AS TipAmount
    ,CAST(NULL AS float) AS TollsAmount
    ,CAST(NULL AS float) AS TotalAmount
    ,CAST(0 AS INT) AS PickupYear
    ,CAST(0 AS INT) AS PickupMonth
    ,CAST(0 AS INT) AS Ver
'
EXEC (@SQL)

-- Drop external table
SET @SQL = 'DROP EXTERNAL TABLE TaxiYellow'+ CAST(@Year AS VARCHAR(4))+CAST(@Month AS VARCHAR(2))+'#v' + CAST(@Version AS VARCHAR(10))
IF EXISTS ( SELECT * FROM sys.external_tables WHERE object_id = OBJECT_ID('TaxiYellow'+ CAST(@Year AS VARCHAR(4))+CAST(@Month AS VARCHAR(2))+'#v' + CAST(@Version AS VARCHAR(10))))
EXEC(@SQL)

END

--EXEC dw.InitFactTaxiYellow 2019, 1

Once I have a directory with all partitions and a starting version [Ver=0], I go on and create the final fact external table over the location [curatedzone/nyctaxi/fact/taxiyellow/PickupYear=*/PickupMonth=*/Ver=*/*.parquet]:

--==========================================================
--                  FactTaxiYellow
--==========================================================
CREATE EXTERNAL TABLE dw.FactTaxiYellow (
    [vendorID] varchar(8000),
    [PickupDateTime] datetime2(7),
    [DropoffDateTime] datetime2(7),
    [PickupLocationId] int,
    [DropoffLocationId] int,
    [RateCodeId] int,
    [PaymentTypeId] int,
    [StartLon] float,
    [StartLat] float,
    [EndLon] float,
    [EndLat] float,
    [StoreAndFwdFlag] varchar(8000),
    [PassengerCount] int,
    [TripDistance] float,
    [FareAmount] float,
    [Extra] float,
    [MtaTax] float,
    [ImprovementSurcharge] varchar(8000),
    [TipAmount] float,
    [TollsAmount] float,
    [TotalAmount] float,
    [PickupYear] INT,
    [PickupMonth] INT,
    [Ver] INT
    )
    WITH (
    LOCATION = 'curatedzone/nyctaxi/fact/taxiyellow/PickupYear=*/PickupMonth=*/Ver=*/*.parquet',
    DATA_SOURCE = [eds_nyctaxi],
    FILE_FORMAT = [eff_pqt_nyctaxi]
    )
GO

Using the same technique explained for the dimensions, we can design a stored procedure that incrementally loads the fact table by year and month:

  • Generate a new version by getting the last (MAX) existing one (using the OPENROWSET and filepath() functions) and incrementing the value by 1.

-- Get Version Number
SET @SQL =
'
SELECT @Version =
ISNULL(
       (SELECT
            MAX([result].filepath(1))
        FROM
            OPENROWSET(
                BULK ''curatedzone/nyctaxi/fact/taxiyellow/PickupYear='+CAST(@Year AS VARCHAR(4))+'/PickupMonth='+CAST(@Month AS VARCHAR(2))+'/Ver=*/*.parquet'',
                DATA_SOURCE=''eds_nyctaxi'',
                FORMAT=''PARQUET''
            ) AS [result] )
,
0) + 1
'

EXEC sp_executesql @SQL,
                   N'@Version int output', @Version output;

  • Create an external table (using CETAS) pointing to the new version. The SELECT phase of the CETAS extracts data from the staging layer, performs some joins and applies business rules. This external table acts like a temp table.
  • Drop the previously created external table.

The final code for the stored procedure will be as follows:

USE [nyctaxidw]
GO

CREATE PROCEDURE dw.LoadFactTaxiYellow @Year INT, @Month INT
AS
BEGIN

DECLARE @SQL NVARCHAR(4000)
DECLARE @Version INT

-- Get Version Number
SET @SQL =
'
SELECT @Version =
ISNULL(
       (SELECT
            MAX([result].filepath(1))
        FROM
            OPENROWSET(
                BULK ''curatedzone/nyctaxi/fact/taxiyellow/PickupYear='+CAST(@Year AS VARCHAR(4))+'/PickupMonth='+CAST(@Month AS VARCHAR(2))+'/Ver=*/*.parquet'',
                DATA_SOURCE=''eds_nyctaxi'',
                FORMAT=''PARQUET''
            ) AS [result] )
,
0) + 1
'

EXEC sp_executesql @SQL,
                   N'@Version int output', @Version output;

-- Store data to a new version using CETAS
SET @SQL =
'CREATE EXTERNAL TABLE TaxiYellow'+CAST(@Year AS VARCHAR(4))+CAST(@Month AS VARCHAR(2))+'#v' + CAST(@Version AS VARCHAR(10)) +
' WITH (
    LOCATION = ''curatedzone/nyctaxi/fact/taxiyellow/PickupYear='+CAST(@Year AS VARCHAR(4))+'/PickupMonth='+CAST(@Month AS VARCHAR(2))+'/Ver=' + CAST(@Version as VARCHAR(10)) +''',
    DATA_SOURCE = eds_nyctaxi,
    FILE_FORMAT = eff_pqt_nyctaxi
)
AS
SELECT
    [vendorID]
    ,[tpepPickupDateTime] AS PickupDateTime
    ,[tpepDropoffDateTime] AS DropoffDateTime
    ,ISNULL(PickupZoneLookup.LocationID,-1) AS PickupLocationId
    ,ISNULL(DropOffZoneLookup.LocationID,-1) AS DropoffLocationId
    ,ISNULL(RateCode.RateCodeID,-1) AS RateCodeId
    ,ISNULL(PaymentType.PaymentTypeID, -1) AS PaymentTypeId
    ,[startLon] AS StartLon
    ,[startLat] AS StartLat
    ,[endLon] AS EndLon
    ,[endLat] AS EndLat
    ,[storeAndFwdFlag] AS StoreAndFwdFlag
    ,[passengerCount] AS PassengerCount
    ,[tripDistance] AS TripDistance
    ,[fareAmount] AS FareAmount
    ,[extra] AS Extra
    ,[mtaTax] AS MtaTax
    ,[improvementSurcharge] AS ImprovementSurcharge
    ,[tipAmount] AS TipAmount
    ,[tollsAmount] AS TollsAmount
    ,[totalAmount] AS TotalAmount
    ,YEAR(tpepPickupDateTime) AS PickupYear
    ,MONTH(tpepPickupDateTime) AS PickupMonth, ' + CAST(@Version AS VARCHAR(10)) + ' AS Ver' +
' FROM [stg].[TaxiYellow] Yellow
 LEFT JOIN stg.TaxiPaymentType PaymentType
        ON TRY_CAST(Yellow.paymentType AS INT) = PaymentType.PaymentTypeID
 LEFT JOIN stg.TaxiRateCode RateCode
        ON TRY_CAST(Yellow.rateCodeId AS INT) = RateCode.RateCodeID
 LEFT JOIN stg.TaxiZoneLookup PickupZoneLookup
        ON TRY_CAST(Yellow.puLocationId AS INT) = PickupZoneLookup.LocationID
 LEFT JOIN stg.TaxiZoneLookup DropOffZoneLookup
        ON TRY_CAST(Yellow.doLocationId AS INT) = DropOffZoneLookup.LocationID
WHERE YEAR(tpepPickupDateTime) = ' + CAST(@Year AS VARCHAR(4)) + ' AND
MONTH(tpepPickupDateTime) = ' + FORMAT(@Month,'00')

EXEC(@SQL)

-- Drop external table for current @Version
SET @SQL = 'DROP EXTERNAL TABLE TaxiYellow'+ CAST(@Year AS VARCHAR(4))+CAST(@Month AS VARCHAR(2))+'#v' + CAST(@Version AS VARCHAR(10))
IF EXISTS ( SELECT * FROM sys.external_tables WHERE object_id = OBJECT_ID('TaxiYellow'+ CAST(@Year AS VARCHAR(4))+CAST(@Month AS VARCHAR(2))+'#v' + CAST(@Version AS VARCHAR(10))))
EXEC(@SQL)

END

--EXEC dw.LoadFactTaxiYellow 2020, 1

CREATING SQL VIEWS

The logical data warehouse will be available to users through SQL views. The only thing we need to be careful about is filtering out stale data by exposing only the latest versions of the dimensions and facts. For example, the SQL view for the dimension TaxiRateCode will be:

USE [nyctaxidw]
GO

CREATE VIEW [dw].[TaxiRateCode]
	AS SELECT RateCodeID, RateCodeLabel FROM dw.DimTaxiRateCode
	   -- Ver = 0 holds the 'Not Defined' member; MAX(Ver) is the latest loaded version
	   WHERE Ver = 0 OR Ver = (
        SELECT MAX(Ver)
		FROM dw.DimTaxiRateCode
	   )

and the fact table:

USE [nyctaxidw]
GO

CREATE VIEW [dw].[TaxiYellow]
	AS SELECT
	     [vendorID]
		,[PickupDateTime]
		,[DropoffDateTime]
		,[PickupLocationId]
		,[DropoffLocationId]
		,[RateCodeId]
		,[PaymentTypeId]
		,[StartLon]
		,[StartLat]
		,[EndLon]
		,[EndLat]
		,[StoreAndFwdFlag]
		,[PassengerCount]
		,[TripDistance]
		,[FareAmount]
		,[Extra]
		,[MtaTax]
		,[ImprovementSurcharge]
		,[TipAmount]
		,[TollsAmount]
		,[TotalAmount]
		,Fact.PickupYear
		,Fact.PickupMonth
	 FROM dw.FactTaxiYellow Fact INNER JOIN
          -- keep only the latest version of each (year, month) partition
          ( SELECT PickupYear, PickupMonth, MAX(Ver) AS Ver
            FROM dw.FactTaxiYellow
            GROUP BY PickupYear, PickupMonth
           ) AS LastVer
           ON Fact.PickupYear = LastVer.PickupYear AND
              Fact.PickupMonth = LastVer.PickupMonth AND
              Fact.Ver = LastVer.Ver
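With both views in place, business users can query the logical data warehouse like any star schema and always hit the latest versions. A minimal illustrative query (view and column names as defined above):

```sql
-- Trips and revenue per rate code and month, reading only the latest versions
SELECT r.RateCodeLabel,
       f.PickupYear,
       f.PickupMonth,
       COUNT(*)           AS Trips,
       SUM(f.TotalAmount) AS Revenue
FROM dw.TaxiYellow f
INNER JOIN dw.TaxiRateCode r
        ON f.RateCodeId = r.RateCodeID
GROUP BY r.RateCodeLabel, f.PickupYear, f.PickupMonth
ORDER BY f.PickupYear, f.PickupMonth;
```

The version filtering stays inside the views, so consumers never have to know about the Ver subfolders.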

ORCHESTRATION

We can orchestrate the logical data warehouse loading using Synapse integration pipelines:

  • Fact table initialization (a one-shot operation, no need to schedule it).

[Image: Synapse pipeline for the one-shot fact table initialization]

  • Logical data warehouse loading (to be scheduled by day or month)

[Image: Synapse pipeline for the scheduled logical data warehouse load]
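As a sketch of what the scheduled pipeline executes against the serverless SQL endpoint (hedged: dw.LoadDimTaxiPaymentType is an assumed procedure built like dw.LoadDimTaxiRateCode, and the year/month values would come from pipeline parameters):

```sql
-- Dimensions first: full recompute, a new version is written only when new members appear
EXEC dw.LoadDimTaxiRateCode;
EXEC dw.LoadDimTaxiPaymentType;  -- assumed, analogous to dw.LoadDimTaxiRateCode

-- Then the fact partition for the period being processed
EXEC dw.LoadFactTaxiYellow 2021, 1;
```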

FINAL THOUGHTS

I think the Synapse serverless SQL pool is a worthwhile choice, with high business value and little maintenance effort, for the following use cases: data discovery and exploration; simple data transformation (like converting csv files to parquet and applying some business rules on the fly); creating views on top of files located in the lake (a logical data warehouse for "read" operations).

On the other hand, building a logical data warehouse for read/write operations and handling incremental loads is not always straightforward: the lack of UPSERT or MERGE capabilities leads to creating multiple copies of the data each time we want to overwrite existing files.

