Automating Data Corrections with Snowflake and Azure
Introduction
Recently, I set out to automate data corrections for the past 90 days of data using Snowflake and Azure. In this blog post, I'd like to share my experience and the solution I implemented using Snowflake's JavaScript stored procedures.
The Challenge
Our organization deals with vast amounts of data, and it's crucial to ensure the accuracy and consistency of this data. One particular challenge we faced was the need to correct data in our Snowflake data warehouse based on changes in an Azure storage account. We wanted to automate the process, ensuring that data in our Snowflake tables was up-to-date and aligned with data in Azure.
The Solution
To tackle this challenge, I devised a solution that involved Snowflake's powerful SQL capabilities and Azure's storage services. Here's how it worked:
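The procedure below is driven by a control table, REFERENCE_TABLE, that lists every table needing a 90-day correction. The original post doesn't show its definition, but based on the columns the procedure reads, a rough sketch would look like this (the column types and the DEFAULT are assumptions on my part):

```sql
-- Hypothetical sketch of the control table; column names come from the
-- procedure below, but the types are assumed.
CREATE TABLE IF NOT EXISTS REFERENCE_TABLE (
    SOURCE_TABLE                      VARCHAR,
    AZURE_PATH                        VARCHAR,   -- base URL of the Azure storage account
    DESTINATION_TABLE                 VARCHAR,
    AZURE_90_DAY_CORRECTION_CONTAINER VARCHAR,
    AZURE_90_DAY_CORRECTION_BLOB      VARCHAR,
    PRIMARY_KEYS                      VARCHAR,   -- comma-separated, e.g. 'ORDER_ID,LINE_ID'
    CORRECTION_DATE_COLUMN            VARCHAR,
    IS_90_DAY_CORRECTION              NUMBER(1) DEFAULT 0  -- 1 = opted in
);
```

One row per destination table keeps the procedure generic: onboarding a new table is a matter of inserting a row, not editing code.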
CREATE OR REPLACE PROCEDURE ETL.STORED_PROC_90_DAY_DATA_CORRECTION()
RETURNS STRING
LANGUAGE JAVASCRIPT
EXECUTE AS OWNER
AS
$$
// Look up the 90-day correction configuration for each table
var result = "";
var get_details_sql_query = `SELECT SOURCE_TABLE, AZURE_PATH, DESTINATION_TABLE,
    AZURE_90_DAY_CORRECTION_CONTAINER, AZURE_90_DAY_CORRECTION_BLOB,
    PRIMARY_KEYS, CORRECTION_DATE_COLUMN
FROM REFERENCE_TABLE
WHERE IS_90_DAY_CORRECTION = 1`;
var get_details_sql_statement = snowflake.createStatement({sqlText: get_details_sql_query});
/* Creates result set */
var result_scan = get_details_sql_statement.execute();
while (result_scan.next()) {
    try {
        // Variables from the reference table
        var source_table = result_scan.getColumnValue(1);
        var azure_90day_correction_file_path = result_scan.getColumnValue(2);
        var destination_table = result_scan.getColumnValue(3);
        var azure_90day_correction_container = result_scan.getColumnValue(4);
        var azure_90day_correction_blob = result_scan.getColumnValue(5);
        var primary_keys = result_scan.getColumnValue(6);
        var correction_date_column = result_scan.getColumnValue(7);

        // Split multiple primary keys into an array and build the join condition
        var primaryKeysArray = primary_keys.split(',');
        var primaryKeyConditions = primaryKeysArray.map(pk => `src.${pk} = tgt.${pk}`).join(' AND ');

        // Variables for Azure
        var azure_full_path = azure_90day_correction_file_path + azure_90day_correction_container + "/" + azure_90day_correction_blob;

        // Variables for Snowflake
        var snowflake_table = destination_table;
        var snowflake_stage_table = snowflake_table + "_stage";

        // Build the SQL: create an empty staging table, load it from Azure, then
        // deactivate rows missing from the corrected file within the 90-day window
        var create_table_sql = `CREATE OR REPLACE TEMPORARY TABLE ${snowflake_stage_table} AS SELECT ${primary_keys} FROM ${snowflake_table} WHERE 1 = 0;`;
        var copy_from_azure_sql = `COPY INTO ${snowflake_stage_table} FROM '${azure_full_path}' FILE_FORMAT = ETL.FILEFORMAT_QUOTED_CSV ON_ERROR = CONTINUE;`;
        var update_delete_flag_sql = `UPDATE ${snowflake_table} tgt
SET IS_ACTIVE = FALSE
WHERE NOT EXISTS (SELECT 1 FROM ${snowflake_stage_table} src WHERE ${primaryKeyConditions})
AND tgt.${correction_date_column} >= DATEADD('DAY', -90, CURRENT_DATE());`;

        // Prepare the statements
        var create_table_sql_statement = snowflake.createStatement({sqlText: create_table_sql});
        var copy_from_azure_sql_statement = snowflake.createStatement({sqlText: copy_from_azure_sql});
        var update_delete_flag_sql_statement = snowflake.createStatement({sqlText: update_delete_flag_sql});

        // Execute them in order
        var create_table_sql_statement_result = create_table_sql_statement.execute();
        var copy_from_azure_sql_statement_result = copy_from_azure_sql_statement.execute();
        var update_delete_flag_sql_result = update_delete_flag_sql_statement.execute();

        // If everything was successful, record the corrected table
        result += `Success: corrected ${snowflake_table}\n`;
    } catch (err) {
        // If an error occurs, log the error but continue the loop
        result += `Failed: ${err}\n`;
    }
}
return result;
$$;
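The heart of the multi-column key handling is the line that turns the comma-separated PRIMARY_KEYS value into a join condition. Pulled out as a plain JavaScript function (the name buildJoinCondition is mine, not part of the procedure), it behaves like this:

```javascript
// Mirrors the procedure's join-condition construction.
// A trim() is added here as a defensive refinement the original omits,
// so 'ORDER_ID, LINE_ID' (with a space after the comma) still yields valid SQL.
function buildJoinCondition(primaryKeys) {
  return primaryKeys
    .split(',')
    .map(pk => `src.${pk.trim()} = tgt.${pk.trim()}`)
    .join(' AND ');
}

console.log(buildJoinCondition('ORDER_ID,LINE_ID'));
// → src.ORDER_ID = tgt.ORDER_ID AND src.LINE_ID = tgt.LINE_ID
```

Once the procedure is deployed, running the full correction pass (manually, or from a scheduled Snowflake task) is a one-liner: CALL ETL.STORED_PROC_90_DAY_DATA_CORRECTION();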
Results and Benefits
Implementing this solution brought several benefits to our organization: