Automating Data Corrections with Snowflake and Azure

Introduction

Recently, I embarked on a project to automate data corrections over a rolling 90-day window using Snowflake and Azure. In this blog post, I'd like to share my experience and the solution I implemented using Snowflake's JavaScript stored procedures.

The Challenge

Our organization deals with vast amounts of data, and ensuring its accuracy and consistency is crucial. One particular challenge we faced was the need to correct data in our Snowflake data warehouse based on changes in an Azure storage account. We wanted to automate the process so that the data in our Snowflake tables stayed up to date and aligned with the data in Azure.

The Solution

To tackle this challenge, I devised a solution that combined Snowflake's SQL and scripting capabilities with Azure's storage services. Here's how it worked:

  1. Gathering Information: I started by gathering information about the tables and data that needed corrections. I identified the source tables in Snowflake and the corresponding Azure storage paths where the correction data was stored, and recorded them in a reference table (sketched after this list).
  2. Dynamic SQL: One of the key elements of my solution was the use of dynamic SQL. Since the number of primary key columns varied from table to table, I used JavaScript to split the comma-separated primary key column names into an array and generate dynamic SQL conditions for matching rows.
  3. Data Transfer: I used Snowflake's COPY INTO statement to efficiently load the correction data from Azure into a temporary staging table in Snowflake, keeping the two sides in sync.
  4. Data Correction: The heart of the solution was an UPDATE statement that set the IS_ACTIVE column in our Snowflake tables based on the presence of corresponding rows in the staging table. If a row was missing from the staging table, its IS_ACTIVE flag was set to FALSE.
  5. Automation: To make the process truly automated, I encapsulated the entire solution within a Snowflake stored procedure. This allowed me to schedule regular executions (see the task example after the procedure) and ensure that our data remained accurate and up-to-date without manual intervention.
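
Before looking at the procedure itself, here is a minimal sketch of the reference table that drives it. The column names come straight from the procedure's SELECT; the data types and comments are my assumptions, not the original DDL:

CREATE TABLE REFERENCE_TABLE (
    SOURCE_TABLE                      VARCHAR,   -- source table in Snowflake
    AZURE_PATH                        VARCHAR,   -- base azure:// path to the storage account (assumed)
    DESTINATION_TABLE                 VARCHAR,   -- Snowflake table to correct
    AZURE_90_DAY_CORRECTION_CONTAINER VARCHAR,   -- container holding the correction file
    AZURE_90_DAY_CORRECTION_BLOB      VARCHAR,   -- blob (file) name within that container
    PRIMARY_KEYS                      VARCHAR,   -- comma-separated list of primary key columns
    CORRECTION_DATE_COLUMN            VARCHAR,   -- date column that defines the 90-day window
    IS_90_DAY_CORRECTION              NUMBER(1)  -- 1 = table participates in the correction
);

With the reference table in place, the whole workflow fits in the stored procedure below: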

CREATE OR REPLACE PROCEDURE ETL.STORED_PROC_90_DAY_DATA_CORRECTION()
  RETURNS STRING
  LANGUAGE JAVASCRIPT
  EXECUTE AS OWNER
AS 
$$

// Fetch the list of tables flagged for 90-day correction from the reference table
var result = "";

var get_details_sql_query = `SELECT SOURCE_TABLE, AZURE_PATH, DESTINATION_TABLE, AZURE_90_DAY_CORRECTION_CONTAINER, AZURE_90_DAY_CORRECTION_BLOB,
    PRIMARY_KEYS, CORRECTION_DATE_COLUMN
    FROM REFERENCE_TABLE
    WHERE IS_90_DAY_CORRECTION = 1`;

var get_details_sql_statement = snowflake.createStatement({sqlText: get_details_sql_query});

/* Creates result set */
var result_scan = get_details_sql_statement.execute();

while (result_scan.next()) {
    try {
        // Variables from reference Table
        var source_table = result_scan.getColumnValue(1);
        var azure_90day_correction_file_path = result_scan.getColumnValue(2);
        var destination_table = result_scan.getColumnValue(3);
        var azure_90day_correction_container = result_scan.getColumnValue(4);
        var azure_90day_correction_blob = result_scan.getColumnValue(5);
        var primary_keys = result_scan.getColumnValue(6);
        var correction_date_column = result_scan.getColumnValue(7);

        // Split multiple primary keys into an array (trimming stray whitespace)
        var primaryKeysArray = primary_keys.split(',').map(pk => pk.trim());
        var primaryKeyConditions = primaryKeysArray.map(pk => `src.${pk} = tgt.${pk}`).join(' AND ');

        // Variables for Azure
        var azure_full_path = azure_90day_correction_file_path + azure_90day_correction_container + "/" + azure_90day_correction_blob;

        // Variables for Snowflake
        var snowflake_table = destination_table;
        var snowflake_stage_table = snowflake_table + "_stage";

        // Variables for create SQLs
        var create_table_sql = `CREATE OR REPLACE TEMPORARY TABLE ${snowflake_stage_table} AS SELECT ${primary_keys} FROM ${snowflake_table} WHERE 1=0;`;
        var copy_from_azure_sql = `COPY INTO ${snowflake_stage_table} FROM '${azure_full_path}' FILE_FORMAT = ETL.FILEFORMAT_QUOTED_CSV ON_ERROR = CONTINUE;`;
        var update_delete_flag_sql = `UPDATE ${snowflake_table} tgt
        SET IS_ACTIVE = FALSE
        WHERE NOT EXISTS (SELECT 1 FROM ${snowflake_stage_table} src WHERE ${primaryKeyConditions})
        AND tgt.${correction_date_column} >= DATEADD('DAY', -90, CURRENT_DATE());`;

        // Variables for create SQL Statements
        var create_table_sql_statement = snowflake.createStatement({sqlText: create_table_sql});
        var copy_from_azure_sql_statement = snowflake.createStatement({sqlText: copy_from_azure_sql});
        var update_delete_flag_sql_statement = snowflake.createStatement({sqlText: update_delete_flag_sql});

        // Executions
        var create_table_sql_statement_result = create_table_sql_statement.execute();
        var copy_from_azure_sql_statement_result = copy_from_azure_sql_statement.execute();
        var update_delete_flag_sql_result = update_delete_flag_sql_statement.execute();

        // If everything succeeded, record the statements that were run for this table.
        result += `Succeeded: ${create_table_sql} ${copy_from_azure_sql} ${update_delete_flag_sql}\n`;
    } catch (err) {
        // If an error occurs, log the error but continue the loop.
        result += `Failed: ${err}\n`;
    }
}

return result;
$$;
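
The COPY INTO statement above references a named file format, ETL.FILEFORMAT_QUOTED_CSV, whose definition isn't shown here. A minimal sketch, assuming a standard quoted CSV with a header row, might look like this:

CREATE OR REPLACE FILE FORMAT ETL.FILEFORMAT_QUOTED_CSV
  TYPE = CSV
  FIELD_OPTIONALLY_ENCLOSED_BY = '"'
  SKIP_HEADER = 1;  -- assumed: correction files carry a header row

The procedure can be run on demand with CALL ETL.STORED_PROC_90_DAY_DATA_CORRECTION();. For the scheduled executions mentioned in step 5, it can be wrapped in a Snowflake task. The warehouse name and schedule below are placeholders, not values from our actual setup:

CREATE OR REPLACE TASK ETL.TASK_90_DAY_DATA_CORRECTION
  WAREHOUSE = ETL_WH                     -- assumed warehouse name
  SCHEDULE = 'USING CRON 0 2 * * * UTC'  -- assumed nightly 2 AM UTC run
AS
  CALL ETL.STORED_PROC_90_DAY_DATA_CORRECTION();

ALTER TASK ETL.TASK_90_DAY_DATA_CORRECTION RESUME;  -- tasks are created suspended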
        

Results and Benefits

Implementing this solution brought several benefits to our organization:

  • Efficiency: With automation in place, data corrections that used to take hours of manual work were now completed in minutes.
  • Data Integrity: Our data became more reliable and aligned with the source, reducing the risk of errors.
  • Time Savings: Our data team had more time to focus on data analysis and strategic tasks rather than manual corrections.
