How to migrate a stored procedure to the Snowflake cloud data warehouse using Python


Stored procedures are commonly used to encapsulate logic for data transformation, data validation, and business-specific logic. By combining multiple SQL steps into a stored procedure, you can reduce round trips between your applications and the database.

A stored procedure may contain one or many statements and even call additional stored procedures, passing parameters through as needed.

Snowflake supports JavaScript-based stored procedures. This post walks through a step-by-step procedure for migrating an existing stored procedure to Snowflake with Python.

Pre-requisites:

·        Snowflake Connector for Python

·        Python 2.7.x or later (current releases of the connector require Python 3)

Why Python:

·        The Snowflake Connector for Python provides an interface for developing Python applications that can connect to Snowflake and perform all standard operations.

·        The connector is a native, pure Python package that has no dependencies on JDBC or ODBC. It implements the Python Database API v2 specification (PEP 249), including:

o  Connection objects for connecting to Snowflake.

o  Cursor objects for executing DDL/DML statements and queries.
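
Connection and cursor objects are used the same way across Python database drivers, so the pattern can be sketched without a Snowflake account. The example below is only an illustration: it uses the standard library's sqlite3 module as a stand-in, and the helper name run_statements is hypothetical. With the Snowflake connector, the connection would come from snowflake.connector.connect(...) instead.

```python
import sqlite3


def run_statements(connection, statements):
    """Execute each statement on its own cursor and return the rows of the last one."""
    rows = []
    for statement in statements:
        cursor = connection.cursor()
        try:
            cursor.execute(statement)
            # description is None for statements that do not return rows
            rows = cursor.fetchall() if cursor.description else []
        finally:
            cursor.close()
    return rows


# sqlite3 stands in for a Snowflake connection here (assumption for the demo)
demo_connection = sqlite3.connect(":memory:")
demo_rows = run_statements(
    demo_connection,
    [
        "create table employee (name text, salary integer)",
        "insert into employee values ('a', 10000), ('b', 30000)",
        "select name from employee where salary >= 20000",
    ],
)
demo_connection.close()
```

The connection owns the session, while each cursor executes one statement and exposes its result set.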

For this walkthrough, I use the following sample Redshift stored procedure:

CREATE or replace PROCEDURE employee_temp (cond_param IN int, tmp_table_name INOUT varchar(256)) as $$
DECLARE
  row record;
BEGIN
  EXECUTE 'drop table if exists ' || tmp_table_name;
  EXECUTE 'create temp table ' || tmp_table_name || ' as select * from employee where salary >= ' || cond_param;
END;
$$ LANGUAGE plpgsql;

STEP I:

The Redshift stored procedure above contains two SQL statements, and both are executed unconditionally.

To make the migration easier, we split the stored procedure into two parts:

·        Conditional expression

·        SQL statements

The sample stored procedure has no conditional expression guarding its SQL statements, so only the two SQL statements need to be migrated.
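
The split described in this step can be sketched with two regular expressions. This is a rough illustration rather than a PL/pgSQL parser: the patterns, the keyword list, and the function name split_procedure are all assumptions, and the sketch only handles statements that fit on one line, as in the sample procedure.

```python
import re

REDSHIFT_BODY = """
BEGIN
  EXECUTE 'drop table if exists ' || tmp_table_name;
  EXECUTE 'create temp table ' || tmp_table_name || ' as select * from employee where salary >= ' || cond_param;
END;
"""

# Matches a single-line "EXECUTE <expression>;" statement (illustrative only)
EXECUTE_PATTERN = re.compile(r"^\s*EXECUTE\s+(.*?);\s*$", re.IGNORECASE | re.MULTILINE)
# Keywords that would have to become Python control flow
CONDITION_PATTERN = re.compile(
    r"^\s*(IF|ELSIF|ELSE|CASE|LOOP|WHILE|FOR)\b", re.IGNORECASE | re.MULTILINE
)


def split_procedure(body):
    """Return (conditional keywords found, dynamic SQL expressions found)."""
    conditions = [m.group(1).upper() for m in CONDITION_PATTERN.finditer(body)]
    statements = [m.group(1).strip() for m in EXECUTE_PATTERN.finditer(body)]
    return conditions, statements


conditions, statements = split_procedure(REDSHIFT_BODY)
```

For the sample procedure, conditions is empty and statements holds the two dynamic SQL expressions.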

STEP II:

Before assembling the migrated procedure, run each SQL statement in the Snowflake environment to check its compatibility. This catches syntax errors and missing-object errors before the procedure is executed as a whole.
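
One way to run such a check is to execute each statement on its own and collect the failures instead of stopping at the first one. The sketch below assumes a DB-API connection. The helper name check_statements is hypothetical, and sqlite3 is used only as a stand-in so the example runs anywhere. Against Snowflake, the same function would receive the connector's connection object.

```python
import sqlite3


def check_statements(connection, statements):
    """Run each statement and return a list of (statement, error message) for failures."""
    failures = []
    for statement in statements:
        cursor = connection.cursor()
        try:
            cursor.execute(statement)
        except Exception as error:  # drivers raise their own error classes
            failures.append((statement, str(error)))
        finally:
            cursor.close()
    return failures


# sqlite3 stands in for the Snowflake connection (assumption for the demo)
demo_connection = sqlite3.connect(":memory:")
demo_connection.execute("create table employee (name text, salary integer)")
failures = check_statements(
    demo_connection,
    [
        "drop table if exists employee_temp",
        "create temp table employee_temp as select * from employee where salary >= 20000",
        "select * from missing_table",
    ],
)
demo_connection.close()
```

Only the statement that references a missing table ends up in failures.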

STEP III:

At this point about half of the work is done. The sample stored procedure runs inside Redshift and therefore has native connectivity to the Redshift database. To execute the same logic in Snowflake, we pass connection parameters to the connector to connect to the Snowflake database.

Snowflake connection parameter
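
A minimal sketch of collecting the connection parameters is shown below. It assumes the values live in environment variables: the SNOWFLAKE_* variable names and both helper names are hypothetical, while the keyword names (user, password, account, warehouse, database, schema, role) are the ones accepted by snowflake.connector.connect.

```python
import os

# Hypothetical environment variable names; adjust to your own conventions
ENV_TO_PARAM = {
    "SNOWFLAKE_USER": "user",
    "SNOWFLAKE_PASSWORD": "password",
    "SNOWFLAKE_ACCOUNT": "account",
    "SNOWFLAKE_WAREHOUSE": "warehouse",
    "SNOWFLAKE_DATABASE": "database",
    "SNOWFLAKE_SCHEMA": "schema",
    "SNOWFLAKE_ROLE": "role",
}


def build_connection_params(environ=os.environ):
    """Collect connection keyword arguments from the environment variables that are set."""
    return {
        param: environ[env_name]
        for env_name, param in ENV_TO_PARAM.items()
        if env_name in environ
    }


def connect_to_snowflake(environ=os.environ):
    """Open a connection; the connector is imported lazily so the helper above runs without it."""
    import snowflake.connector

    return snowflake.connector.connect(**build_connection_params(environ))
```

Keeping credentials out of the script also makes it easier to reuse the same code across accounts.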

STEP IV:

Now that we have connected to Snowflake from Python, note that the sample Redshift stored procedure has:

·        Two input parameters

o  cond_param

o  tmp_table_name

·        One output parameter

o  tmp_table_name

Let us write a Python function that does the work of the sample Redshift stored procedure above.

def employee_temp_fn(self):
    """
        Function: employee_temp_fn
            I.  Drop the temp table if it exists
            II. Recreate the temp table using the conditional parameter

    """
    # cond_param may be passed as an int, so convert it before concatenating
    drop_obj = "drop table if exists " + self.tmp_table_name
    temp_obj = "create temp table " + self.tmp_table_name + " as select * from employee where salary >= " + str(self.cond_param)

    # ---- try clause for the Python cursor
    try:

        self.sfconnection.cursor().execute(drop_obj)
        self.logger.info("Object dropped: " + self.tmp_table_name)
        self.sfconnection.cursor().execute(temp_obj)
        self.logger.info("Object created: " + self.tmp_table_name)

    # ---- Exception clause: log the failure with its traceback
    except Exception as e:
        self.logger.exception(e)

    finally:
        self.logger.info("Closed: Snowflake Connection")
        self.sfconnection.close()
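
Because the function builds its SQL by string concatenation, it can help to validate both inputs first. The sketch below is one possible guard, not part of the original procedure. The helper name build_employee_temp_sql is hypothetical, and the pattern only accepts unquoted identifiers (a letter or underscore followed by letters, digits, underscores, or dollar signs).

```python
import re

# Unquoted identifier: letter or underscore, then letters, digits, "_" or "$"
IDENTIFIER_PATTERN = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def build_employee_temp_sql(cond_param, tmp_table_name):
    """Return the (drop, create) statements after validating both inputs."""
    if not IDENTIFIER_PATTERN.fullmatch(tmp_table_name):
        raise ValueError("Invalid table name: %r" % tmp_table_name)
    salary_floor = int(cond_param)  # raises ValueError for non-numeric input
    drop_sql = "drop table if exists " + tmp_table_name
    create_sql = (
        "create temp table " + tmp_table_name
        + " as select * from employee where salary >= " + str(salary_floor)
    )
    return drop_sql, create_sql


drop_sql, create_sql = build_employee_temp_sql(20000, "employee_temp")
```

A table name such as "x; drop table employee" is rejected with a ValueError before any SQL is built.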

STEP V:

import logging
import snowflake.connector as sf


class redshiftTosnf:

    def __init__(self, cond_param, tmp_table_name):
        """
            Constructor
        """

        # ---- Log configuration
        logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d -%(message)s')
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)

        # ---- Input parameter : sel_result_set
        self.cond_param     = cond_param
        self.tmp_table_name = tmp_table_name

        # ---- Snowflake Connection initialize
        self.sfconnection = sf.connect(
            user        ='user_name',
            password    ='password',        # placeholder; supply your own credentials
            account     ='account_name',
            warehouse   ='warehouse_name',
            database    ='database_name',
            schema      ='schema_name',
            role        ='role'
        )

    def employee_temp_fn(self):
        """
            Function: employee_temp_fn
                I.  Drop the temp table if it exists
                II. Recreate the temp table using the conditional parameter

        """
        # cond_param may be passed as an int, so convert it before concatenating
        drop_obj = "drop table if exists " + self.tmp_table_name
        temp_obj = "create temp table " + self.tmp_table_name + " as select * from employee where salary >= " + str(self.cond_param)

        # ---- try clause for the Python cursor
        try:

            self.sfconnection.cursor().execute(drop_obj)
            self.logger.info("Object dropped: " + self.tmp_table_name)
            self.sfconnection.cursor().execute(temp_obj)
            self.logger.info("Object created: " + self.tmp_table_name)

        # ---- Exception clause: log the failure with its traceback
        except Exception as e:
            self.logger.exception(e)

        finally:
            self.logger.info("Closed: Snowflake Connection")
            self.sfconnection.close()


if __name__ == "__main__":
    # ---- Class object initialize
    clsObj = redshiftTosnf(20000, 'employee_temp')

    # ---- Function call using the class object (call once: the function closes the connection)
    clsObj.employee_temp_fn()





It is time to execute the function by passing values for the two input parameters, as the __main__ block above does.

I hope you find this article helpful!

Note:

If you have many stored procedures in an existing data warehouse, the process above can be automated.
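
As a starting point for such automation, the sketch below pulls the procedure name and parameter list out of a CREATE PROCEDURE header. It is an illustration under assumptions: the regular expression and the function names are hypothetical, and only the "name MODE type" parameter form used in the sample procedure is handled with any care.

```python
import re

HEADER_PATTERN = re.compile(
    r"CREATE\s+(?:OR\s+REPLACE\s+)?PROCEDURE\s+(\w+)\s*\((.*?)\)\s*AS\b",
    re.IGNORECASE | re.DOTALL,
)


def split_top_level(text):
    """Split on commas that are not nested inside parentheses."""
    parts, depth, current = [], 0, []
    for char in text:
        if char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
        if char == "," and depth == 0:
            parts.append("".join(current).strip())
            current = []
        else:
            current.append(char)
    if "".join(current).strip():
        parts.append("".join(current).strip())
    return parts


def parse_procedure_header(ddl):
    """Return (procedure name, [(parameter name, mode, data type), ...])."""
    match = HEADER_PATTERN.search(ddl)
    if match is None:
        raise ValueError("No CREATE PROCEDURE header found")
    params = []
    for raw in split_top_level(match.group(2)):
        tokens = raw.split(None, 2)
        # Expected form is "name MODE type"; a missing mode defaults to IN
        if len(tokens) == 3 and tokens[1].upper() in ("IN", "OUT", "INOUT"):
            params.append((tokens[0], tokens[1].upper(), tokens[2]))
        else:
            params.append((tokens[0], "IN", " ".join(tokens[1:])))
    return match.group(1), params


name, params = parse_procedure_header(
    "CREATE or replace PROCEDURE employee_temp "
    "(cond_param IN int, tmp_table_name INOUT varchar(256)) as $$"
)
```

The parsed name and parameters could then drive the generation of one Python function per procedure.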

References:

https://docs.aws.amazon.com/redshift/latest/dg/stored-procedure-create.html

https://docs.snowflake.net/manuals/user-guide/python-connector-install.html



