Marvelous MLOps #57: Building an End-to-end MLOps Project with Databricks

Author: Benito Martin

Last October, I had the privilege of enrolling in the newly launched MLOps Course on Databricks, led by Maria Vechtomova and Başak Eskili. Developing a project and gaining expert insights and best practices from industry leaders is always an excellent opportunity, no matter how much — or how little — you know about deploying a model into production.

In this blog, I’ll walk through my capstone project. The course covered a range of topics, and although I have experience in some of them, this was my first time using Databricks. If you are new to MLOps, these are the key learnings you will gain from the live lectures and project development:

  • End-to-End Model Deployment on Databricks: Understand how to preprocess data, engineer features, train models, and deploy them using Databricks’ platform.
  • Feature Engineering with Databricks Feature Store: Learn how to create feature tables, implement Change Data Feed (CDF), and leverage Databricks Feature Store to ensure consistent feature computation across training and inference.
  • Experiment Tracking with MLflow: Gain experience in tracking experiments, logging parameters, metrics, and models, and ensuring reproducibility in machine learning workflows.
  • Model Serving Architectures: Explore different model serving architectures (feature serving, model serving, and model serving with feature lookup) to deploy models efficiently in production environments.
  • A/B Testing for Model Comparison: Understand how to implement A/B testing to compare models with different hyperparameters and route predictions based on the model’s performance.
  • Databricks Asset Bundles (DAB): Learn how to manage Databricks projects using Infrastructure-as-Code (IaC) principles with Databricks Asset Bundles for automation and CI/CD.
  • Monitoring and Drift Detection: Set up monitoring for deployed models, track metrics, and detect model drift over time using tools like Databricks’ inference tables and custom monitoring workflows.
  • Continuous Integration and Deployment (CI/CD): Implement CI/CD workflows to ensure that models are continuously validated, deployed, and replaced based on performance improvements.
  • Scalability and Real-Time Production Readiness: Understand how to scale models for production, handle real-time serving, and ensure that models remain adaptable to dynamic data.

You can find the complete code for this project in this repository.

Getting Started

The first step is to create an account on Databricks and connect it to a cloud provider. I chose AWS since I’m more familiar with it, but you can use Azure or GCP as alternatives.

Once your account is set up, the next step is to create a Databricks workspace. This workspace is essential as it provisions the necessary resources for your project using an AWS CloudFormation template (or its equivalent for other providers). The provisioning process takes a few minutes, and once complete, you can open the workspace to access all the tools needed to develop the project, such as the Unity Catalog, Job Runs, and Bundles.

Pro Tip: When creating the workspace, ensure you select one of the regions that supports online tables, as these are required for feature serving during the course. I initially used “eu-central-1” but had to switch to “eu-west-1” later on.

To use any of the tools on Databricks, you’ll need to set up a compute resource (cluster) as well. While working with AWS, I discovered that creating a workspace automatically provisions two AWS resources that run continuously: a NAT Gateway and an Elastic IP. Together, these cost around $2/day. To optimize costs, I developed two scripts (available here) that allow you to delete and recreate these resources as needed.
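
For reference, here is a minimal sketch of what such a cleanup script can look like with boto3. This is not the exact code from the repository; the region and the filtering logic are assumptions, so double-check which resources actually belong to your workspace before deleting anything.

# Minimal cleanup sketch (assumptions: region, and that every available NAT Gateway
# and unassociated Elastic IP in this account belongs to the Databricks workspace).
import boto3

ec2 = boto3.client("ec2", region_name="eu-west-1")

# Delete NAT Gateways that are currently available
for nat in ec2.describe_nat_gateways(Filter=[{"Name": "state", "Values": ["available"]}])["NatGateways"]:
    ec2.delete_nat_gateway(NatGatewayId=nat["NatGatewayId"])

# Release Elastic IPs that are no longer associated with anything
for address in ec2.describe_addresses()["Addresses"]:
    if "AssociationId" not in address:
        ec2.release_address(AllocationId=address["AllocationId"])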

Project Repository

At the start of the course, Maria and Başak created a dedicated project repository for each participant. This repository, which you need to clone, contains the main project files and dependencies to get you started.

Right from the beginning, you can see their emphasis on maintaining high project quality. The repository includes:

  • Ruff for linting and formatting.
  • Pre-commit hooks to automate code checks.
  • A well-structured pyproject.toml file for managing dependencies (both development and production) and package configurations and building.

For package management, we used uv, a lightweight and intuitive tool that simplifies dependency and virtual environment management. Initially, the repository used setuptools as the build system. For my project, however, I decided to switch to the hatchling backend.

[project]
name = "credit-default-databricks"
version = "0.0.11"                                               
description = "MLOps Project with Databricks: Credit Default Dataset"
keywords = ["MLOps", "Databricks", "Machine Learning"]
authors = [{ name = "Benito Martin"}]
readme = "README.md"
requires-python = ">=3.11,<3.12"                              
dependencies = ["lightgbm>=4.5.0, <5",
                "scikit-learn>=1.5.1, <2",
                "cloudpickle>=3.0.0, <4",
                "mlflow==2.17.2",                              
                "numpy>=1.26.4, <2",
                "pandas>=2.2.2, <3",
                "pyarrow>=14.0.0, <15",                        
                "cffi>=1.17.1, <2",
                "scipy>=1.14.1, <2",
                "matplotlib>=3.9.2, <4",
                "databricks-feature-engineering==0.6",         
                "databricks-feature-lookup==1.2.0",            
                "imbalanced-learn>=0.11.0, <1",                
                "loguru>=0.6.0, <1",                           
                "pre-commit>=2.20.0, <3",                      
                "pydantic==2.9.0, <3",                         
                "databricks-sdk==0.32.0",                      
                ]

[project.optional-dependencies]
dev = ["databricks-connect>=15.4.1, <16",
       "ipykernel>=6.29.5, <7",
       "pip>=24.2",
       "pytest>=8.1.1, <9",                                   
       "ruff>=0.2.0, <1" ,                                     
       "python-dotenv>=1.0.0, <2"                              
]

# Build system configuration
[build-system]                                                
requires = ["hatchling>=1.23.0"]
build-backend = "hatchling.build"

[tool.hatch.build]
packages = ["src"]

# Wheel build configuration
[tool.hatch.build.targets.wheel]
packages = ["src/credit_default"]

[tool.ruff]
line-length = 120

[tool.ruff.lint]
select = [
    "F",  # pyflakes rules
    "E",  # pycodestyle error rules
    "W",  # pycodestyle warning rules
    "B",  # flake8-bugbear rules
    "I",  # isort rules
]
ignore = [
    "E501",  # line-too-long
]

[tool.ruff.format]
indent-style = "space"        

During the first lecture, they showed how to connect and authenticate your Databricks workspace with VS Code using the Databricks extension. This integration allows you to run your code locally while keeping it synchronized with the Databricks UI.

One of the features I found particularly interesting in Databricks is how Jupyter notebooks created in the Databricks UI are automatically transformed into Python files when synchronized with your local environment. This functionality makes it much easier to refine your code into a clean, structured Python script, ideal for production-ready workflows or further development outside the notebook environment.
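
For example, a notebook synced to the local repository becomes a plain .py file in which cells are delimited by special comment markers:

# Databricks notebook source
# MAGIC %md
# MAGIC ## Prepare data

# COMMAND ----------

print("Regular Python code lives in cells separated by the markers above")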

Prepare the Data

Files:

  • data_cleaning_spark.py
  • data_preprocessing_spark.py
  • prepare_data_notebook.py
  • utils.py

Once familiar with the Databricks environment, we had to choose a dataset for the project. In my case, I selected a binary classification dataset related to credit card default payments. This dataset provides financial and demographic information on credit card clients in Taiwan, covering the period from April to September 2005. It includes 25 variables, such as credit limits, payment history, bill statements, and repayment statuses, alongside demographic factors like age, gender, education, and marital status. The target variable, default.payment.next.month, indicates whether a client defaulted on their payment in the following month.

First, I saved the raw dataset into a Volume in the Unity Catalog, along with the Python wheel package. This package can be directly installed on the cluster, enabling all jobs to run with the required dependencies.

Next, I created several files for data cleaning and preprocessing (including scaling and the train/test split), as well as a configuration file for logging and data validation with Pydantic, ensuring the data meets the defined schemas and constraints. This is crucial: if the data does not fulfill these requirements, the downstream processes won’t work.
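
As an illustration, here is a simplified sketch of such a Pydantic configuration model. The field names mirror the attributes used later in the code (catalog_name, schema_name, features, target, parameters), but the exact schema lives in the repository’s project_config.yml.

# Simplified sketch of the configuration model; the real schema is defined in the repo.
import yaml
from pydantic import BaseModel, ValidationError


class Target(BaseModel):
    name: str
    new_name: str


class Features(BaseModel):
    robust: list[str]  # columns scaled with RobustScaler
    clean: list[str]   # columns kept after cleaning


class Config(BaseModel):
    catalog_name: str
    schema_name: str
    parameters: dict   # LightGBM hyperparameters
    features: Features
    target: list[Target]


with open("project_config.yml") as f:
    raw_config = yaml.safe_load(f)

try:
    config = Config(**raw_config)
except ValidationError as e:
    raise SystemExit(f"Invalid configuration: {e}")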

Finally, I saved both the train and test datasets into the Databricks Unity Catalog as Spark Delta tables by running prepare_data_notebook.py, which triggers the preprocessing steps.

class DataPreprocessor:
    """
    A class for preprocessing credit default data, including scaling features.

    Attributes:
        data_cleaning (DataCleaning): An instance of the DataCleaning class used for data preprocessing.
        cleaned_data (pd.DataFrame): The cleaned DataFrame after preprocessing.
        features_robust (list): List of feature names for robust scaling.
        X (pd.DataFrame): Features DataFrame after cleaning.
        y (pd.Series): Target Series after cleaning.
        preprocessor (ColumnTransformer): ColumnTransformer for scaling the features.
    """

    def __init__(self, filepath: str, config: Config, spark: SparkSession):
        """
        Initializes the DataPreprocessor class.

        Args:
            filepath (str): The path to the CSV file containing the data.
            config (Config): The configuration model containing preprocessing settings.
            spark (SparkSession): The active Spark session used to read and write tables.
        """
        self.catalog_name = config.catalog_name
        self.schema_name = config.schema_name
        self.spark = spark

        try:
            # Initialize DataCleaning to preprocess data
            logger.info("Initializing data cleaning process")
            self.data_cleaning = DataCleaning(filepath, config, spark)
            self.cleaned_data = self.data_cleaning.preprocess_data()
            logger.info("Data cleaning process completed")

            # Define robust features for scaling from config
            self.features_robust = config.features.robust

            # Define features and target
            self.X = self.cleaned_data.drop(columns=[target.new_name for target in config.target])
            self.y = self.cleaned_data[config.target[0].new_name]

            # Set up the ColumnTransformer for scaling
            logger.info("Setting up ColumnTransformer for scaling")
            self.preprocessor = ColumnTransformer(
                transformers=[
                    ("robust_scaler", RobustScaler(), self.features_robust)  # Apply RobustScaler to selected features
                ],
                remainder="passthrough",  # Keep other columns unchanged
            )
        except KeyError as e:
            logger.error(f"KeyError encountered during initialization: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"An error occurred during initialization: {str(e)}")
            raise

    def get_processed_data(self) -> Tuple:
        """
        Retrieves the processed features, target, and preprocessor.

        Returns:
            Tuple: A tuple containing:
                - pd.DataFrame: The features DataFrame.
                - pd.Series: The target Series.
                - ColumnTransformer: The preprocessor for scaling.
        """
        try:
            logger.info("Retrieving processed data and preprocessor")
            logger.info(f"Feature columns in X: {self.X.columns.tolist()}")

            # Log shapes of processed data
            logger.info(f"Data preprocessing completed. Shape of X: {self.X.shape}, Shape of y: {self.y.shape}")

            return self.X, self.y, self.preprocessor

        except Exception as e:
            logger.error(f"An error occurred during data preprocessing: {str(e)}")
            raise

    def split_data(self, test_size=0.2, random_state=42) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Split the cleaned DataFrame into training and test sets."""
        X_train, X_test, y_train, y_test = train_test_split(
            self.X, self.y, test_size=test_size, random_state=random_state
        )
        train_set = pd.concat([X_train, y_train], axis=1)
        test_set = pd.concat([X_test, y_test], axis=1)
        return train_set, test_set

    def save_to_catalog(self, train_set: pd.DataFrame, test_set: pd.DataFrame, spark: SparkSession):
        """Save the train and test sets into Databricks tables."""
        train_set_with_timestamp = spark.createDataFrame(train_set).withColumn(
            "Update_timestamp_utc", to_utc_timestamp(current_timestamp(), "UTC")
        )

        test_set_with_timestamp = spark.createDataFrame(test_set).withColumn(
            "Update_timestamp_utc", to_utc_timestamp(current_timestamp(), "UTC")
        )

        train_set_with_timestamp.write.mode("overwrite").saveAsTable(
            f"{self.catalog_name}.{self.schema_name}.train_set"
        )

        test_set_with_timestamp.write.mode("overwrite").saveAsTable(f"{self.catalog_name}.{self.schema_name}.test_set")

        spark.sql(
            f"ALTER TABLE {self.catalog_name}.{self.schema_name}.train_set "
            "SET TBLPROPERTIES (delta.enableChangeDataFeed = true);"
        )

        spark.sql(
            f"ALTER TABLE {self.catalog_name}.{self.schema_name}.test_set "
            "SET TBLPROPERTIES (delta.enableChangeDataFeed = true);"
        )        
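
The notebook that ties these pieces together might look roughly like the sketch below; the Volume path and import paths are illustrative, and load_config is the repository helper that returns the validated Pydantic Config.

# Rough sketch of prepare_data_notebook.py (the Volume path and import paths are illustrative).
from pyspark.sql import SparkSession

from credit_default.utils import load_config                           # illustrative import path
from credit_default.data_preprocessing_spark import DataPreprocessor   # illustrative import path

spark = SparkSession.builder.getOrCreate()
config = load_config("project_config.yml")  # validated Pydantic Config

preprocessor = DataPreprocessor(
    filepath="/Volumes/<catalog>/<schema>/data/credit_default.csv",
    config=config,
    spark=spark,
)
X, y, transformer = preprocessor.get_processed_data()
train_set, test_set = preprocessor.split_data(test_size=0.2, random_state=42)
preprocessor.save_to_catalog(train_set, test_set, spark)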

Experiment Tracking and Feature Engineering

File: feature_mlflow_experiment_notebook.py

Once the train and test datasets were pre-processed, the next step was to conduct a series of experiments using MLflow, ranging from basic model training to advanced custom model functions and leveraging Feature Tables from the Databricks Feature Store.

The Feature Store acts as a centralized repository that helps data scientists to:

  1. Reuse and share features across teams.
  2. Maintain consistency by ensuring the same code computes features for both training and inference.

For the project, we focused on the feature engineering workflow: we created a Feature Table from specific columns of the train set, added a primary key constraint, and enabled Change Data Feed (CDF) to track changes and updates. This is a critical point, since CDF is not enabled by default, and during inference we appended additional data to simulate model/data drift, which is common in production environments.

# First, create the feature table with original data
create_table_sql = f"""
CREATE OR REPLACE TABLE {config.catalog_name}.{config.schema_name}.features_balanced
(Id STRING NOT NULL,
 {', '.join([f'{col} DOUBLE' for col in columns])})
"""
spark.sql(create_table_sql)

# Add primary key and enable Change Data Feed (CDF)
spark.sql(
    f"ALTER TABLE {config.catalog_name}.{config.schema_name}.features_balanced ADD CONSTRAINT features_balanced_pk PRIMARY KEY(Id);"
)
spark.sql(
    f"ALTER TABLE {config.catalog_name}.{config.schema_name}.features_balanced SET TBLPROPERTIES (delta.enableChangeDataFeed = true);"
)

# Create training set based on feature table
training_set = fe.create_training_set(
    df=train_set,
    label="Default",
    feature_lookups=[
        FeatureLookup(
            table_name=f"{config.catalog_name}.{config.schema_name}.features_balanced",
            feature_names=columns,
            lookup_key="Id",
        )
    ],
    exclude_columns=["Update_timestamp_utc"],
)        
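
The table then needs to be filled with the (balanced) feature values before it can be looked up. Below is a minimal sketch, assuming a Spark DataFrame features_df that holds the Id column plus the feature columns; write_table upserts on the primary key.

# Minimal sketch: populate the feature table from a Spark DataFrame `features_df`
# (assumed to exist upstream) holding the Id column plus the feature columns.
from databricks.feature_engineering import FeatureEngineeringClient

fe = FeatureEngineeringClient()

fe.write_table(
    name=f"{config.catalog_name}.{config.schema_name}.features_balanced",
    df=features_df,
    mode="merge",  # upsert rows matching the primary key (Id)
)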

After creating the Feature Table, we trained a model and registered it in Unity Catalog using the Databricks feature engineering client together with an MLflow signature, which defines the input and output formats and ensures seamless model integration. Without a signature, the model cannot be registered in Unity Catalog.

Additionally, when a model is trained and logged with a Feature Table, it automatically captures the feature specifications used during training. During inference, it retrieves and joins the appropriate features from the corresponding Feature Tables.

# Load feature-engineered DataFrame
training_df = training_set.load_df().toPandas()
test_set = spark.table(f"{config.catalog_name}.{config.schema_name}.test_set").toPandas()

# Split features and target (exclude 'Id' from features)
X_train = training_df[columns]
y_train = training_df["Default"]
X_test = test_set[columns]
y_test = test_set["Default"]

features_robust = [
    "Limit_bal",
    "Bill_amt1",
    "Bill_amt2",
    "Bill_amt3",
    "Bill_amt4",
    "Bill_amt5",
    "Bill_amt6",
    "Pay_amt1",
    "Pay_amt2",
    "Pay_amt3",
    "Pay_amt4",
    "Pay_amt5",
    "Pay_amt6",
]

# Setup preprocessing and model pipeline
preprocessor = ColumnTransformer(
    transformers=[("robust_scaler", RobustScaler(), features_robust)],
    remainder="passthrough",
)

# Create the pipeline with preprocessing and the LightGBM classifier
pipeline = Pipeline(steps=[("preprocessor", preprocessor), ("classifier", LGBMClassifier(**parameters))])


# Set and start MLflow experiment
mlflow.set_experiment(experiment_name="/Shared/credit-feature")

with mlflow.start_run(tags={"branch": "serving"}) as run:
    run_id = run.info.run_id
    pipeline.fit(X_train, y_train)
    y_pred = pipeline.predict(X_test)

    # Calculate and print metrics
    auc_test = roc_auc_score(y_test, y_pred)

    print("Test AUC:", auc_test)

    # Log model parameters, metrics, and model
    mlflow.log_param("model_type", "LightGBM with preprocessing")
    mlflow.log_params(parameters)
    mlflow.log_metric("AUC", auc_test)

    signature = infer_signature(model_input=X_train, model_output=y_pred)

    # Log model with feature engineering
    fe.log_model(
        model=pipeline,
        flavor=mlflow.sklearn,
        artifact_path="lightgbm-pipeline-model-feature",
        training_set=training_set,
        signature=signature,
    )


mlflow.register_model(
    model_uri=f"runs:/{run_id}/lightgbm-pipeline-model-feature",
    name=f"{config.catalog_name}.{config.schema_name}.credit_model_feature",
)        
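
Because the pipeline was logged with fe.log_model, batch inference only needs the lookup key; the feature values are joined automatically from the Feature Table. Here is a hedged sketch, where the model version and the assumption that "Id" alone is enough as input follow from how the model was trained above.

# Sketch of batch scoring with automatic feature lookup; the model version and the
# fact that only "Id" is needed as input are assumptions based on the training setup.
inference_df = spark.table(f"{config.catalog_name}.{config.schema_name}.test_set").select("Id")

predictions = fe.score_batch(
    model_uri=f"models:/{config.catalog_name}.{config.schema_name}.credit_model_feature/1",
    df=inference_df,
)
predictions.select("Id", "prediction").show(5)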

Combining MLflow with the Databricks Feature Store streamlines experiment tracking and artifact logging, ensuring reproducibility and consistent feature computation across training and inference, which reduces errors. Additionally, enabling CDF allows the model to handle data updates dynamically, making it well suited for production scenarios.

Serving Architectures

File: model_serving_feat_lookup_notebook.py

The previous Feature Table that I created is an offline table, which can be used for offline inference using the registered model. However, for real-time serving through a dedicated endpoint, Online Tables must be used. Online Tables are serverless, read-only replicas of the Feature Table, designed to deliver low-latency, high-throughput access to data. These tables support real-time or batch serving by enabling the model to read precomputed features from the online store at inference time. The model then joins these features with the input data provided by the client request to the serving endpoint. To transfer data from the Feature Table to the Online Table, a Delta Live Table (DLT) pipeline is employed.

During the course, multiple architectures were explored:

  1. Feature Serving: This approach delivers precomputed feature values directly from the feature store to downstream applications or services. It is often used in batch or real-time scenarios where models rely on consistent, high-quality features prepared ahead of inference.
  2. Model Serving: This involves deploying a trained model as a service endpoint. At inference time, the client sends raw input data to the endpoint, and the model performs all necessary preprocessing and predictions in real-time.
  3. Model Serving with Feature Lookup: A hybrid approach where the model serving endpoint integrates with the feature store. During inference, the model retrieves precomputed features from the feature store (via Online Tables) and combines them with client-provided input data for prediction. This method ensures consistency between training and serving while optimizing for low latency and high throughput.

The course focused primarily on Model Serving with Feature Lookup due to its ability to integrate seamlessly with feature stores, enabling robust and scalable real-time inference workflows.

# Create online table using the features table as source
spec = OnlineTableSpec(
    primary_key_columns=["Id"],
    source_table_full_name=f"{catalog_name}.{schema_name}.features_balanced",
    run_triggered=OnlineTableSpecTriggeredSchedulingPolicy.from_dict({"triggered": "true"}),
    perform_full_copy=False,
)

online_table_name = f"{catalog_name}.{schema_name}.features_balanced_online"
online_table = OnlineTable(name=online_table_name, spec=spec)

# Ignore the "already exists" error if the online table was created previously
try:
    online_table_pipeline = workspace.online_tables.create(table=online_table)

except Exception as e:
    if "already exists" in str(e):
        pass
    else:
        raise e

# Create endpoint
workspace.serving_endpoints.create(
    name="credit-default-model-serving-feature",
    config=EndpointCoreConfigInput(
        served_entities=[
            ServedEntityInput(
                entity_name=f"{catalog_name}.{schema_name}.credit_model_feature",
                scale_to_zero_enabled=True,
                workload_size="Small",
                entity_version=1,
            )
        ]
    ),
)
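
Once the endpoint is ready, a client only needs to send the lookup key; the remaining features are fetched from the Online Table. Below is a hedged request sketch, assuming a personal access token stored in the DATABRICKS_TOKEN environment variable.

# Sketch of a client request to the serving endpoint (assumes a personal access
# token in DATABRICKS_TOKEN; only the lookup key is sent in the payload).
import os

import requests

host = "https://dbc-46617658-3f2b.cloud.databricks.com"
url = f"{host}/serving-endpoints/credit-default-model-serving-feature/invocations"

response = requests.post(
    url,
    headers={"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"},
    json={"dataframe_records": [{"Id": "182"}]},
)
print(response.status_code, response.json())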
        

In addition to exploring different serving architectures, A/B testing was implemented to evaluate and compare two models trained with different hyperparameters, using a custom MLflow pyfunc model. The process used a wrapper class that distributed requests between the two models (Model A and Model B) based on a deterministic hash of the input Id, ensuring that the same input was always routed to the same model during the testing phase.

## Wrapper that takes both models and routes each request to one of them
class CreditDefaultModelWrapper(mlflow.pyfunc.PythonModel):
    def __init__(self, models):
        self.models = models
        self.model_a = models[0]
        self.model_b = models[1]

    def predict(self, context, model_input):
        if isinstance(model_input, pd.DataFrame):
            credit_id = str(model_input["Id"].values[0])  # Id number
            hashed_id = hashlib.md5(credit_id.encode(encoding="UTF-8")).hexdigest()

            # convert a hexadecimal (base-16) string into an integer
            if int(hashed_id, 16) % 2:
                predictions = self.model_a.predict(model_input.drop(["Id"], axis=1))
                return {"Prediction": predictions[0], "model": "Model A"}

            else:
                predictions = self.model_b.predict(model_input.drop(["Id"], axis=1))
                return {"Prediction": predictions[0], "model": "Model B"}

        else:
            raise ValueError("Input must be a pandas DataFrame.")


models = [model_A, model_B]
wrapped_model = CreditDefaultModelWrapper(models)        
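
The wrapped model can then be logged as a custom pyfunc and registered in Unity Catalog like any other model. Here is a minimal sketch; the experiment name, model name, and example input are illustrative, with X_train assumed to be the training frame from earlier.

# Minimal sketch: log the wrapper as a custom pyfunc model and register it in
# Unity Catalog (experiment name, model name, and example input are illustrative).
import mlflow
import pandas as pd
from mlflow.models import infer_signature

example_input = X_train.iloc[:5].assign(Id="1")  # the wrapper expects the feature columns plus "Id"
example_output = pd.DataFrame([{"Prediction": 0.0, "model": "Model A"}])
signature = infer_signature(model_input=example_input, model_output=example_output)

mlflow.set_experiment(experiment_name="/Shared/credit-ab-testing")
with mlflow.start_run() as run:
    mlflow.pyfunc.log_model(
        python_model=wrapped_model,
        artifact_path="credit-default-model-ab",
        signature=signature,
    )

mlflow.register_model(
    model_uri=f"runs:/{run.info.run_id}/credit-default-model-ab",
    name=f"{config.catalog_name}.{config.schema_name}.credit_model_ab",
)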

Databricks Asset Bundles (DAB)

Files:

  • preprocess.py
  • train_model.py
  • evaluate_model.py
  • deploy_model.py
  • databricks.yml

Databricks Asset Bundles (DAB) are an Infrastructure-as-Code (IaC) framework designed to streamline the management and deployment of Databricks projects. By organizing code, configurations, and workflows into a cohesive structure, DAB enables automation and version control of pipelines, ensuring consistency, scalability, reproducibility, and seamless Continuous Integration and Deployment (CI/CD) across environments.

DAB components such as settings, resources, workflows, clusters, variables, and target environments are defined in the databricks.yml file. All previous steps, including data preprocessing, experiment tracking, model training, evaluation, and deployment, are transferred into well-structured Python scripts. These scripts are then included as tasks within the databricks.yml file, forming a unified workflow.

Below is an example corresponding to the train_model.py workflow task, which integrates key parameters such as the Git SHA and job_run_id to ensure code traceability and reproducibility across the pipeline.

# Set up logging
setup_logging(log_file="")

try:
    # Parse arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--root_path", action="store", default=None, type=str, required=True)
    parser.add_argument("--git_sha", action="store", default=None, type=str, required=True)
    parser.add_argument("--job_run_id", action="store", default=None, type=str, required=True)
    args = parser.parse_args()

    root_path = args.root_path
    git_sha = args.git_sha
    job_run_id = args.job_run_id
    logger.debug(f"Git SHA: {git_sha}")
    logger.debug(f"Job Run ID: {job_run_id}")
    logger.info("Parsed arguments successfully.")

    # Load configuration
    logger.info("Loading configuration...")
    config_path = f"{root_path}/project_config.yml"
    config = load_config(config_path)
    logger.info("Configuration loaded successfully.")

    # Initialize Databricks workspace client
    workspace = WorkspaceClient()
    logger.info("Databricks workspace client initialized.")

    # Initialize Spark session
    spark = SparkSession.builder.getOrCreate()
    fe = feature_engineering.FeatureEngineeringClient()
    logger.info("Spark session initialized.")

    # Extract configuration details
    catalog_name = config.catalog_name
    schema_name = config.schema_name
    target = config.target[0].new_name
    parameters = config.parameters
    features_robust = config.features.robust
    columns = config.features.clean
    columns_wo_id = columns.copy()
    columns_wo_id.remove("Id")

    # Convert train and test sets to Pandas DataFrames
    train_pdf = spark.table(f"{catalog_name}.{schema_name}.train_set").toPandas()
    test_set = spark.table(f"{catalog_name}.{schema_name}.test_set").toPandas()

    # Now use create_training_set to create balanced training set
    # Drop the original features that will be looked up from the feature store
    # Define the list of columns you want to drop, including "Update_timestamp_utc"

    feature_table_name = f"{catalog_name}.{schema_name}.features_balanced"
    columns_to_drop = columns_wo_id + ["Update_timestamp_utc"]
    train_set = spark.table(f"{catalog_name}.{schema_name}.train_set").drop(*columns_to_drop)
    logger.info(f"Train set columns for feature table: {train_set.columns}")

    # Create training set from feature table
    mlflow.set_tracking_uri("databricks")
    mlflow.set_registry_uri("databricks-uc")

    training_set = fe.create_training_set(
        df=train_set,
        label=target,
        feature_lookups=[
            FeatureLookup(
                table_name=feature_table_name,
                feature_names=columns_wo_id,
                lookup_key="Id",
            )
        ],
        exclude_columns=["Update_timestamp_utc"],
    )
    training_df = training_set.load_df().toPandas()
    logger.info("Training set created and loaded.")

    # Prepare train and test datasets
    X_train = training_df[columns_wo_id]
    y_train = training_df[target]
    X_test = test_set[columns_wo_id]
    y_test = test_set[target]

    # Define preprocessing and pipeline
    preprocessor = ColumnTransformer(
        transformers=[("robust_scaler", RobustScaler(), features_robust)],
        remainder="passthrough",
    )
    pipeline = Pipeline(steps=[("preprocessor", preprocessor), ("classifier", LGBMClassifier(**parameters))])

    # MLflow setup
    mlflow.set_experiment(experiment_name="/Shared/credit-feature")
    logger.info("MLflow setup completed.")

    # Train model and log in MLflow
    with mlflow.start_run(tags={"branch": "bundles", "git_sha": git_sha, "job_run_id": job_run_id}) as run:
        pipeline.fit(X_train, y_train)
        y_pred = pipeline.predict(X_test)
        auc_test = roc_auc_score(y_test, y_pred)
        logger.info(f"Test AUC: {auc_test}")

        # Log model details
        mlflow.log_param("model_type", "LightGBM with preprocessing")
        mlflow.log_params(parameters)
        mlflow.log_metric("AUC", auc_test)
        input_example = X_train.iloc[:5]
        signature = infer_signature(model_input=input_example, model_output=pipeline.predict(input_example))  # y_pred

        # Log model with feature engineering
        # We will register in next step, if model is better than the previous one (evaluate_model.py)
        fe.log_model(
            model=pipeline,
            flavor=mlflow.sklearn,
            artifact_path="lightgbm-pipeline-model-fe",
            training_set=training_set,
            signature=signature,
            input_example=input_example,
        )
        model_uri = f"runs:/{run.info.run_id}/lightgbm-pipeline-model-fe"
        dbutils.jobs.taskValues.set(key="new_model_uri", value=model_uri)  # noqa: F821
        logger.info(f"Model registered: {model_uri}")

except Exception as e:
    logger.error(f"An error occurred: {e}")
    sys.exit(1)  # Exit with failure code        

The workflow defined in databricks.yml is managed with three main Databricks CLI commands that automate validation, deployment, and execution:

  1. databricks bundle validate - Validates all definitions in the bundle, ensuring that everything is properly configured before deployment.
  2. databricks bundle deploy - Deploys the bundle, setting up the defined workflows and resources in Databricks.
  3. databricks bundle run - Executes the tasks defined in the bundle, starting from the preprocessing step and proceeding through model training, evaluation, and deployment.

In this workflow, every task must run successfully before triggering the subsequent task. To ensure the workflow is always up-to-date, we generated new data to refresh the preprocessing step. This allows for retraining of the model and evaluating its performance. If the new model performs better than the previous one, it triggers the deployment of the new model, replacing the old one. If the preprocessing step or model evaluation fails, the subsequent tasks will not be triggered, ensuring the pipeline only moves forward when all conditions are met.
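
As a rough sketch of that gate (the repository’s evaluate_model.py is more involved), the evaluation step compares the candidate against the current model and passes the decision downstream via a task value, which the model_update condition task in the databricks.yml below checks. The names new_model, previous_model, new_model_uri, and the test frames are assumed to be loaded earlier in the script.

# Rough sketch of the evaluation gate: compare test AUC of the candidate against
# the currently registered model and pass the decision to the condition task.
from sklearn.metrics import roc_auc_score

auc_new = roc_auc_score(y_test, new_model.predict(X_test))        # candidate from train_model
auc_old = roc_auc_score(y_test, previous_model.predict(X_test))   # currently registered model

if auc_new > auc_old:
    mlflow.register_model(model_uri=new_model_uri, name=f"{catalog_name}.{schema_name}.credit_model_feature")
    dbutils.jobs.taskValues.set(key="model_update", value=1)
else:
    dbutils.jobs.taskValues.set(key="model_update", value=0)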

# Databricks asset bundle definition
bundle:
  name: mlops-databricks-credit-default

include:
  - bundle_monitoring.yml

# These are the default artifact settings if not otherwise overridden in
# the following "targets" top-level mapping.
artifacts:
  default:
    type: whl
    build: uv build
    path: .

# These are for any custom variables for use throughout the bundle.
variables:
  root_path:
    description: root_path for the bundle files
    default: /Workspace/Users/${workspace.current_user.userName}/.bundle/${bundle.name}/${bundle.target}/files
  git_sha:
    description: git_sha
    default: 1aa0037267af83a7685bea002a332739563fa3c9
  schedule_pause_status:
    description: schedule pause status
    default: UNPAUSED

# These are the default job and pipeline settings if not otherwise overridden in
resources:
  jobs:
    credit-default:
      name: credit-default-workflow
      schedule:
        quartz_cron_expression: "0 0 6 ? * MON"
        timezone_id: "Europe/Amsterdam"
        pause_status: ${var.schedule_pause_status}
      tags:
        project_name: "credit-default"
      job_clusters:
        - job_cluster_key: "credit-default-cluster"
          new_cluster:
            spark_version: "15.4.x-scala2.12"
            data_security_mode: "SINGLE_USER"
            node_type_id: "i3.xlarge"
            driver_node_type_id: "i3.xlarge"
            autoscale:
              min_workers: 1
              max_workers: 1

      tasks:

        - task_key: "preprocessing"
          job_cluster_key: "credit-default-cluster"
          existing_cluster_id: 1109-205408-dob4qyuc
          spark_python_task:
            python_file: "workflows/preprocess.py"
            parameters:
              - "--root_path"
              - ${var.root_path}
          libraries:
           - whl: ./dist/*.whl

        - task_key: if_refreshed
          condition_task:
            op: "EQUAL_TO"
            left: "{{tasks.preprocessing.values.refreshed}}"
            right: "1"
          depends_on:
            - task_key: "preprocessing"

        - task_key: "train_model"
          depends_on:
            - task_key: "if_refreshed"
              outcome: "true"
          job_cluster_key: "credit-default-cluster"
          existing_cluster_id: 1109-205408-dob4qyuc
          spark_python_task:
            python_file: "workflows/train_model.py"
            parameters:
              - "--root_path"
              - ${var.root_path}
              - "--git_sha"
              - ${var.git_sha}
              - "--job_run_id"
              - "{{job.id}}"
          libraries:
            - whl: ./dist/*.whl

        - task_key: "evaluate_model"
          depends_on:
            - task_key: "train_model"
          job_cluster_key: "credit-default-cluster"
          existing_cluster_id: 1109-205408-dob4qyuc
          spark_python_task:
            python_file: "workflows/evaluate_model.py"
            parameters:
              - "--root_path"
              - ${var.root_path}
              - "--new_model_uri"
              - "{{tasks.train_model.values.new_model_uri}}"
              - "--job_run_id"
              - "{{job.id}}"
              - "--git_sha"
              - ${var.git_sha}
          libraries:
            - whl: ./dist/*.whl

        - task_key: model_update
          condition_task:
            op: "EQUAL_TO"
            left: "{{tasks.evaluate_model.values.model_update}}"
            right: "1"
          depends_on:
            - task_key: "evaluate_model"
        - task_key: "deploy_model"
          depends_on:
            - task_key: "model_update"
              outcome: "true"
          job_cluster_key: "credit-default-cluster"
          existing_cluster_id: 1109-205408-dob4qyuc
          spark_python_task:
            python_file: "workflows/deploy_model.py"
            parameters:
              - "--root_path"
              - ${var.root_path}
          libraries:
            - whl: ./dist/*.whl


# Two targets
targets:

  dev: # AWS West Region workspace
    mode: development
    default: true
    cluster_id: 1109-205408-dob4qyuc
    workspace:
      host: https://dbc-46617658-3f2b.cloud.databricks.com

  prod:
    mode: production # Comment out this line when deploying to production, so that "run_as" above is not required
    default: false
    workspace:
      host: https://dbc-46617658-3f2b.cloud.databricks.com        

Inference and Monitoring

Files:

  • create_inference_data.py
  • lakehouse_monitoring.py
  • send_request_to_endpoint.py
  • refresh_monitor.py
  • bundle_monitoring.yml

In the last lecture of the course, we focused on inference and monitoring to ensure the deployed model performs well in real-time scenarios. After deploying the bundle, we generated synthetic data with drift to simulate the effect of changing data distributions over time, a common challenge in production environments.

To set up monitoring, you first need to enable inference tables on the endpoint to capture the incoming requests and the generated predictions. Since we were using existing data with a known ground truth for inference, we merged the inference table with this data. A model_monitoring table with CDF enabled was then created, along with a new monitoring workflow (refresh_monitor.py), which we added to the databricks.yml file. Once the new workflow was triggered, the monitoring table was updated, and two new tables were generated:

  • A profile metrics table, containing summary statistics for each column and for each combination of time window, slice, and grouping columns.
  • A drift metrics table, containing statistics that track changes in the distribution of a metric.

monitoring_table = f"{catalog_name}.{schema_name}.model_monitoring"

workspace.quality_monitors.create(
    table_name=monitoring_table,
    assets_dir=f"/Workspace/Shared/lakehouse_monitoring/{monitoring_table}",
    output_schema_name=f"{catalog_name}.{schema_name}",
    inference_log=MonitorInferenceLog(
        problem_type=MonitorInferenceLogProblemType.PROBLEM_TYPE_CLASSIFICATION,
        prediction_col="prediction",
        timestamp_col="timestamp",
        granularities=["30 minutes"],
        model_id_col="model_name",
        label_col="default",
    ),
)

spark.sql(f"ALTER TABLE {monitoring_table} " "SET TBLPROPERTIES (delta.enableChangeDataFeed = true);")        
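
For completeness, a simplified version of what refresh_monitor.py can do to recompute the metrics tables for the latest window:

# Simplified sketch: trigger a refresh of the monitor so the profile and drift
# metrics tables are recomputed for the newly arrived data.
refresh_info = workspace.quality_monitors.run_refresh(table_name=monitoring_table)
print(f"Triggered monitor refresh: {refresh_info.refresh_id}")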

The figure below shows the first refreshed monitoring windows before and after introducing drift: once drift was added, the metrics began to decline over time.

Conclusion

In this blog, we’ve explored the end-to-end process of setting up and deploying a model on Databricks, from data preprocessing and feature engineering to model training, deployment, and monitoring. By leveraging tools such as MLflow, Databricks Feature Store, and A/B testing, we were able to streamline model development and ensure that our models are robust and ready for production environments. Additionally, integrating monitoring and drift detection allows us to track model performance over time and address potential issues like model drift effectively.

With this comprehensive approach, Maria and Başak have created a course for building scalable, reproducible, and reliable machine learning pipelines that are adaptable to real-time production requirements. If you’re eager to learn and experiment with Databricks, I encourage you to join their course to unlock its full potential.

If you found this post helpful, consider supporting me by:

  • Clapping and following me on Medium
  • Following me on GitHub
  • Starring the repo
  • Sharing my content on LinkedIn
  • Buying me a coffee or supporting me on GitHub Sponsors
  • Hiring me!

Happy coding!
