Spark Tidbits - Lesson 11
In 1970, Edgar Codd, a researcher at IBM, published a paper titled "A Relational Model of Data for Large Shared Data Banks". This paper is widely considered the spark that led to the first relational databases.
Today, we are going to explore how to read tables from an Azure Database for PostgreSQL flexible server into a Microsoft Fabric lakehouse using Spark.
The first step is to create a new lakehouse for this proof of concept. The image below shows an aptly named lakehouse.
I am using my default workspace for this example. The image below shows an existing stocks lakehouse as well as the new pubs lakehouse. Did I mention that the test database is the pubs sample database, modified to work with PostgreSQL?
Before we start coding, we need to save important information into our Azure Key Vault. The image below shows that the service URL, user name, and password secrets have been entered.
The code below uses the mssparkutils library to retrieve the secrets from the vault.
#
# 1 - grab secrets
#
url = mssparkutils.credentials.getSecret("https://kvs4tips2021.vault.azure.net/", "sec-postgresql-url")
usr = mssparkutils.credentials.getSecret("https://kvs4tips2021.vault.azure.net/", "sec-postgresql-usr")
pwd = mssparkutils.credentials.getSecret("https://kvs4tips2021.vault.azure.net/", "sec-postgresql-pwd")
If you try printing the above variables, the only output is [REDACTED]. How can we see the contents of the variables? Do not fret. The little function below decodes the secret.
#
# 2 - define utility function no 1
#
def grab_secret_text(secret_txt):
    plain_txt = ''
    for c in secret_txt:
        # add a space after each character so the printed result no longer
        # matches the secret value and is not redacted by Fabric
        plain_txt = plain_txt + c + ' '
    return plain_txt.strip()
At this time, let's decode a non-confidential secret. The URL is a pretty safe one to display.
#
# 3 - decode the secrets
#
ret = grab_secret_text(url)
print(ret)
The screen shot below shows the URL to our PostgreSQL service.
I kind of put the cart before the horse. Spark can read from many different relational databases; however, we need to add the correct JDBC driver to the cluster. How do we do that in Microsoft Fabric?
We need to create an environment that has the driver in its configuration. Please go to the settings of the default workspace. As you can see, my workspace is backed by a Fabric capacity in Azure.
The next step is to look at the Spark settings. Under "Set default environment", we can see that the option is in an off state. Please change the state to on. After saving this change, click the drop-down list for the default environment. We will want to create a new environment now.
The next step in the process is to name the environment. Hindsight is 20/20. I would probably prefix the name with "env-" in the future.
There are many settings for the Spark cluster. We want to add a custom library. Please use the following URL to download the JDBC driver for PostgreSQL. The file will land in the downloads directory on your Windows laptop. Use the upload feature to add and save the new JAR to the environment.
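Once the environment is published and selected as the default, it does not hurt to confirm that the driver is actually visible to the Spark session. The snippet below is an optional sketch, not part of the original walkthrough; it uses the py4j gateway that PySpark exposes and raises an error if the JAR was not attached.
#
#  optional check (sketch) - is the PostgreSQL JDBC driver on the classpath?
#
# not from the original article; this call fails with a wrapped
# ClassNotFoundException if the uploaded JAR is missing from the environment
spark._jvm.java.lang.Class.forName("org.postgresql.Driver")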
At this time, I want to write two utility functions. The first function takes the connection information and returns a dataframe representation of the given table. Please see the Spark documentation for details.
#
# 4 - define utility function no 2
#
def grab_postgresql_table(url, usr, pwd, tbl):
    connection_properties = {
        "user": usr,
        "password": pwd,
        "driver": "org.postgresql.Driver"
    }
    df = spark.read.jdbc(url=url, table=tbl, properties=connection_properties)
    return df
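Before looping over all of the tables, it is worth smoke testing the helper against a single table. The snippet below is just a sketch; dbo.authors is one of the pubs tables listed later in the article, and the call will only succeed once the firewall change described near the end has been made.
#
#  smoke test (sketch) - read one table and inspect the schema
#
df_authors = grab_postgresql_table(url, usr, pwd, "dbo.authors")
df_authors.printSchema()
df_authors.show(5)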
The second function drops any existing table and then saves the dataframe to the hive catalog as a delta table.
#
# 5 - define utility function no 3
#
def make_hive_table(df, tbl):
    # del table
    stmt = f"drop table if exists {tbl}"
    spark.sql(stmt)
    # add table
    df.write.saveAsTable(tbl)
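For a single table, the two helpers compose as shown below. This is only a usage sketch; the metadata-driven loop in code block seven repeats the same pattern for every pubs table.
#
#  usage sketch - copy one table end to end
#
# dbo.jobs / raw_dbo_jobs come from the metadata array defined later
df = grab_postgresql_table(url, usr, pwd, "dbo.jobs")
make_hive_table(df, "raw_dbo_jobs")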
The pubs database contains 11 tables. We could make 22 distinct function calls by hand in one cell. However, that is not a metadata-driven design!
Instead, let's use a system table in PostgreSQL to generate the list of Python dictionaries. Querying the pg_tables object in the pg_catalog schema produces output that is almost valid code for an array of dictionaries. Copy and paste the generated code into our lakehouse notebook.
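A query along the following lines does the trick; consider it a sketch, since the exact string formatting is an assumption on my part.
-- sketch only: generate one dictionary literal per table in the dbo schema
select '{"src": "' || schemaname || '.' || tablename ||
       '", "trg": "raw_' || schemaname || '_' || tablename || '"},' as line
from pg_catalog.pg_tables
where schemaname = 'dbo'
order by tablename;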
The metadata array can be seen below. I added a for loop and print statements to validate the metadata.
#
# 6 - create + display meta data
#
ary = [
{"src": "dbo.authors", "trg": "raw_dbo_authors"},
{"src": "dbo.discounts", "trg": "raw_dbo_discounts"},
{"src": "dbo.employee", "trg": "raw_dbo_employee"},
{"src": "dbo.jobs", "trg": "raw_dbo_jobs"},
{"src": "dbo.pub_info", "trg": "raw_dbo_pub_info"},
{"src": "dbo.publishers", "trg": "raw_dbo_publishers"},
{"src": "dbo.roysched", "trg": "raw_dbo_roysched"},
{"src": "dbo.sales", "trg": "raw_dbo_sales"},
{"src": "dbo.stores", "trg": "raw_dbo_stores"},
{"src": "dbo.titleauthor", "trg": "raw_dbo_titleauthor"},
{"src": "dbo.titles", "trg": "raw_dbo_titles"}]
for ele in ary:
    print(ele.get("src"))
    print(ele.get("trg"))
    print("")
We can see that the source and target table names are correct. Let's write the code to extract and load data from the relational database to the lakehouse.
If you run the code seen below at this time, it will fail. Why is that?
#
# 7 - move tables from postgresql to hive
#
for ele in ary:
    tbl1 = ele.get("src")
    df = grab_postgresql_table(url, usr, pwd, tbl1)
    tbl2 = ele.get("trg")
    make_hive_table(df, tbl2)
    print(f"read from pgsql table - {tbl1} and write to hive table - {tbl2}")
Each Azure database is protected by a firewall. If you want to connect from your laptop, you need to add your IP address to the firewall's allow list. The same goes for Azure services: by default, the database server will not talk to other Azure services.
Just enable the option that allows public access from Azure services and save the change.
Try executing code block number seven that moves the 11 tables over to the lakehouse. The screen shot below shows a successful execution.
Last but not least, if we check the list of tables in the lakehouse, we can see eleven tables now exist.
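For those who prefer to verify from the notebook rather than the lakehouse explorer, a quick catalog query works too. Consider it an optional sketch.
#
#  optional check (sketch) - list the lakehouse tables from the notebook
#
spark.sql("show tables").show(20, truncate=False)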
In a nutshell, a developer can easily move data from cloud relational databases into a Microsoft Fabric lakehouse using Spark. One must create an environment that has the correct JDBC driver. The same technique can be applied to any relational database that has a public IP address, whether the database lives in the cloud or on premises.
A word of caution: hackers might attempt a denial-of-service attack against any public IP address, including one that fronts an on-premises database. Please place a firewall appliance, with DDoS protection enabled, in front of your database. Azure databases already have this protection in place for all Platform as a Service (PaaS) offerings. Please see the snippet below from a Google search on the topic.