Spark Tidbits - Lesson 10
Apache Spark was designed to execute Java byte code programs on a cluster of general purpose computers. Most cloud implementations use Linux virtual machines as the nodes in the cluster. Therefore, it is not surprising that mounting storage and working with folders and files is supported by the utility libraries. In Azure Databricks, use the dbutils library; for the Microsoft products (Synapse and Fabric), use the mssparkutils library.
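The two libraries expose very similar file system helpers. Just for orientation, here is a rough mapping of the calls used in this lesson (method names only, not an exhaustive list):
# Azure Databricks -- storage helpers hang off dbutils, for example:
#   dbutils.fs.mounts(), dbutils.fs.ls(path), dbutils.fs.cp(src, dst), dbutils.fs.mount(src, mount_point)

# Synapse and Fabric -- the equivalents hang off mssparkutils:
#   mssparkutils.fs.mounts(), mssparkutils.fs.ls(path), mssparkutils.fs.cp(src, dst), mssparkutils.fs.mount(src, mount_point)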
Today, we are going to work with Microsoft Fabric. Let's list out any existing mounts using the code below.
#
# 1 - existing mounts
#
mssparkutils.fs.mounts()
The default mount gives us access to the lakehouse files and tables. There is also a notebook working directory mount.
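To see where the default mount lands on the driver's local file system, resolve it with the getMountPath method (a quick sketch):
# resolve the default lakehouse mount to its session-local path
default_path = mssparkutils.fs.getMountPath("/default")
print(default_path)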
Right now, I have the weather data loaded in the Files directory. See the image below, which uses the local path with a shell command to list out the folders and files.
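Here is a rough Python equivalent of that shell listing, built on the local mount path:
import os

# build the local (mounted) path to the Files folder of the default lakehouse
local_path = f"{mssparkutils.fs.getMountPath('/default')}/Files"

# list the folders and files, just like the shell command in the image
for entry in os.listdir(local_path):
    print(entry)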
We can create custom mounts to both other lakehouses and Azure Data Lake Storage. The code below uses the workspace and lakehouse ids to mount the Adventure Works lakehouse.
#
# 3 - mount lakehouse in same workspace
#
mssparkutils.fs.mount(
    "abfss://a668a328-9f67-4678-93f2-10d5afdfe3ad@onelake.dfs.fabric.microsoft.com/0416e287-2a33-4093-8fa6-5f46d7e660d5",
    "/advwrks"
)
Please note that mounts only exist for the duration of a session. See the local path details shown below.
Use the getMountPath method of the file system (fs) class to retrieve the fully qualified path when required. The code below shows that only the /raw/saleslt directory exists under the Files folder in the Adventure Works lakehouse. We want to create a new directory called weather and copy the files from the current lakehouse over to the new storage folder.
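Since that step only appears as a screenshot, here is a minimal sketch of the listing it performs, reusing the /advwrks mount:
#
# 4 - list the Adventure Works Files folder (sketch)
#

# resolve the mount to a fully qualified local path
advwrks_files = f"file://{mssparkutils.fs.getMountPath('advwrks')}/Files"

# list the Files folder -- only the raw folder (holding saleslt) exists at this point
for item in mssparkutils.fs.ls(advwrks_files):
    print(item.path)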
The above code was part of step 4 in my learning notebook. Step 5 creates the new folder and executes the file copy.
#
# 5 - create dir + copy files between lakehouses
#
# get file list
files = mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/default')}/Files")
print(files)
# create new directory
mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('advwrks')}/Files/raw/weather")
# copy over files
for file in files:
    print(file.path)
    src = file.path
    dst = f"file://{mssparkutils.fs.getMountPath('advwrks')}/Files/raw/weather/"
    mssparkutils.fs.cp(src, dst)
# show the resulting files
mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('advwrks')}/Files/raw/weather")
The image below shows the three files in the source location.
For some reason, the library created a Cyclic Redundancy Check (.crc) file for each csv or text file; most likely this is the Hadoop local file system writing a checksum companion when files are copied through the file:// scheme. The image below shows six files in the weather directory.
Microsoft Fabric introduced the concept of shortcuts, which are created at the lakehouse object level. However, other Apache Spark implementations use the mount method. When mounting Azure Data Lake Storage, Microsoft Fabric can authenticate with either an account key or a shared access signature (SAS).
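For reference, the account key flavor follows the same pattern as the SAS mount shown below. The container, storage account, and key in this sketch are placeholders:
# placeholder values -- substitute your own storage account, container, and key
account_key = "<storage-account-key>"

# mount ADLS Gen2 using an account key instead of a SAS token
mssparkutils.fs.mount(
    "abfss://<container>@<storage-account>.dfs.core.windows.net",
    "/adls",
    {"accountKey": account_key}
)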
Because a SAS token has a limited scope, I went that route and stored the SAS token as a secret in an existing key vault named kvs4tips2021.
The credentials class has a getSecret method. The code below attempts to read in the secret and mount the ADLS storage.
#
# 6 - mount adls storage, copy files to lakehouse
#
token = mssparkutils.credentials.getSecret("https://kvs4tips2021.vault.azure.net/", "sec-sas-key-4-adls")
# replace <container> and <storage-account> with your ADLS Gen2 container and account
mssparkutils.fs.mount(
    "abfss://<container>@<storage-account>.dfs.core.windows.net",
    "/adls",
    {"sasToken": token}
)
By default, the notebook runs under the identity of the current session user. In my case, that user does not have access to the Azure Key Vault. Please see the HTTP 403 error code below.
Once access has been granted to the user, the Python code works just fine. The image below shows the new mount point called adls.
If we take a look at the storage folder, we can see that there are five Delta tables stored as subfolders.
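We can confirm that from code as well. This sketch lists the stocks folder on the new mount:
# list the Delta table folders under the stocks directory on the ADLS mount
for item in mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('adls')}/stocks"):
    print(item.path)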
The following code uses the copy file command with the recurse option.
# copy stocks directory
src = f"file://{mssparkutils.fs.getMountPath('adls')}/stocks"
dst = f"file://{mssparkutils.fs.getMountPath('/default')}/Files"
mssparkutils.fs.cp(src, dst, recurse=True)
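A quick listing of the destination confirms the copy (sketch):
# confirm the stocks folder now exists under Files in the default lakehouse
mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/default')}/Files/stocks")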
Using the lakehouse explorer, one can see that the five folders have been copied to a subdirectory called stocks within our sample lakehouse named lh_mounts_n_files.
In a nutshell, there is plenty of PySpark code out there that uses utility libraries to mount storage and manage folders and files. Databricks, Fabric, and Synapse all have the ability to read secrets from a key vault. If you have to use credentials, a key vault is the preferred way to centralize and control that information.
Mounting storage in Databricks results in a mount point under the /mnt directory. However, in Fabric and Synapse, the Spark session id is part of the local path. Therefore, use the getMountPath method to convert the mount name into a fully qualified path.
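In code, the difference looks roughly like this (the Databricks line is shown only as a comment for comparison):
# Databricks: mounts live under a fixed prefix, e.g. dbutils.fs.ls("/mnt/adls")

# Fabric and Synapse: the local path embeds the Spark session id,
# so resolve it at run time instead of hard coding it
print(mssparkutils.fs.getMountPath("/adls"))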
Next time, I will be talking about how to read and write data to a PostgreSQL database from Microsoft Fabric.