Building a Recommendation Engine in less than 20 lines of code - On Azure Databricks
Suresh Sethuramaswamy
Field Engineering at Databricks | Ex-Microsoft | 40 under 40 | Forbes Tech Council | Senior Member IEEE Princeton
Azure Databricks is an Apache Spark-based analytics platform optimized for the Microsoft Azure cloud services platform. Designed with the founders of Apache Spark, Databricks is integrated with Azure to provide one-click setup, streamlined workflows, and an interactive workspace that enables collaboration between data scientists, data engineers, and business analysts.
A Unified Analytics Platform
The Data Interfaces
There are several key interfaces that you should understand when working with Spark.
Dataset
- The Dataset is Apache Spark's newest distributed collection and can be considered a combination of DataFrames and RDDs. It provides the typed interface available in RDDs while offering many of the conveniences of DataFrames. It will be the core abstraction going forward.
DataFrame
- The DataFrame is a distributed collection of Row types. These provide a flexible interface and are similar in concept to the DataFrames you may be familiar with in Python (pandas) as well as in the R language.
RDD (Resilient Distributed Dataset)
- Apache Spark's first abstraction was the RDD, or Resilient Distributed Dataset. Essentially it is an interface to a sequence of data objects, of one or more types, located across a variety of machines in a cluster. RDDs can be created in a variety of ways and are the "lowest level" API available to the user. While this is the original data structure, new users should focus on Datasets, which are supersets of the current RDD functionality.
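To make these abstractions concrete, here is a minimal PySpark sketch (not from the original article; it assumes the spark session that Databricks notebooks provide, and the sample rows are invented). Note that the typed Dataset API is available in Scala and Java, while Python works through DataFrames and RDDs.
# Illustrative sketch: a DataFrame is a distributed collection of Rows with a schema;
# the RDD underneath it is the lower-level interface.
df = spark.createDataFrame([(1, "laptop"), (2, "tablet")], ["product_id", "category"])
df.printSchema()

rdd = df.rdd   # drop down to the RDD underlying the DataFrame
print(rdd.map(lambda row: (row.product_id, row.category.upper())).collect())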
Databricks Runtime & Spark Clusters
Azure Databricks is designed for Azure! This means:
- Decoupling Storage and Compute
- Ephemeral Clusters
- Multiple Clusters
- Autoscaling / Serverless
Azure Databricks Clusters:
- Clusters spin up in minutes (~5 min) :-)
- Two types of clusters: Interactive (shared) and Job clusters
- Interactive clusters can also be Azure Databricks Serverless Pools
- SQL Endpoints (JDBC/ODBC) for Power BI, Tableau, etc.
Databricks Workspace & Notebooks
- Interactive notebooks with support for multiple languages (SQL, Python, R and Scala)
- Real-time collaboration
- Notebook revision history and version control
- One-click visualizations
- Workspace ACLs
- Library Management
- Ability to convert a notebook into a Job
Mounting data in blob stores to Azure Databricks for fast and scalable data storage
dbutils - provides various utilities for users to interact with the rest of Databricks.
# Let's mount the folder from the blob store into DBFS using dbutils
dbutils.fs.mount(
  source = "wasbs://source@thatbigdata.blob.core.windows.net/",
  mount_point = "/mnt/training-sources/",
  extra_configs = {"fs.azure.sas.source.thatbigdata.blob.core.windows.net": "SAS-KEY"})
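Before relying on the mount, it can help to check whether the path is already mounted (remounting an existing mount point raises an error). A small sketch using the same dbutils API and the mount point from above:
# Sketch: list existing mounts and check for our mount point; unmount when no longer needed
already_mounted = any(m.mountPoint.rstrip("/") == "/mnt/training-sources" for m in dbutils.fs.mounts())
print("Already mounted:", already_mounted)
# dbutils.fs.unmount("/mnt/training-sources/")   # run this to remove the mount later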
Let's list the files.
%fs - a cell magic that tells the interpreter the rest of the cell is a file system command; typical commands such as ls, cp, and rm can be run here.
Let's look at the product data we have in the CSV file.
%fs ls /mnt/training-sources/
Reading the CSV into DataFrames - schema inference, unit test, custom schema, and Parquet
Let's read product.csv into a DataFrame and infer the schema from the file itself using Spark's built-in CSV reader. A Databricks notebook can handle Scala, Python, R, and SQL, but I chose Python for its wide popularity in the data science community.
# The code below reads the data into a dataframe, infers the schema automatically from the csv file
csv_file = "dbfs:/mnt/training-sources/initech/productsCsv/product.csv"
product_df = (spark.read                      # The DataFrameReader
              .option("header", "true")       # Use first line of all files as header
              .option("inferSchema", "true")  # Automatically infer data types
              .csv(csv_file)                  # Creates a DataFrame from CSV after reading in the file
             )
What about performance? Though the code above works seamlessly, it's not advisable on a large dataset: schema inference scans the entire file, which is expensive.
Lazy evaluation: Spark is known for its lazy evaluation; transformations don't execute anything until an action is performed against them.
display(product_df)
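The display() call above is the action that actually triggers work. As a quick sketch (not part of the original notebook; it reuses product_df and the price and brand columns from the schema checked below), transformations alone only build a plan:
# Sketch: filter and select are transformations -- no job runs yet
filtered_df = product_df.filter("price > 100").select("brand", "price")

# count() is an action: only now does Spark read the data and execute the plan
filtered_df.count()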
Unit Test
Run assertive unit tests to ensure the expected column names and data types.
%python
product_df.printSchema()
columns = product_df.dtypes
assert len(columns) == 8, "Expected 8 columns but found " + str(len(columns))
assert columns[0][0] == "product_id", "Expected column 0 to be \"product_id\" but found \"" + columns[0][0] + "\"."
assert columns[0][1] == "int", "Expected column 0 to be of type \"int\" but found \"" + columns[0][1] + "\"."
assert columns[1][0] == "category", "Expected column 1 to be \"category\" but found \"" + columns[1][0] + "\"."
assert columns[1][1] == "string", "Expected column 1 to be of type \"string\" but found \"" + columns[1][1] + "\"."
assert columns[2][0] == "brand", "Expected column 2 to be \"brand\" but found \"" + columns[2][0] + "\"."
assert columns[2][1] == "string", "Expected column 2 to be of type \"string\" but found \"" + columns[2][1] + "\"."
assert columns[3][0] == "model", "Expected column 3 to be \"model\" but found \"" + columns[3][0] + "\"."
assert columns[3][1] == "string", "Expected column 3 to be of type \"string\" but found \"" + columns[3][1] + "\"."
assert columns[4][0] == "price", "Expected column 4 to be \"price\" but found \"" + columns[4][0] + "\"."
assert columns[4][1] == "double", "Expected column 4 to be of type \"double\" but found \"" + columns[4][1] + "\"."
assert columns[5][0] == "processor", "Expected column 5 to be \"processor\" but found \"" + columns[5][0] + "\"."
assert columns[5][1] == "string", "Expected column 5 to be of type \"string\" but found \"" + columns[5][1] + "\"."
assert columns[6][0] == "size", "Expected column 6 to be \"size\" but found \"" + columns[6][0] + "\"."
assert columns[6][1] == "string", "Expected column 6 to be of type \"string\" but found \"" + columns[6][1] + "\"."
assert columns[7][0] == "display", "Expected column 7 to be \"display\" but found \"" + columns[7][0] + "\"."
assert columns[7][1] == "string", "Expected column 7 to be of type \"string\" but found \"" + columns[7][1] + "\"."
print("Congratulations, all tests passed!\n")
# Remember there are no primitive datatypes in Spark
# Required for StructField, StringType, IntegerType, etc.
from pyspark.sql.types import *
csv_schema = StructType([
StructField("product_id", LongType(), True),
StructField("category", StringType(), True),
StructField("brand", StringType(), True),
StructField("model", StringType(), True),
StructField("price", DoubleType(), True),
StructField("processor", StringType(), True),
StructField("size", StringType(), True),
StructField("display", StringType(), True)
])
# Let's read the same product.csv file into a DataFrame, assigning the custom schema instead of inferring it from the file
product_df = (spark.read                # The DataFrameReader
              .option('header', 'true') # Ignore line #1 - it's a header
              .schema(csv_schema)       # Use the specified schema
              .csv(csv_file)            # Creates a DataFrame from CSV after reading in the file
             )
Now let's write the data out in Parquet format (a storage- and performance-optimized file format for querying through Spark).
product_df.write.parquet("dbfs:/tmp/az/output/reading_data.lab.parquet")
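To confirm the write, the Parquet output can be read straight back (a quick sketch using the same path as above); the schema is stored with the data, so no header or inferSchema options are needed:
# Sketch: read the Parquet output back -- the schema travels with the file
parquet_df = spark.read.parquet("dbfs:/tmp/az/output/reading_data.lab.parquet")
parquet_df.printSchema()
display(parquet_df)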
Some Cool Machine learning stuff !!!
Product Recommendations - in 10 lines of Spark code
One of the most common use cases of big data and machine learning is the recommendation engine. Google uses one to serve relevant ads, Amazon for product recommendations, and Netflix for movie recommendations.
Here are the SparkML Python docs and the Scala docs.
Phase 1: Exploratory Analysis
Some of the questions an exploratory analysis phase should answer:
- How many observations do I have?
- What are the features?
- Do I have missing values?
- What do summary statistics (e.g. mean and variance) tell me about my data?
Start by importing the data. Bind it to product_ratings by running the cell below.
product_ratings = spark.read.parquet("dbfs:/mnt/training-sources/productRatings/")
# Use the Spark Parquet reader, as with the product_ratings DataFrame above, to read the products DataFrame
product_df = spark.read.parquet("dbfs:/mnt/training-sources/productsShort/")
Now that we have the ratings data in (item, user, rating) format along with the products data, let's start with the collaborative filtering algorithm.
Phase 2: Collaborative Filtering
The image below (from Wikipedia) shows an example of predicting a user's rating using collaborative filtering. First, people rate different items (videos, products, articles, images, games). The system then predicts a user's rating for an item that the user has not rated yet. These predictions are built upon the existing ratings of other users whose ratings are similar to the active user's. For instance, in the image below the system predicts that the active user will not like the video.
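As a toy illustration (entirely made-up numbers, not the course data), the ratings sit in the same (product_id, user_id, rating) shape we use below, and the collaborative filter fills in the blanks by leaning on users with similar rating patterns:
# Toy sketch: users 1 and 2 rate products 101 and 102 identically, so a collaborative
# filter would predict that user 2 also likes product 103, which only user 1 has rated.
toy_ratings = spark.createDataFrame(
    [(101, 1, 5.0), (102, 1, 1.0), (103, 1, 5.0),
     (101, 2, 5.0), (102, 2, 1.0)],
    ["product_id", "user_id", "rating"])
display(toy_ratings)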
Let's split the dataset for training, validation, and testing.
#We'll hold out 60% of the data for training, 20% for validation, and leave 20% for testing
seed = 1800009193
(training_df, validation_df, test_df) = product_ratings.randomSplit([.6, .2, .2], seed=seed)
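A quick sanity check (a sketch, not in the original notebook) to confirm the split proportions; caching the training set avoids re-reading the source data on every pass:
# Sketch: verify the 60/20/20 split and cache the training set for reuse
print(training_df.cache().count(), validation_df.count(), test_df.count())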
My Ratings
Let me add some random ratings for items of my choice
my_user_id = 0
my_rated_products = [
(1, my_user_id, 5), # Replace with your ratings.
(2, my_user_id, 5),
(3, my_user_id, 5),
(4, my_user_id, 5),
(6, my_user_id, 1),
(7, my_user_id, 1),
(9, my_user_id, 1),
(9, my_user_id, 1),
(9, my_user_id, 1),
]
Let's create a DataFrame from my product ratings above.
my_ratings_df = spark.createDataFrame(my_rated_products, ['product_id','user_id','rating'])
Now, let's join your ratings with `product_df` on the product_id field to see your ratings alongside the product metadata.
display(my_ratings_df.join(product_df, ['product_id']))
Union your ratings with `training_df` so that they are included in the training data.
training_with_my_ratings_DF = training_df.union(my_ratings_df)
Alternating Least Squares
In this part, we will use the Apache Spark ML Pipeline implementation of Alternating Least Squares, ALS (Python) or ALS (Scala). ALS takes a training dataset (DataFrame) and several parameters that control the model creation process.
The process we will use for determining the best model is as follows:
- Pick a set of model parameters. The most important parameter is the rank, which is the number of columns in the Users matrix or the number of rows in the Products matrix. In general, a lower rank means higher error on the training dataset, but too high a rank may lead to overfitting. We will train a model with a rank of 2 against the training dataset.
- Set the appropriate parameters on the ALS object:
- The "User" column will be set to the values in our user_id DataFrame column.
- The "Item" column will be set to the values in our product_id DataFrame column.
- The "Rating" column will be set to the values in our rating DataFrame column.
- We'll be using a regularization parameter of 0.1.
- Note: Read the documentation for the ALS class carefully. It will help you accomplish this step.
- Have the ALS output transformation (i.e., the result of ALS.fit()) produce a new column called "prediction" that contains the predicted value.
- Create the model using ALS.fit(), fitting against the training data set (training_with_my_ratings_DF).
- We'll run our prediction against our validation data set (validation_df) and check the error.
- Use .setColdStartStrategy("drop") so that rows for users or items unseen during training are dropped instead of producing NaN predictions.
from pyspark.ml.recommendation import ALS
# Let's initialize our ALS learner
als = ALS()
# Now we set the parameters for the method
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
(als.setPredictionCol("prediction")
.setUserCol("user_id")
.setItemCol("product_id")
.setRatingCol("rating")
.setMaxIter(5)
.setSeed(seed)
.setRegParam(0.1)
.setRank(2)
.setColdStartStrategy("drop")
)
Validation:
# Fit the ALS model on the training data (including my ratings)
model = als.fit(training_with_my_ratings_DF)

# Run the model to create predictions against the validation set
predict_df = model.transform(validation_df)

# Evaluate the model by computing the RMSE on the validation data
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predict_df)
print("Root-mean-square error = " + str(rmse))
display(predict_df)
Phase 3: Get the Product recommendations
# Generate the top 10 recommendations for every user
predictions = model.recommendForAllUsers(10)  # 10 here represents the top 10 results
# Filter the predictions DataFrame for your user id, e.g. "user_id = 0"
my_predictions = predictions.filter("user_id = 0")
Let's join the recommendations with the product metadata to obtain the product names.
from pyspark.sql.functions import explode

my_recs = (my_predictions
           .select("user_id", explode("recommendations").alias("recommendations"))
           .select("user_id", "recommendations.product_id", "recommendations.rating")
           .join(product_df, ["product_id"]))
Let's look at the filtered top 10 product predictions based on my interests.
display(my_recs)
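If you only need recommendations for a single user, a lighter alternative (not used in the original walkthrough) is ALSModel.recommendForUserSubset, which scores just the users you pass in:
# Sketch: score only my user id instead of every user in the ratings data
my_user_df = spark.createDataFrame([(my_user_id,)], ["user_id"])
my_top10 = model.recommendForUserSubset(my_user_df, 10)
display(my_top10)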
Conclusion:
Databricks, as a unified analytics platform, takes away most of the platform challenges of Spark workloads. It offers a simple and powerful interface for data engineers to schedule jobs and monitor clusters, and data scientists can focus on model training rather than spending time tuning Spark environment configs. Also, the recently added Databricks Delta tables support ACID transactions, something data engineers in the big data world have long been waiting for. The power of Spark is now available to everyone, without the hassle!