How to migrate a stored procedure to the Snowflake cloud data warehouse using Python
SRIGANESH PALANI
Snowflake | Snowflake Certified Associate | AWS | Python | Selenium | Streamlit | Neo4J | MongoDB | GenAI | JavaScript | DBT | Tech Lead @ Tech Mahindra (Opinions are my own and not the views of my employer)
Stored procedures are commonly used to encapsulate logic for data transformation, data validation, and business-specific logic. By combining multiple SQL steps into a stored procedure, you can reduce round trips between your applications and the database.
A stored procedure may contain one or many statements and even call additional stored procedures, passing parameters through as needed.
Snowflake supports JavaScript-based stored procedures. In this blog, we walk through a systematic, step-by-step procedure for migrating a stored procedure using Python:
Pre-requisites:
· Snowflake Connector for Python
· Python 2.7.x or above (recent releases of the connector require Python 3)
Why Python:
· The Snowflake Connector for Python provides an interface for developing Python applications that can connect to Snowflake and perform all standard operations.
· The connector is a native, pure Python package that has no dependencies on JDBC or ODBC. It provides:
o Connection objects for connecting to Snowflake.
o Cursor objects for executing DDL/DML statements and queries.
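As a rough sketch of how these two object types fit together: the helper below accepts any connection that exposes `cursor()` in the DB-API style, which the object returned by `snowflake.connector.connect()` does. The names `run_statements`, `_FakeCursor`, and `_FakeConnection` are hypothetical; the fake classes exist only so the example runs without a Snowflake account.

```python
from contextlib import closing


def run_statements(connection, statements):
    """Execute each SQL statement on one cursor and return how many ran."""
    executed = 0
    # closing() calls cursor.close() even if a statement raises.
    with closing(connection.cursor()) as cursor:
        for sql in statements:
            cursor.execute(sql)
            executed += 1
    return executed


class _FakeCursor:
    """Hypothetical stand-in for a Snowflake cursor; it only records SQL."""

    def __init__(self, log):
        self._log = log

    def execute(self, sql):
        self._log.append(sql)

    def close(self):
        self._log.append("<cursor closed>")


class _FakeConnection:
    """Hypothetical stand-in for the object returned by sf.connect(...)."""

    def __init__(self):
        self.log = []

    def cursor(self):
        return _FakeCursor(self.log)


fake = _FakeConnection()
count = run_statements(
    fake,
    ["drop table if exists t", "create temp table t (id int)"],
)
print(count, fake.log)
```

With a real connection, the same helper would be called as `run_statements(connection, statements)`.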
For this walkthrough, I use the sample Redshift stored procedure below:
CREATE OR REPLACE PROCEDURE employee_temp (cond_param IN int, tmp_table_name INOUT varchar(256)) AS $$
DECLARE
    row record;
BEGIN
    EXECUTE 'drop table if exists ' || tmp_table_name;
    EXECUTE 'create temp table ' || tmp_table_name || ' as select * from employee where salary >= ' || cond_param;
END;
$$ LANGUAGE plpgsql;
STEP I:
The Redshift stored procedure above contains two SQL statements, and both are executed unconditionally.
To make the migration easier, we split the stored procedure into two parts:
· Conditional expressions
· SQL statements
The sample stored procedure has no conditional expression guarding its SQL statements, so only the two SQL statements need to be migrated.
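The two SQL statements of the sample procedure can be pulled out into plain Python strings, roughly as sketched here. The function name `build_statements` and the identifier check are my own assumptions, not part of the original procedure; the check is a conservative guard because a table name cannot be passed as a bind variable and is concatenated into the SQL text.

```python
import re

# Conservative pattern for an unquoted identifier (an assumption for this sketch).
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def build_statements(cond_param, tmp_table_name):
    """Return the two SQL statements of employee_temp as strings."""
    if not _IDENTIFIER.fullmatch(tmp_table_name):
        raise ValueError("Unsafe table name: " + repr(tmp_table_name))
    # int() rejects anything that is not a plain integer threshold.
    threshold = int(cond_param)
    return [
        "drop table if exists " + tmp_table_name,
        "create temp table " + tmp_table_name
        + " as select * from employee where salary >= " + str(threshold),
    ]


statements = build_statements(20000, "employee_temp")
for sql in statements:
    print(sql)
```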
STEP II:
To avoid syntax issues in the Snowflake environment, we first run each SQL statement on Snowflake and check its compatibility. This catches syntax errors and missing-object errors before the statements are executed as part of the migrated procedure.
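Before running the statements on Snowflake, a quick static scan can flag obvious Redshift-only syntax. The sketch below is a heuristic of my own, not a complete compatibility checker: the keyword list covers only a few Redshift table attributes that Snowflake SQL does not use, and the function name is hypothetical. Actually running the statement on Snowflake remains the real test.

```python
import re

# A few Redshift-specific table attributes. Deliberately incomplete.
REDSHIFT_ONLY_KEYWORDS = ("DISTKEY", "SORTKEY", "DISTSTYLE", "ENCODE")


def find_redshift_only_keywords(sql):
    """Return the Redshift-only keywords that appear in the SQL text."""
    found = []
    for keyword in REDSHIFT_ONLY_KEYWORDS:
        # \b keeps us from matching the keyword inside a longer identifier.
        if re.search(rf"\b{keyword}\b", sql, flags=re.IGNORECASE):
            found.append(keyword)
    return found


flagged = find_redshift_only_keywords(
    "create table t (id int encode zstd) distkey(id)"
)
clean = find_redshift_only_keywords(
    "create temp table employee_temp as select * from employee where salary >= 20000"
)
print(flagged, clean)
```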
STEP III:
At this point about 50 percent of the work is done. The sample stored procedure runs inside Redshift and therefore has native connectivity to the Redshift database. To execute the same logic against Snowflake, we use the connector's connection parameters to connect to the Snowflake database.
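One way to keep the connection parameters out of the source code is to read them from environment variables, as in this sketch. The environment variable names and the helper `snowflake_connect_kwargs` are hypothetical; the dictionary keys (`user`, `password`, `account`, `warehouse`, `database`, `schema`, `role`) are keyword arguments accepted by `snowflake.connector.connect()`.

```python
import os

# Mapping from connect() keyword to environment variable.
# The variable names are hypothetical; use whatever your deployment defines.
ENV_NAMES = {
    "user": "SNOWFLAKE_USER",
    "password": "SNOWFLAKE_PASSWORD",
    "account": "SNOWFLAKE_ACCOUNT",
    "warehouse": "SNOWFLAKE_WAREHOUSE",
    "database": "SNOWFLAKE_DATABASE",
    "schema": "SNOWFLAKE_SCHEMA",
    "role": "SNOWFLAKE_ROLE",
}


def snowflake_connect_kwargs(env=None):
    """Build the keyword arguments for snowflake.connector.connect()."""
    env = os.environ if env is None else env
    missing = [name for name in ENV_NAMES.values() if name not in env]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    return {keyword: env[name] for keyword, name in ENV_NAMES.items()}


# With the connector installed, the result would be used like this:
#   import snowflake.connector as sf
#   connection = sf.connect(**snowflake_connect_kwargs())
```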
STEP IV:
Now that we have connected to Snowflake using Python, note that the sample Redshift stored procedure has:
· Two input parameters
o cond_param
o tmp_table_name
· One output parameter
o tmp_table_name
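Python has no INOUT parameters, so one simple way to mirror `tmp_table_name` is to take it as an argument and return it to the caller. The sketch below assumes a cursor-like object with an `execute()` method; `_RecordingCursor` is a hypothetical stand-in so the example runs without a database.

```python
def employee_temp(cursor, cond_param, tmp_table_name):
    """Run both statements and return the INOUT value to the caller."""
    cursor.execute("drop table if exists " + tmp_table_name)
    cursor.execute(
        "create temp table " + tmp_table_name
        + " as select * from employee where salary >= " + str(int(cond_param))
    )
    # INOUT parameter: hand the (unchanged) table name back.
    return tmp_table_name


class _RecordingCursor:
    """Hypothetical cursor stand-in that records the SQL it receives."""

    def __init__(self):
        self.statements = []

    def execute(self, sql):
        self.statements.append(sql)


cursor = _RecordingCursor()
result = employee_temp(cursor, 20000, "employee_temp")
print(result, len(cursor.statements))
```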
Let us create a Python function for the sample Redshift stored procedure above.
def employee_temp_fn(self):
    """
    Function: employee_temp_fn
    I. Drop the temp table
    II. Recreate the temp table with the conditional parameter
    """
    drop_obj = "drop table if exists " + self.tmp_table_name
    # cond_param is an int, so convert it before concatenating
    temp_obj = ("create temp table " + self.tmp_table_name
                + " as select * from employee where salary >= " + str(self.cond_param))
    # ---- try clause for the Python cursor
    try:
        self.sfconnection.cursor().execute(drop_obj)
        self.logger.info("Object dropped: " + self.tmp_table_name)
        self.sfconnection.cursor().execute(temp_obj)
        self.logger.info("Object created: " + self.tmp_table_name)
    # ---- Exception clause
    except Exception as e:
        self.logger.error(e)
    finally:
        # Temp tables are session-scoped: the table is gone once the connection closes
        self.sfconnection.close()
        self.logger.info("Closed: Snowflake Connection")
STEP V:
The complete script, with the connection setup and the function together, looks like this:
import logging

import snowflake.connector as sf


class redshiftTosnf:
    def __init__(self, cond_param, tmp_table_name):
        """
        Constructor
        """
        # ---- Log configuration
        logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s')
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        # ---- Input parameters : employee_temp_fn
        self.cond_param = cond_param
        self.tmp_table_name = tmp_table_name
        # ---- Snowflake connection initialize (all values are placeholders)
        self.sfconnection = sf.connect(
            user='user_name',
            password='password',
            account='account_name',
            warehouse='warehouse_name',
            database='database_name',
            schema='schema_name',
            role='role'
        )

    def employee_temp_fn(self):
        """
        Function: employee_temp_fn
        I. Drop the temp table
        II. Recreate the temp table with the conditional parameter
        """
        drop_obj = "drop table if exists " + self.tmp_table_name
        # cond_param is an int, so convert it before concatenating
        temp_obj = ("create temp table " + self.tmp_table_name
                    + " as select * from employee where salary >= " + str(self.cond_param))
        # ---- try clause for the Python cursor
        try:
            self.sfconnection.cursor().execute(drop_obj)
            self.logger.info("Object dropped: " + self.tmp_table_name)
            self.sfconnection.cursor().execute(temp_obj)
            self.logger.info("Object created: " + self.tmp_table_name)
        # ---- Exception clause
        except Exception as e:
            self.logger.error(e)
        finally:
            # Temp tables are session-scoped: the table is gone once the connection closes
            self.sfconnection.close()
            self.logger.info("Closed: Snowflake Connection")


if __name__ == "__main__":
    # ---- Class object initialize
    clsObj = redshiftTosnf(20000, 'employee_temp')
    # ---- Function call using the class object (only once: the function closes the connection)
    clsObj.employee_temp_fn()
It is time to execute the function by passing values to the two input parameters.
I hope this article is helpful!
Note:
If you have many stored procedures in your existing data warehouse, the process above can be automated.
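As a starting point for such automation, the dynamic SQL expressions can be pulled out of a procedure's source text with a regular expression. This is only a sketch under narrow assumptions: it handles single-line `EXECUTE ...;` statements like the ones in the sample procedure and nothing else (no conditionals, loops, or multi-line statements). The function name `extract_execute_expressions` is hypothetical.

```python
import re

# Captures the expression between EXECUTE and the semicolon that ends the line.
_EXECUTE = re.compile(r"^\s*EXECUTE\s+(.+?);\s*$", re.IGNORECASE | re.MULTILINE)


def extract_execute_expressions(procedure_source):
    """Return the expression of every single-line EXECUTE statement."""
    return [expr.strip() for expr in _EXECUTE.findall(procedure_source)]


SAMPLE = """CREATE or replace PROCEDURE employee_temp (cond_param IN int, tmp_table_name INOUT varchar(256)) as $$
DECLARE
row record;
BEGIN
EXECUTE 'drop table if exists ' || tmp_table_name;
EXECUTE 'create temp table ' || tmp_table_name || ' as select * from employee where salary >= ' || cond_param;
END;
$$ LANGUAGE plpgsql;
"""

expressions = extract_execute_expressions(SAMPLE)
for expr in expressions:
    print(expr)
```

Each extracted expression still has to be translated into a Python string (and checked on Snowflake) before it can be executed through the connector.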
References:
https://docs.aws.amazon.com/redshift/latest/dg/stored-procedure-create.html
https://docs.snowflake.net/manuals/user-guide/python-connector-install.html