Marvelous MLOps #57: Building an End-to-end MLOps Project with Databricks
Author: Benito Martin
Last October, I had the privilege of enrolling in the newly launched MLOps Course on Databricks, led by Maria Vechtomova and Başak Eskili. Developing a project and gaining expert insights and best practices from industry leaders is always an excellent opportunity, no matter how much or how little you know about deploying a model into production.
In this blog, I’ll walk through my capstone project. The course covered a range of topics, and, although I have experience in some of them, this was my first time using Databricks. If you are new to MLOps, I can highlight the following key learnings you will gain from the live lectures and project development:
You can find the complete code for this project in this repository.
Getting Started
The first step is to create an account on Databricks and connect it to a cloud provider. I chose AWS since I’m more familiar with it, but you can use Azure or GCP as alternatives.
Once your account is set up, the next step is to create a Databricks workspace. This workspace is essential as it provisions the necessary resources for your project using an AWS CloudFormation template (or its equivalent for other providers). The provisioning process takes a few minutes, and once complete, you can open the workspace to access all the tools needed to develop the project, such as the Unity Catalog, Job Runs, and Bundles.
Pro Tip: When creating the workspace, ensure you select one of the regions that support online tables, as these are required for feature serving during the course. I initially used “eu-central-1” but had to switch to “eu-west-1” later on.
To use any of the tools on Databricks, you’ll need to set up a compute resource (cluster) as well. While working with AWS, I discovered that creating a workspace automatically provisions two AWS resources that run continuously: a NAT Gateway and an Elastic IP. Together, these cost around $2/day. To optimize costs, I developed two scripts (available here) that allow you to delete and recreate these resources as needed.
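To give an idea of what such cleanup automation can look like, here is a minimal boto3 sketch (not the repository's actual scripts; the region and VPC ID are placeholders) that deletes the NAT Gateway attached to the workspace VPC and releases any unassociated Elastic IP:
# Minimal sketch, not the repository's scripts: tear down the NAT Gateway
# attached to the workspace VPC and release unassociated Elastic IPs.
# Assumes boto3 credentials are configured; region and VPC ID are placeholders.
import boto3

ec2 = boto3.client("ec2", region_name="eu-west-1")
vpc_id = "vpc-0123456789abcdef0"  # hypothetical workspace VPC

for nat in ec2.describe_nat_gateways(Filters=[{"Name": "vpc-id", "Values": [vpc_id]}])["NatGateways"]:
    if nat["State"] not in ("deleted", "deleting"):
        ec2.delete_nat_gateway(NatGatewayId=nat["NatGatewayId"])

for address in ec2.describe_addresses()["Addresses"]:
    if "AssociationId" not in address:  # no longer attached to anything
        ec2.release_address(AllocationId=address["AllocationId"])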
Project Repository
At the start of the course, Maria and Başak create a dedicated project repository for each participant. This repository, which you need to clone, contains the main project files and dependencies to get you started.
Right from the beginning, you can see their emphasis on maintaining high project quality. The repository includes:
For package management, we used uv, a lightweight and intuitive tool that simplifies dependency and virtual environment management. Initially, the repository used setuptools as the build system. For my project, however, I decided to switch to the hatchling backend.
[project]
name = "credit-default-databricks"
version = "0.0.11"
description = "MLOps Project with Databricks: Credit Default Dataset"
keywords = ["MLOps", "Databricks", "Machine Learning"]
authors = [{ name = "Benito Martin"}]
readme = "README.md"
requires-python = ">=3.11,<3.12"
dependencies = ["lightgbm>=4.5.0, <5",
"scikit-learn>=1.5.1, <2",
"cloudpickle>=3.0.0, <4",
"mlflow==2.17.2",
"numpy>=1.26.4, <2",
"pandas>=2.2.2, <3",
"pyarrow>=14.0.0, <15",
"cffi>=1.17.1, <2",
"scipy>=1.14.1, <2",
"matplotlib>=3.9.2, <4",
"databricks-feature-engineering==0.6",
"databricks-feature-lookup==1.2.0",
"imbalanced-learn>=0.11.0, <1",
"loguru>=0.6.0, <1",
"pre-commit>=2.20.0, <3",
"pydantic==2.9.0, <3",
"databricks-sdk==0.32.0",
]
[project.optional-dependencies]
dev = ["databricks-connect>=15.4.1, <16",
"ipykernel>=6.29.5, <7",
"pip>=24.2",
"pytest>=8.1.1, <9",
"ruff>=0.2.0, <1" ,
"python-dotenv>=1.0.0, <2"
]
# Build system configuration
[build-system]
requires = ["hatchling>=1.23.0"]
build-backend = "hatchling.build"
[tool.hatch.build]
packages = ["src"]
# Wheel build configuration
[tool.hatch.build.targets.wheel]
packages = ["src/credit_default"]
[tool.ruff]
line-length = 120
[tool.ruff.lint]
select = [
"F", # pyflakes rules
"E", # pycodestyle error rules
"W", # pycodestyle warning rules
"B", # flake8-bugbear rules
"I", # isort rules
]
ignore = [
"E501", # line-too-long
]
[tool.ruff.format]
indent-style = "space"
During the first lecture, they showed how to connect and authenticate your Databricks workspace with VS Code by downloading the Databricks extension. This integration allows you to run your code locally while synchronizing with the Databricks UI.
One of the features I found particularly interesting in Databricks is how Jupyter notebooks created in the Databricks UI are automatically transformed into Python files when synchronized with your local environment. This functionality makes it much easier to refine your code into a clean, structured Python script, ideal for production-ready workflows or further development outside the notebook environment.
Prepare the Data
Files:
Once familiar with the Databricks environment, we had to choose a dataset for the project. In my case, I selected a binary classification dataset related to credit card default payments. This dataset provides financial and demographic information on credit card clients in Taiwan, covering the period from April to September 2005. It includes 25 variables, such as credit limits, payment history, bill statements, and repayment statuses, alongside demographic factors like age, gender, education, and marital status. The target variable, default.payment.next.month, indicates whether a client defaulted on their payment in the following month.
First, I saved the raw dataset into a Volume in the Unity Catalog, along with the Python wheel package. This package can be directly installed on the cluster, enabling all jobs to run with the required dependencies.
Next, I created several files for data cleaning and preprocessing (including scaling and the train/test split), as well as a configuration file for logging and data validation with Pydantic, to ensure the data meets the defined schemas and constraints. This is crucial: if the data does not meet these requirements, the downstream processes won’t work.
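To give an idea of what such validation looks like, below is a minimal Pydantic sketch; the field names mirror the configuration attributes used later in the code (catalog_name, schema_name, features, target, parameters), but the actual schema in the repository may differ:
# Minimal sketch of a Pydantic configuration model; the real schema in the
# repository may differ, but these fields match the attributes used later on.
from pydantic import BaseModel, field_validator


class Target(BaseModel):
    name: str
    new_name: str


class Features(BaseModel):
    robust: list[str]
    clean: list[str]


class Config(BaseModel):
    catalog_name: str
    schema_name: str
    features: Features
    target: list[Target]
    parameters: dict

    @field_validator("target")
    @classmethod
    def at_least_one_target(cls, value: list[Target]) -> list[Target]:
        # Fail early if the config defines no target column
        if not value:
            raise ValueError("At least one target column must be defined")
        return value

Loading project_config.yml into a model like this fails fast when a required field is missing or has the wrong type, which is exactly the behaviour you want before kicking off downstream jobs.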
Finally, I saved both the train and test datasets into the Databricks Unity Catalog as Spark Delta tables by running prepare_data_notebook.py, which triggers the preprocessing steps.
class DataPreprocessor:
"""
A class for preprocessing credit default data, including scaling features.
Attributes:
data_cleaning (DataCleaning): An instance of the DataCleaning class used for data preprocessing.
cleaned_data (pd.DataFrame): The cleaned DataFrame after preprocessing.
features_robust (list): List of feature names for robust scaling.
X (pd.DataFrame): Features DataFrame after cleaning.
y (pd.Series): Target Series after cleaning.
preprocessor (ColumnTransformer): ColumnTransformer for scaling the features.
"""
def __init__(self, filepath: str, config: Config, spark: SparkSession):
"""
Initializes the DataPreprocessor class.
Args:
filepath (str): The path to the CSV file containing the data.
config (Config): The configuration model containing preprocessing settings.
"""
self.catalog_name = config.catalog_name
self.schema_name = config.schema_name
self.spark = spark
try:
# Initialize DataCleaning to preprocess data
logger.info("Initializing data cleaning process")
self.data_cleaning = DataCleaning(filepath, config, spark)
self.cleaned_data = self.data_cleaning.preprocess_data()
logger.info("Data cleaning process completed")
# Define robust features for scaling from config
self.features_robust = config.features.robust
# Define features and target
self.X = self.cleaned_data.drop(columns=[target.new_name for target in config.target])
self.y = self.cleaned_data[config.target[0].new_name]
# Set up the ColumnTransformer for scaling
logger.info("Setting up ColumnTransformer for scaling")
self.preprocessor = ColumnTransformer(
transformers=[
("robust_scaler", RobustScaler(), self.features_robust) # Apply RobustScaler to selected features
],
remainder="passthrough", # Keep other columns unchanged
)
except KeyError as e:
logger.error(f"KeyError encountered during initialization: {str(e)}")
raise
except Exception as e:
logger.error(f"An error occurred during initialization: {str(e)}")
raise
def get_processed_data(self) -> Tuple:
"""
Retrieves the processed features, target, and preprocessor.
Returns:
Tuple: A tuple containing:
- pd.DataFrame: The features DataFrame.
- pd.Series: The target Series.
- ColumnTransformer: The preprocessor for scaling.
"""
try:
logger.info("Retrieving processed data and preprocessor")
logger.info(f"Feature columns in X: {self.X.columns.tolist()}")
# Log shapes of processed data
logger.info(f"Data preprocessing completed. Shape of X: {self.X.shape}, Shape of y: {self.y.shape}")
return self.X, self.y, self.preprocessor
        except Exception as e:
            logger.error(f"An error occurred during data preprocessing: {str(e)}")
            raise
def split_data(self, test_size=0.2, random_state=42) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""Split the cleaned DataFrame into training and test sets."""
X_train, X_test, y_train, y_test = train_test_split(
self.X, self.y, test_size=test_size, random_state=random_state
)
train_set = pd.concat([X_train, y_train], axis=1)
test_set = pd.concat([X_test, y_test], axis=1)
return train_set, test_set
def save_to_catalog(self, train_set: pd.DataFrame, test_set: pd.DataFrame, spark: SparkSession):
"""Save the train and test sets into Databricks tables."""
train_set_with_timestamp = spark.createDataFrame(train_set).withColumn(
"Update_timestamp_utc", to_utc_timestamp(current_timestamp(), "UTC")
)
test_set_with_timestamp = spark.createDataFrame(test_set).withColumn(
"Update_timestamp_utc", to_utc_timestamp(current_timestamp(), "UTC")
)
train_set_with_timestamp.write.mode("overwrite").saveAsTable(
f"{self.catalog_name}.{self.schema_name}.train_set"
)
test_set_with_timestamp.write.mode("overwrite").saveAsTable(f"{self.catalog_name}.{self.schema_name}.test_set")
spark.sql(
f"ALTER TABLE {self.catalog_name}.{self.schema_name}.train_set "
"SET TBLPROPERTIES (delta.enableChangeDataFeed = true);"
)
spark.sql(
f"ALTER TABLE {self.catalog_name}.{self.schema_name}.test_set "
"SET TBLPROPERTIES (delta.enableChangeDataFeed = true);"
)
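The prepare_data_notebook.py itself is not reproduced here, but wiring the class above together looks roughly like this sketch (the volume path is a placeholder, and load_config is the project's configuration loader used later in the workflow scripts):
# Rough usage sketch of the DataPreprocessor class above; the volume path is
# a placeholder and the exact notebook code may differ.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
config = load_config("project_config.yml")  # returns the validated Config model

filepath = "/Volumes/my_catalog/my_schema/data/credit_default.csv"  # placeholder volume path
preprocessor = DataPreprocessor(filepath, config, spark)

X, y, column_transformer = preprocessor.get_processed_data()
train_set, test_set = preprocessor.split_data(test_size=0.2, random_state=42)
preprocessor.save_to_catalog(train_set, test_set, spark)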
Experiment Tracking and Feature Engineering
File: feature_mlflow_experiment_notebook.py
Once the train and test datasets were pre-processed, the next step was to conduct a series of experiments using MLflow, ranging from basic model training to advanced custom model functions and leveraging Feature Tables from the Databricks Feature Store.
The Feature Store acts as a centralized repository that helps data scientists discover and reuse features across projects, keep feature computation consistent between training and inference, and track feature lineage.
For the project, we focused on the feature engineering model: we created a Feature Table from specific columns of the train set, added a primary key constraint, and enabled Change Data Feed (CDF) to track changes and updates. This is a critical point, as CDF is not enabled by default, and during the inference process we added additional data to simulate model/data drift, which is common in production environments.
# First, create the feature table with original data
create_table_sql = f"""
CREATE OR REPLACE TABLE {config.catalog_name}.{config.schema_name}.features_balanced
(Id STRING NOT NULL,
{', '.join([f'{col} DOUBLE' for col in columns])})
"""
spark.sql(create_table_sql)
# Add primary key and enable Change Data Feed (CDF)
spark.sql(
f"ALTER TABLE {config.catalog_name}.{config.schema_name}.features_balanced ADD CONSTRAINT features_balanced_pk PRIMARY KEY(Id);"
)
spark.sql(
f"ALTER TABLE {config.catalog_name}.{config.schema_name}.features_balanced SET TBLPROPERTIES (delta.enableChangeDataFeed = true);"
)
# Create training set based on feature table
training_set = fe.create_training_set(
df=train_set,
label="Default",
feature_lookups=[
FeatureLookup(
table_name=f"{config.catalog_name}.{config.schema_name}.features_balanced",
feature_names=columns,
lookup_key="Id",
)
],
exclude_columns=["Update_timestamp_utc"],
)
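Before the lookup can resolve anything, the features_balanced table also has to be populated. The balancing logic itself is not shown here (the project depends on imbalanced-learn, so some resampling presumably happens upstream), but writing feature rows keyed by Id follows a pattern like this sketch:
# Hedged sketch: populate the feature table with the feature columns keyed by
# Id, using the feature engineering client's upsert. The balancing step that
# produces these rows in the actual repository is not shown here.
source = spark.table(f"{config.catalog_name}.{config.schema_name}.train_set")
fe.write_table(
    name=f"{config.catalog_name}.{config.schema_name}.features_balanced",
    df=source.select("Id", *columns),
    mode="merge",  # upsert on the primary key
)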
After creating the Feature Table, we trained a model and registered it in Unity Catalog using the Databricks feature engineering client and an MLflow signature, which defines the input and output formats and ensures seamless model integration. Without a signature, the model cannot be registered in Unity Catalog.
Additionally, when a model is trained and logged with a Feature Table, it automatically captures the feature specifications used during training. During inference, it retrieves and joins the appropriate features from the corresponding Feature Tables.
# Load feature-engineered DataFrame
training_df = training_set.load_df().toPandas()
test_set = spark.table(f"{config.catalog_name}.{config.schema_name}.test_set").toPandas()
# Split features and target (exclude 'Id' from features)
X_train = training_df[columns]
y_train = training_df["Default"]
X_test = test_set[columns]
y_test = test_set["Default"]
features_robust = [
"Limit_bal",
"Bill_amt1",
"Bill_amt2",
"Bill_amt3",
"Bill_amt4",
"Bill_amt5",
"Bill_amt6",
"Pay_amt1",
"Pay_amt2",
"Pay_amt3",
"Pay_amt4",
"Pay_amt5",
"Pay_amt6",
]
# Setup preprocessing and model pipeline
preprocessor = ColumnTransformer(
transformers=[("robust_scaler", RobustScaler(), features_robust)],
remainder="passthrough",
)
# Create the pipeline with preprocessing and the LightGBM classifier
pipeline = Pipeline(steps=[("preprocessor", preprocessor), ("classifier", LGBMClassifier(**parameters))])
# Set and start MLflow experiment
mlflow.set_experiment(experiment_name="/Shared/credit-feature")
with mlflow.start_run(tags={"branch": "serving"}) as run:
run_id = run.info.run_id
pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)
# Calculate and print metrics
auc_test = roc_auc_score(y_test, y_pred)
print("Test AUC:", auc_test)
# Log model parameters, metrics, and model
mlflow.log_param("model_type", "LightGBM with preprocessing")
mlflow.log_params(parameters)
mlflow.log_metric("AUC", auc_test)
signature = infer_signature(model_input=X_train, model_output=y_pred)
# Log model with feature engineering
fe.log_model(
model=pipeline,
flavor=mlflow.sklearn,
artifact_path="lightgbm-pipeline-model-feature",
training_set=training_set,
signature=signature,
)
mlflow.register_model(
model_uri=f"runs:/{run_id}/lightgbm-pipeline-model-feature",
name=f"{config.catalog_name}.{config.schema_name}.credit_model_feature",
)
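For offline (batch) scoring, the same feature engineering client can resolve the lookups recorded at training time, so the scoring DataFrame only needs the lookup key. A sketch (the model version is illustrative):
# Sketch of batch inference with the feature engineering client; score_batch
# joins the stored feature lookups automatically. Model version 1 is illustrative.
batch_df = spark.table(f"{config.catalog_name}.{config.schema_name}.test_set").select("Id")
predictions = fe.score_batch(
    model_uri=f"models:/{config.catalog_name}.{config.schema_name}.credit_model_feature/1",
    df=batch_df,
)
predictions.select("Id", "prediction").show(5)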
Combining MLflow with the Databricks Feature Store streamlines experiment tracking and artifact logging, ensuring reproducibility and consistent feature computation across training and inference, which reduces errors. Additionally, enabling CDF allows the model to dynamically handle updates, making it well suited for production scenarios.
Serving Architectures
File: model_serving_feat_lookup_notebook.py
The Feature Table created earlier is an offline table, which can be used for offline inference with the registered model. However, for real-time serving through a dedicated endpoint, Online Tables must be used. Online Tables are serverless, read-only replicas of the Feature Table, designed to deliver low-latency, high-throughput access to data. They support real-time or batch serving by letting the model read precomputed features from the online store at inference time. The model then joins these features with the input data provided by the client request to the serving endpoint. To transfer data from the Feature Table to the Online Table, a Delta Live Table (DLT) pipeline is employed.
During the course, multiple architectures were explored:
The course focused primarily on Model Serving with Feature Lookup due to its ability to integrate seamlessly with feature stores, enabling robust and scalable real-time inference workflows.
# Create online table using the features table as source
spec = OnlineTableSpec(
primary_key_columns=["Id"],
source_table_full_name=f"{catalog_name}.{schema_name}.features_balanced",
run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({"triggered": "true"}),
perform_full_copy=False,
)
online_table_name = f"{catalog_name}.{schema_name}.features_balanced_online"
online_table = OnlineTable(name=online_table_name, spec=spec)
# ignore "already exists" error
try:
online_table_pipeline = workspace.online_tables.create(table=online_table)
except Exception as e:
if "already exists" in str(e):
pass
else:
raise e
# Create endpoint
workspace.serving_endpoints.create(
name="credit-default-model-serving-feature",
config=EndpointCoreConfigInput(
served_entities=[
ServedEntityInput(
entity_name=f"{catalog_name}.{schema_name}.credit_model_feature",
scale_to_zero_enabled=True,
workload_size="Small",
entity_version=1,
)
]
),
)
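Once the endpoint is up, a client only needs to send the lookup key; the features are resolved from the online table. A minimal request sketch (host, token, and the Id value are placeholders):
# Sketch of a client request to the serving endpoint; host, token, and the Id
# value are placeholders read from environment variables.
import os
import requests

host = os.environ["DATABRICKS_HOST"]    # e.g. https://<workspace>.cloud.databricks.com
token = os.environ["DATABRICKS_TOKEN"]

response = requests.post(
    f"{host}/serving-endpoints/credit-default-model-serving-feature/invocations",
    headers={"Authorization": f"Bearer {token}"},
    json={"dataframe_records": [{"Id": "182"}]},
)
print(response.status_code, response.json())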
In addition to exploring different serving architectures, A/B testing was implemented to evaluate and compare the performance of two models trained with different hyperparameters using a custom MLflow function. The process used a wrapper class to distribute requests between two models (Model A and Model B) based on a deterministic hashing of the input Id. This ensured that the same input would always be routed to the same model during the testing phase.
# Wrapper that takes both models and routes each request to one of them
class CreditDefaultModelWrapper(mlflow.pyfunc.PythonModel):
def __init__(self, models):
self.models = models
self.model_a = models[0]
self.model_b = models[1]
def predict(self, context, model_input):
if isinstance(model_input, pd.DataFrame):
credit_id = str(model_input["Id"].values[0]) # Id number
hashed_id = hashlib.md5(credit_id.encode(encoding="UTF-8")).hexdigest()
# convert a hexadecimal (base-16) string into an integer
if int(hashed_id, 16) % 2:
predictions = self.model_a.predict(model_input.drop(["Id"], axis=1))
return {"Prediction": predictions[0], "model": "Model A"}
else:
predictions = self.model_b.predict(model_input.drop(["Id"], axis=1))
return {"Prediction": predictions[0], "model": "Model B"}
else:
raise ValueError("Input must be a pandas DataFrame.")
models = [model_A, model_B]
wrapped_model = CreditDefaultModelWrapper(models)
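The wrapped model is then logged as a custom pyfunc model so that both variants sit behind a single endpoint. A minimal sketch (the experiment path and tags are illustrative; in the project a signature is also inferred and the model registered in Unity Catalog, following the same pattern shown earlier):
# Sketch: log the A/B wrapper as a custom pyfunc model. Experiment path and
# tags are illustrative; signature inference and Unity Catalog registration
# follow the same pattern shown earlier in the post.
import mlflow

mlflow.set_experiment(experiment_name="/Shared/credit-ab-testing")  # illustrative path
with mlflow.start_run(tags={"branch": "ab-testing"}) as run:
    mlflow.pyfunc.log_model(
        python_model=wrapped_model,
        artifact_path="credit-default-model-ab",
    )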
Databricks Asset Bundles (DAB)
Files:
Databricks Asset Bundles (DAB) are an Infrastructure-as-Code (IaC) framework designed to streamline the management and deployment of Databricks projects. By organizing code, configurations, and workflows into a cohesive structure, DAB enables the automation and version control of pipelines, ensuring consistency, scalability, reproducibility, and seamless Continuous Integration and Deployment (CI/CD) across environments.
DAB components such as settings, resources, workflows, clusters, variables, and target environments are defined in the databricks.yml file. All previous steps, including data preprocessing, experiment tracking, model training, evaluation, and deployment, are transferred into well-formatted Python scripts. These scripts are then included as tasks within the databricks.yml file, forming a unified workflow.
Below is an example corresponding to the train_model.py workflow, which integrates key parameters such as the Git SHA and job run ID to ensure code traceability and reproducibility across the pipeline.
# Set up logging
setup_logging(log_file="")
try:
# Parse arguments
parser = argparse.ArgumentParser()
parser.add_argument("--root_path", action="store", default=None, type=str, required=True)
parser.add_argument("--git_sha", action="store", default=None, type=str, required=True)
parser.add_argument("--job_run_id", action="store", default=None, type=str, required=True)
args = parser.parse_args()
root_path = args.root_path
git_sha = args.git_sha
job_run_id = args.job_run_id
logger.debug(f"Git SHA: {git_sha}")
logger.debug(f"Job Run ID: {job_run_id}")
logger.info("Parsed arguments successfully.")
# Load configuration
logger.info("Loading configuration...")
config_path = f"{root_path}/project_config.yml"
config = load_config(config_path)
logger.info("Configuration loaded successfully.")
# Initialize Databricks workspace client
workspace = WorkspaceClient()
logger.info("Databricks workspace client initialized.")
# Initialize Spark session
spark = SparkSession.builder.getOrCreate()
fe = feature_engineering.FeatureEngineeringClient()
logger.info("Spark session initialized.")
# Extract configuration details
catalog_name = config.catalog_name
schema_name = config.schema_name
target = config.target[0].new_name
parameters = config.parameters
features_robust = config.features.robust
columns = config.features.clean
columns_wo_id = columns.copy()
columns_wo_id.remove("Id")
# Convert train and test sets to Pandas DataFrames
train_pdf = spark.table(f"{catalog_name}.{schema_name}.train_set").toPandas()
test_set = spark.table(f"{catalog_name}.{schema_name}.test_set").toPandas()
# Now use create_training_set to create balanced training set
# Drop the original features that will be looked up from the feature store
# Define the list of columns you want to drop, including "Update_timestamp_utc"
feature_table_name = f"{catalog_name}.{schema_name}.features_balanced"
columns_to_drop = columns_wo_id + ["Update_timestamp_utc"]
train_set = spark.table(f"{catalog_name}.{schema_name}.train_set").drop(*columns_to_drop)
logger.info(f"Train set columns for feature table: {train_set.columns}")
# Create training set from feature table
mlflow.set_tracking_uri("databricks")
mlflow.set_registry_uri("databricks-uc")
training_set = fe.create_training_set(
df=train_set,
label=target,
feature_lookups=[
FeatureLookup(
table_name=feature_table_name,
feature_names=columns_wo_id,
lookup_key="Id",
)
],
exclude_columns=["Update_timestamp_utc"],
)
training_df = training_set.load_df().toPandas()
logger.info("Training set created and loaded.")
# Prepare train and test datasets
X_train = training_df[columns_wo_id]
y_train = training_df[target]
X_test = test_set[columns_wo_id]
y_test = test_set[target]
# Define preprocessing and pipeline
preprocessor = ColumnTransformer(
transformers=[("robust_scaler", RobustScaler(), features_robust)],
remainder="passthrough",
)
pipeline = Pipeline(steps=[("preprocessor", preprocessor), ("classifier", LGBMClassifier(**parameters))])
# MLflow setup
mlflow.set_experiment(experiment_name="/Shared/credit-feature")
logger.info("MLflow setup completed.")
# Train model and log in MLflow
with mlflow.start_run(tags={"branch": "bundles", "git_sha": git_sha, "job_run_id": job_run_id}) as run:
pipeline.fit(X_train, y_train)
y_pred = pipeline.predict(X_test)
auc_test = roc_auc_score(y_test, y_pred)
logger.info(f"Test AUC: {auc_test}")
# Log model details
mlflow.log_param("model_type", "LightGBM with preprocessing")
mlflow.log_params(parameters)
mlflow.log_metric("AUC", auc_test)
input_example = X_train.iloc[:5]
signature = infer_signature(model_input=input_example, model_output=pipeline.predict(input_example)) # y_pred
# Log model with feature engineering
# We will register in next step, if model is better than the previous one (evaluate_model.py)
fe.log_model(
model=pipeline,
flavor=mlflow.sklearn,
artifact_path="lightgbm-pipeline-model-fe",
training_set=training_set,
signature=signature,
input_example=input_example,
)
model_uri = f"runs:/{run.info.run_id}/lightgbm-pipeline-model-fe"
dbutils.jobs.taskValues.set(key="new_model_uri", value=model_uri) # noqa: F821
logger.info(f"Model registered: {model_uri}")
except Exception as e:
logger.error(f"An error occurred: {e}")
sys.exit(1) # Exit with failure code
The databricks.yml file is driven by three main Databricks CLI commands that automate and manage the workflow: databricks bundle validate (checks the bundle configuration), databricks bundle deploy (uploads the artifacts and creates or updates the workflow), and databricks bundle run (triggers the job).
In this workflow, every task must run successfully before triggering the subsequent task. To ensure the workflow is always up-to-date, we generated new data to refresh the preprocessing step. This allows for retraining of the model and evaluating its performance. If the new model performs better than the previous one, it triggers the deployment of the new model, replacing the old one. If the preprocessing step or model evaluation fails, the subsequent tasks will not be triggered, ensuring the pipeline only moves forward when all conditions are met.
# Databricks asset bundle definition
bundle:
name: mlops-databricks-credit-default
include:
- bundle_monitoring.yml
# These are the default artifact settings if not otherwise overridden in
# the following "targets" top-level mapping.
artifacts:
default:
type: whl
build: uv build
path: .
# These are for any custom variables for use throughout the bundle.
variables:
root_path:
description: root_path for the bundle files
default: /Workspace/Users/${workspace.current_user.userName}/.bundle/${bundle.name}/${bundle.target}/files
git_sha:
description: git_sha
default: 1aa0037267af83a7685bea002a332739563fa3c9
schedule_pause_status:
description: schedule pause status
default: UNPAUSED
# These are the default job and pipeline settings if not otherwise overridden in
# the following "targets" top-level mapping.
resources:
jobs:
credit-default:
name: credit-default-workflow
schedule:
quartz_cron_expression: "0 0 6 ? * MON"
timezone_id: "Europe/Amsterdam"
pause_status: ${var.schedule_pause_status}
tags:
project_name: "credit-default"
job_clusters:
- job_cluster_key: "credit-default-cluster"
new_cluster:
spark_version: "15.4.x-scala2.12"
data_security_mode: "SINGLE_USER"
node_type_id: "i3.xlarge"
driver_node_type_id: "i3.xlarge"
autoscale:
min_workers: 1
max_workers: 1
tasks:
- task_key: "preprocessing"
job_cluster_key: "credit-default-cluster"
existing_cluster_id: 1109-205408-dob4qyuc
spark_python_task:
python_file: "workflows/preprocess.py"
parameters:
- "--root_path"
- ${var.root_path}
libraries:
- whl: ./dist/*.whl
- task_key: if_refreshed
condition_task:
op: "EQUAL_TO"
left: "{{tasks.preprocessing.values.refreshed}}"
right: "1"
depends_on:
- task_key: "preprocessing"
- task_key: "train_model"
depends_on:
- task_key: "if_refreshed"
outcome: "true"
job_cluster_key: "credit-default-cluster"
existing_cluster_id: 1109-205408-dob4qyuc
spark_python_task:
python_file: "workflows/train_model.py"
parameters:
- "--root_path"
- ${var.root_path}
- "--git_sha"
- ${var.git_sha}
- "--job_run_id"
- "{{job.id}}"
libraries:
- whl: ./dist/*.whl
- task_key: "evaluate_model"
depends_on:
- task_key: "train_model"
job_cluster_key: "credit-default-cluster"
existing_cluster_id: 1109-205408-dob4qyuc
spark_python_task:
python_file: "workflows/evaluate_model.py"
parameters:
- "--root_path"
- ${var.root_path}
- "--new_model_uri"
- "{{tasks.train_model.values.new_model_uri}}"
- "--job_run_id"
- "{{job.id}}"
- "--git_sha"
- ${var.git_sha}
libraries:
- whl: ./dist/*.whl
- task_key: model_update
condition_task:
op: "EQUAL_TO"
left: "{{tasks.evaluate_model.values.model_update}}"
right: "1"
depends_on:
- task_key: "evaluate_model"
- task_key: "deploy_model"
depends_on:
- task_key: "model_update"
outcome: "true"
job_cluster_key: "credit-default-cluster"
existing_cluster_id: 1109-205408-dob4qyuc
spark_python_task:
python_file: "workflows/deploy_model.py"
parameters:
- "--root_path"
- ${var.root_path}
libraries:
- whl: ./dist/*.whl
# Two targets
targets:
dev: # AWS West Region workspace
mode: development
default: true
cluster_id: 1109-205408-dob4qyuc
workspace:
host: https://dbc-46617658-3f2b.cloud.databricks.com
prod:
mode: production # Comment out this line when deploying to production, so that "run_as" above is not required
default: false
workspace:
host: https://dbc-46617658-3f2b.cloud.databricks.com
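The evaluate_model.py script referenced above is not reproduced in full, but its core logic looks roughly like the following sketch: score the test set with the newly trained run and with the currently registered model, compare the AUC, and publish the decision as a task value for the downstream condition task. Argument parsing and config loading follow the same pattern as train_model.py, and the registered-model version used here is illustrative.
# Hedged sketch of the core of evaluate_model.py; catalog_name, schema_name,
# target, and new_model_uri come from the config and task parameters, exactly
# as in train_model.py. The registered-model version used here is illustrative.
from databricks.feature_engineering import FeatureEngineeringClient
from pyspark.sql import SparkSession
from sklearn.metrics import roc_auc_score

spark = SparkSession.builder.getOrCreate()
fe = FeatureEngineeringClient()

test_set = spark.table(f"{catalog_name}.{schema_name}.test_set")


def auc_for(model_uri: str) -> float:
    # score_batch resolves the feature lookups stored with the logged model
    scored = fe.score_batch(model_uri=model_uri, df=test_set.select("Id", target)).toPandas()
    return roc_auc_score(scored[target], scored["prediction"])


auc_new = auc_for(new_model_uri)
auc_current = auc_for(f"models:/{catalog_name}.{schema_name}.credit_model_feature/1")

# The downstream condition task deploys the new model only when this value is "1"
model_update = 1 if auc_new > auc_current else 0
dbutils.jobs.taskValues.set(key="model_update", value=model_update)  # noqa: F821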
Inference and Monitoring
Files:
In the last lecture of the course, we focused on inference and monitoring to ensure the deployed model performs well in real-time scenarios. After deploying the bundle, we generated synthetic data with drift to simulate the effect of changing data distributions over time, a common challenge in production environments.
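The drift generation itself is straightforward; a hedged sketch of the idea (the exact recipe in the repository may differ) is to copy existing rows, assign them new Ids, and shift some feature distributions:
# Hedged sketch of generating drifted synthetic data; the exact drift recipe
# in the repository may differ.
import numpy as np

test_pdf = spark.table(f"{catalog_name}.{schema_name}.test_set").toPandas()
drifted = test_pdf.sample(n=200, replace=True, random_state=42).copy()
drifted["Id"] = [f"drift_{i}" for i in range(len(drifted))]  # fresh primary keys

# Inflate the bill-amount features so the drift metrics have something to react to
for col in [c for c in drifted.columns if c.startswith("Bill_amt")]:
    drifted[col] = drifted[col] * np.random.uniform(1.5, 3.0, size=len(drifted))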
To set up monitoring, you first need to enable inference tables on the endpoint to capture the incoming requests and the generated predictions. Since we were using our existing data with a known ground truth for inference, we merged the inference table with this data. A model_monitoring table with CDF enabled was then created, along with a new monitoring workflow (refresh_monitor.py), which we added to the databricks.yml file. Once the new workflow was triggered, the monitoring table was refreshed and two new tables were generated: a profile metrics table and a drift metrics table.
monitoring_table = f"{catalog_name}.{schema_name}.model_monitoring"
workspace.quality_monitors.create(
table_name=monitoring_table,
assets_dir=f"/Workspace/Shared/lakehouse_monitoring/{monitoring_table}",
output_schema_name=f"{catalog_name}.{schema_name}",
inference_log=MonitorInferenceLog(
problem_type=MonitorInferenceLogProblemType.PROBLEM_TYPE_CLASSIFICATION,
prediction_col="prediction",
timestamp_col="timestamp",
granularities=["30 minutes"],
model_id_col="model_name",
label_col="default",
),
)
spark.sql(f"ALTER TABLE {monitoring_table} " "SET TBLPROPERTIES (delta.enableChangeDataFeed = true);")
The figure below shows the first updated monitoring windows before and after introducing “drift”: once drift was added, the metrics began to decline over time.
Conclusion
In this blog, we’ve explored the end-to-end process of setting up and deploying a model on Databricks, from data preprocessing and feature engineering to model training, deployment, and monitoring. By leveraging tools such as MLflow, Databricks Feature Store, and A/B testing, we were able to streamline model development and ensure that our models are robust and ready for production environments. Additionally, integrating monitoring and drift detection allows us to track model performance over time and address potential issues like model drift effectively.
With this comprehensive approach, Maria and Başak have created a course for building scalable, reproducible, and reliable machine learning pipelines that are adaptable to real-time production requirements. If you’re eager to learn and experiment with Databricks, I encourage you to join their course to unlock its full potential.
If you found this post helpful, consider supporting me by:
Happy coding!