MLflow and Databricks for CausalOps
Awadelrahman Ahmed
Databricks MVP | MLflow Ambassador | Data & AI Architect | AWS Community Builder | PhD Fellow in Informatics
In machine learning, most models are designed to find patterns and make predictions based on correlations in data. But in many real-world applications, knowing what causes certain outcomes is even more valuable. This is what causal models are about: instead of simply finding associations, they aim to uncover cause-and-effect relationships. The field of causality (causal discovery, causal inference or causal AI) is critical for making decisions that don’t just respond to patterns but actually influence outcomes.
Taking these causal models from research to real-world applications—a process I’ll refer to as CausalOps—presents unique challenges, as causal models often don’t produce a traditional ML-like model object that can be easily serialized, logged and deployed.
In this article, I’ll walk you through using MLflow and Databricks to create an end-to-end workflow for CausalOps. With MLflow’s new Models from Code feature, you can log and track causal models directly from code, while Databricks Endpoints and Databricks Apps allow for scalable deployment and management.
This article is an outcome of exploring these tools for a causal-modeling use case, so it will be more of a practical, hands-on guide. Even though proprietary causal platforms exist, the insights here demonstrate how open-source tools can make causal discovery both accessible and operationally effective!
Set Your Expectations
In this article, we’ll follow a step-by-step process: create a causal model, log it with MLflow, deploy it via a Databricks serving endpoint, and build a Databricks App on top.
By the end, you’ll know how to bring causal models into production using MLflow and Databricks. I will also note specific current limitations you may face when using this framework.
Step 1: Create a Causal Model
To kick off our development, we’ll start by creating a causal model using the Peter-Clark (PC) algorithm, a classic causal discovery method based on conditional independence tests.
Causal discovery is the process of identifying cause-and-effect relationships from data, rather than just correlations. It aims to uncover the underlying causal structure that explains how different variables influence each other.
For this, we’re leveraging gCastle, a causal structure learning toolchain developed by Huawei’s Noah’s Ark Lab for causal learning and evaluation. It is a Python toolbox for causal discovery, with the codebase available here.
In this article, we’ll keep it simple by using the PC algorithm from gCastle, but we can experiment with other algorithms in later posts. Here’s a minimal example that takes in a dataframe, learns a PC causal model, and outputs a causal matrix (yes, it is as simple as that, thanks to gCastle):
from castle.algorithms import PC
# Initialize the PC algorithm
pc = PC()
# Learn the causal structure from the input data
# (model_input: a pandas DataFrame or numpy array of observations)
pc.learn(model_input)
# Access the resulting causal matrix
causal_matrix = pc.causal_matrix
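To make this concrete, here is a small, self-contained sketch of mine (not from a particular dataset) that runs PC on synthetic data with a known chain x0 → x1 → x2 and prints the discovered edges. In gCastle’s convention, causal_matrix[i, j] == 1 encodes an edge from variable i to variable j; edges PC could not orient can show up as symmetric entries.
import numpy as np
import pandas as pd
from castle.algorithms import PC

# Synthetic data with a known causal chain: x0 -> x1 -> x2
rng = np.random.default_rng(0)
x0 = rng.normal(size=2000)
x1 = 2.0 * x0 + rng.normal(size=2000)
x2 = -1.5 * x1 + rng.normal(size=2000)
df = pd.DataFrame({"x0": x0, "x1": x1, "x2": x2})

# Run causal discovery and list the discovered edges
pc = PC()
pc.learn(df.values)
for i, j in zip(*np.nonzero(pc.causal_matrix)):
    print(f"{df.columns[i]} -> {df.columns[j]}")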
Step 2: Logging the Causal Model with MLflow
Now that we know how to create a causal model using the PC algorithm from gCastle, the next step is to think about how to log this model with MLflow.
Since causal discovery algorithms like PC are implemented as code rather than as typical model objects, we can’t log them in the same way as conventional ML models. This is where MLflow’s Models from Code feature is extremely useful!
By using mlflow.pyfunc.PythonModel we can log our causal discovery process as a “model” in MLflow, preserving the code and enabling versioning, tracking, and deployment.
mlflow.pyfunc.PythonModel provides a universal interface for defining custom models in MLflow. By wrapping our causal model logic in a Python class that inherits from PythonModel, we can log and manage our causal discovery code like any other MLflow model.
But before this, as we will be using Databricks Unity Catalog (UC), make sure your workspace is UC-enabled and that you have the privileges to create schemas and register models.
To use the Models from Code feature, we need to have TWO files (find more details about Models from Code logging in this post):
1. pc_causal_model.py file:
Contains the model logic, wrapped in a custom class that inherits from mlflow.pyfunc.PythonModel.
import mlflow
import pandas as pd
from mlflow.pyfunc import PythonModel  # Base class for custom MLflow models
from mlflow.models import set_model  # Function to designate the model for MLflow
from castle.algorithms import PC  # For causal discovery

# Define a custom MLflow model class
class PCCausalModel(PythonModel):
    # Define the predict method, which will be called during inference
    def predict(self, context, model_input):
        pc = PC()  # Initialize the PC algorithm
        pc.learn(model_input)  # Run the causal discovery process on the input data
        return pc.causal_matrix  # Return the resulting causal matrix

# Tell MLflow that this is the model to load when the file is logged as code
set_model(model=PCCausalModel())
We import MLflow’s PythonModel and the PC algorithm from castle.algorithms for causal discovery. The PCCausalModel class inherits from PythonModel and we define a predict() method, which initializes the PC algorithm, applies it to model_input to learn causal relationships, and returns the causal matrix.
Finally (and importantly), set_model(model=PCCausalModel()) tells MLflow that this is the model object to load when the file is logged as code.
This file is the code script for the model; we create and save it.
We also need a second piece of code to log this model code. Yes, we have two scripts: the one we have just saved, and a driver, which can be created as a notebook.
2. driver.ipynb file:
As we mentioned, this is responsible for logging the model defined in pc_causal_model.py to MLflow. But there is one more challenge, specific to causal models, that we should be aware of here:
Unity Catalog’s signature requirement is a challenge for causal models
To successfully log and register models in UC, a fixed model signature is required, which can be challenging for causal AI models.
Why? Because causal models often have variable input structures (e.g., dataframes with different columns) and produce outputs, like causal adjacency matrices, that vary in size based on the input. This conflicts with UC's strict input-output schema requirements.
There are a few ways to work around that, which I am investigating and will share more in a future post. For this particular post, however, we can imagine a causal model with a specific signature defined by our test dataset. I generated a dataset with seven numerical variables, which we’ll use as an example input. The output will then be a 7x7 causal matrix.
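If you want to follow along without my exact file, here is a hedged sketch that generates a stand-in dataset with seven numerical variables linked by some linear dependencies and saves it to data/dataset.csv (the file name, column names, and dependency structure are my assumptions, chosen only to match the examples below):
import os
import numpy as np
import pandas as pd

# Hypothetical stand-in for the article's dataset: seven numeric columns
rng = np.random.default_rng(42)
n = 1000
x0 = rng.normal(size=n)
x1 = 0.8 * x0 + rng.normal(scale=0.5, size=n)
x2 = rng.normal(size=n)
x3 = 0.6 * x1 + 0.4 * x2 + rng.normal(scale=0.5, size=n)
x4 = rng.normal(size=n)
x5 = 0.7 * x3 + rng.normal(scale=0.5, size=n)
x6 = 0.5 * x4 + 0.3 * x5 + rng.normal(scale=0.5, size=n)
df = pd.DataFrame({f"x{i}": v for i, v in enumerate([x0, x1, x2, x3, x4, x5, x6])})

os.makedirs("data", exist_ok=True)
df.to_csv("data/dataset.csv", index=False)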
Now, if you think this will not work when you pass a different schema, you are right: in this setting it will raise an error. But let us postpone this discussion for now.
So now, to allow MLflow to infer the signature, we need an input example (a few rows of the dataset) and an output example (our pc.causal_matrix):
import mlflow
import pandas as pd
from castle.algorithms import PC
input_df = pd.read_csv("data/dataset.csv")
input_df.head()
pc = PC()
input_example = input_df.head(50)
pc.learn(input_example)
output_example = pc.causal_matrix
output_example
The output example is a 7x7 matrix of 0s and 1s. Then we can infer the signature:
signature = mlflow.models.infer_signature(input_example,output_example)
print(signature)
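For reference, the printed signature will look roughly like this (an illustration only; the exact column names and dtypes depend on your data and MLflow version):
inputs:
  ['x0': double, 'x1': double, 'x2': double, 'x3': double, 'x4': double, 'x5': double, 'x6': double]
outputs:
  [Tensor('int64', (-1, 7))]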
You might have noticed that the output schema dimension is (-1, 7). This is MLflow's way of interpreting our 7x7 causal matrix as a prediction of n data points with 7 predictions each, something that aligns with typical ML models. For our case, we just need to follow its rules for now, and it still works.
Because in UC we need to provide the three-level namespace catalog.schema.object, we can quickly create a schema. I name my schema "causal" in a catalog called "models":
%sql
CREATE SCHEMA IF NOT EXISTS models.causal
Then we log the model as code with the following syntax:
# This is the path of the file we created earlier with the model logic
model_path = "pc_causal_model.py"
# Set the MLflow experiment where this run will be logged
mlflow.set_experiment("/Users/[email protected]/causal_pc")
# Start an MLflow run for logging the model
with mlflow.start_run():
    # Log the model to MLflow using the pyfunc interface
    model_info = mlflow.pyfunc.log_model(
        artifact_path="model",  # Specify where to store the model artifacts
        python_model=model_path,  # Path to the Python file containing the model class
        signature=signature,  # Define the model signature (input and output schema)
        registered_model_name="models.causal.pc",  # Model name in the Model Registry
    )
If everything is done correctly, you should see the results reflected in two places:
1. The run will appear under the experiment you set, with the model logged as an artifact.
2. You will also see a registered model with a version, accessible from the Models tab or through the catalog and schema.
It is good practice to test our model from the notebook before deploying it:
# Load the model
loaded_model = mlflow.pyfunc.load_model(model_info.model_uri)
# Predict
print(loaded_model.predict(input_df.head(100)))
This should return the causal matrix.
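Since the whole point of this model is cause-and-effect structure, it can also help to render the matrix as a directed graph. Here is a small optional sketch of mine using networkx and matplotlib (extra dependencies, not part of the original workflow):
import networkx as nx
import matplotlib.pyplot as plt

# Get the adjacency matrix from the loaded model
causal_matrix = loaded_model.predict(input_df.head(100))

# Build a directed graph: an entry of 1 at [i, j] means variable i -> variable j
G = nx.DiGraph()
G.add_nodes_from(input_df.columns)
for i, row in enumerate(causal_matrix):
    for j, val in enumerate(row):
        if val:
            G.add_edge(input_df.columns[i], input_df.columns[j])

nx.draw_networkx(G, node_color="lightblue", arrows=True)
plt.show()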
Step 3: Deploy via Databricks Serving Endpoint
Up to this point, our causal model has been logged as code, registered in MLflow, and tested in our notebook. Now, we’re ready to deploy it to an endpoint! Here’s how we do it:
import requests
import json
# Define the endpoint name for deployment
endpoint_name = "causal_endpoint"
# Specify the registered model name and version in MLflow
model_name = "models.causal.pc"
model_version = 1
# Specify the deployment compute type (options: CPU, GPU_SMALL, GPU_LARGE, etc.)
workload_type = "CPU"
# Specify the compute scale-out size (options: Small, Medium, Large, etc.)
workload_size = "Small"
# Set whether to scale to zero (only applicable for CPU endpoints)
scale_to_zero = False
# Define the API root URL for your Databricks workspace
API_ROOT = "THIS IS YOUR WORKSPACE URL"
# Define your Databricks Personal Access Token (replace with your actual token)
API_TOKEN = "THIS IS YOUR WORKSPACE PERSONAL ACCESS TOKEN"
# Prepare the data payload for the request to create a serving endpoint
data = {
    "name": endpoint_name,
    "config": {
        "served_entities": [
            {
                "entity_name": model_name,
                "entity_version": model_version,
                "workload_size": workload_size,
                "scale_to_zero_enabled": scale_to_zero,
                "workload_type": workload_type,
            }
        ]
    },
}
# Define the request headers, including authorization with the API token
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"}
# Make a POST request to create the serving endpoint with the specified configuration
response = requests.post(
    url=f"{API_ROOT}/api/2.0/serving-endpoints", json=data, headers=headers
)
# Print the response to confirm if the endpoint was created successfully
print(response)  # should print <Response [200]> on success
If the response is 200, you should see your endpoint in the creating state in the Serving tab, and it will soon be ready.
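If you prefer not to watch the UI, you can also poll the endpoint state programmatically through the same REST API (a small sketch; adjust the polling interval to taste):
import time

# Poll the serving endpoint until it reports READY
while True:
    status = requests.get(
        url=f"{API_ROOT}/api/2.0/serving-endpoints/{endpoint_name}",
        headers=headers,
    ).json()
    state = status.get("state", {}).get("ready", "UNKNOWN")
    print(f"Endpoint state: {state}")
    if state == "READY":
        break
    time.sleep(30)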
Now we can test the model again (you can call it unit testing). Earlier, we tested it by loading it from the registry; now, we’ll test it from the endpoint after deployment. Here’s the code:
# Load input data from a CSV file into a DataFrame
input_df = pd.read_csv("data/dataset.csv")
# Convert the DataFrame to a dictionary format expected by the endpoint (list of records)
data = {
    "inputs": input_df.to_dict(orient="records"),
}
# Set up request headers, including the API token for authorization
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"}
# Make a POST request to the deployed model endpoint to get predictions
response = requests.post(
    url=f"{API_ROOT}/serving-endpoints/{endpoint_name}/invocations",
    json=data,
    headers=headers,
)
# Parse the response
causal_matrix = pd.DataFrame(response.json().get("predictions"))
causal_matrix.values
This should print our causal matrix.
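One small convenience: the endpoint returns plain numbers, so labeling the rows and columns with the original variable names makes the matrix much easier to read (a quick sketch):
# Rows are cause variables, columns are effect variables
labeled_matrix = pd.DataFrame(
    causal_matrix.values,
    index=input_df.columns,
    columns=input_df.columns,
)
print(labeled_matrix)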
Step 4: Build a Databricks App
By Step 3, we’ve completed the core of our deployment: creating the causal model, registering it, deploying it, and receiving responses. We could say congrats and stop here, but we can go a step further and have more fun.
Imagine a frontend that seamlessly interacts with the endpoint, allowing users to upload their CSV files and receive a causal matrix in return. Even better, we could display the causal graph directly in the frontend.
This is especially exciting because Databricks’ new Databricks Apps feature makes it incredibly easy to implement.
In the next step, we’ll build a Streamlit app using Databricks Apps to interact with our deployed model.
On your Databricks workspace, from the New menu, select New App. Choose Streamlit, and start with the simplest template, "Hello World." We’ll then adjust this template to fit our needs.
After clicking Next, you’ll be prompted to enter an app name and, optionally, a description. Once you click Create, you can monitor the creation progress in the Apps tab under Compute.
You can access the deployed app via the provided link, but more importantly, you’ll also receive a link to the app’s code. This link directs you to the codebase located in a folder called databricks_apps within your default user folder, where we can modify the code as needed.
To conserve space, I’ve placed the app code in this repository. I called it Causal Matrix.
To set up the app, make sure you have these four files in the app code in Databricks: replace the "streamlit-hello-world-app" app.py code with the content from app.py in the repository, update the app.yaml file with your environment variables, modify requirements.txt to include the necessary dependencies, and copy utils.py as well. I have also uploaded dataset.csv following the expected schema, so you can try it.
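For orientation, here is roughly what a minimal app.yaml could look like (a sketch, not the repository’s actual file; the environment variable name is my assumption and must match whatever app.py reads):
command: ["streamlit", "run", "app.py"]
env:
  - name: SERVING_ENDPOINT  # Name of the model serving endpoint (assumed)
    value: "causal_endpoint"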
Finally, deploy the app from the App Console in Databricks.
Conclusion
Throughout this article, we delved into several key concepts and tools to bridge causal models with operational machine learning workflows.
We leveraged MLflow's Models from Code feature to log causal models as code, a critical step for capturing the logic of causal discovery that traditional model objects don’t encapsulate.
However, this process came with unique challenges (e.g., the UC signature requirement). By registering models in the Databricks Unity Catalog, we ensured organized tracking, setting a solid foundation for model management.
Finally, deploying the model using Databricks Serving Endpoints and creating a Streamlit app with Databricks Apps allowed us to make causal models accessible through an interactive interface, empowering users to engage with cause-and-effect insights directly.
I hope this article has shown how combining MLflow and Databricks features can make causal AI not only feasible but practically impactful in real-world settings!