The net caught the lakehouse
In a previous article, I talked about how you can easily use Databricks' integration with dbt to run it in jobs. In this article, it looks like some fisher's lost net got possessed and the lakehouse ended up enmeshed (pun intended). If you're still wondering, I'll be talking about how you can run sqlmesh in Databricks quite easily, thanks to the wonderful work the team at Tobiko Data has done.
You can leverage my experience
I've gone through this already, so you can save yourself some headaches. If you try to run a sqlmesh notebook using the direct git integration in the task, you'll hit issues: sqlmesh tries to write files to a location where the permissions appear to be missing in this mode. To skip this headache, simply use the Repos integration in Databricks or put your files directly in the workspace. There's even a new feature that lets you put these git repos wherever you want in the workspace. You can look at this documentation if you need an automated way to update the repo with new commits.
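If you want to script that update rather than click around, here's a rough sketch using the Databricks Python SDK (not necessarily the method the linked documentation uses); the repo path and branch are placeholders for your own.

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Find the repo by its workspace path (placeholder) and pull the latest
# commits from the branch the job should run against.
repo = next(w.repos.list(path_prefix="/Repos/data-team/sqlmesh_project"))
w.repos.update(repo_id=repo.id, branch="main")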
Doing a basic setup
In a previous article I explored different options for installing sqlmesh in Databricks. As the straight pip install in the notebook worked great, we'll stick with that. To make it a bit faster when developing, I also installed it on the cluster (you can then skip the cell, or if you do run it, it'll just do a quick check and finish a lot faster).
So that's the first cell of the notebook that will run in Databricks.
%pip install sqlmesh[databricks]
The next step is to import sqlmesh so that we can access the great magic commands it comes with. That's our second cell (you'll see there aren't that many we need).
import sqlmesh
We'll then take a small detour to look at our sqlmesh config.yml and see how we set up the gateway for Databricks.
gateways:
  databricks_job:
    connection:
      type: databricks
      catalog: sqlmesh_upstream
That's it! There's no need for any information about the cluster or credentials, which is truly amazing. Sqlmesh will simply use the Spark session available in the notebook running in the job. You can control which user the job runs under in the job configuration itself.
We'll then set the context (back to our notebook) for sqlmesh to work with our project in our third cell.
%context --gateway databricks_job ./models/../
I recommend you put the notebook in the project directly so that it can stay the same, or very similar (you could use job parameters), across projects. Specifying . doesn't work, but the sqlmesh team is very responsive in their Slack and suggested this path, which works.
We just need one final fourth cell to get sqlmesh to actually run.
%plan --auto-apply --run
There is also a run command you can use if you've already applied the plan, for example if you do synchronized deployments with the CI/CD bot. Here we're using --auto-apply and --run so that if we pushed new code that we already planned and verified in dev, it will be deployed asynchronously on the next run; otherwise it will simply run.
And that's it! In four lines and a tiny config, we got sqlmesh to run in a Databricks notebook. From there, you just have to create a regular notebook job that runs at the frequency of your fastest model, and sqlmesh will take care of running each model according to its configured schedule.
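If you'd rather create that job programmatically than through the UI, here's a minimal sketch using the Databricks Python SDK; the job name, notebook path, cluster id, and hourly cron are placeholders to adapt to your own workspace and your fastest model's cadence.

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()

# Placeholder notebook path, cluster id, and schedule -- adjust to your setup.
w.jobs.create(
    name="sqlmesh-run",
    tasks=[
        jobs.Task(
            task_key="run_sqlmesh",
            notebook_task=jobs.NotebookTask(
                notebook_path="/Repos/data-team/sqlmesh_project/run_sqlmesh"
            ),
            existing_cluster_id="0000-000000-abcdefgh",
        )
    ],
    # Quartz cron for every hour on the hour, matching an hourly fastest model.
    schedule=jobs.CronSchedule(
        quartz_cron_expression="0 0 * * * ?",
        timezone_id="UTC",
    ),
)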
Improving our setup
Our simple setup works alright for development, but it's currently using Databricks for the state connection. Sqlmesh maintains a lot of information about its state in tables and performs many small operations, which just isn't what Databricks is meant for (Snowflake is a bit better at this, but not ideal either). To speed things up, we'll set up a postgres database to serve as the state connection. With Databricks Lakehouse Federation, we'll still have an easy way to query these tables if we want to.
Before that, let's explore a quick option you can use in development, which is to set up a duckdb state connection.
gateways:
  databricks_job:
    connection:
      type: databricks
      catalog: sqlmesh_upstream
    state_connection:
      type: duckdb
      database: state.db
This will make all state operations quite fast, and the state will persist across runs through workspace storage, but it's not as robust as using a postgres database, so let's see how we might set that up.
I'm working on AWS on my own account so I'll be using RDS to set up a postgres database. We don't need much capacity and don't want to pay so let's just follow the instructions for the free tier template.
If you do this at work, check with your infra, networking, or other contacts about which VPC you should use; for simplicity here, we'll just put it in the VPC that was created when we set up Databricks.
To make sure the connection would work from Databricks, I simply opened up all of the security groups, which is fine for a quick experiment but not something you'd want in a real environment.
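If you want something tighter, here's a hedged boto3 sketch that only allows Postgres traffic into the RDS security group from the security group attached to the Databricks workspace; both group IDs below are placeholders.

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")

# Allow only Postgres (port 5432) traffic into the RDS security group, and
# only from the Databricks workspace security group (both IDs are placeholders).
ec2.authorize_security_group_ingress(
    GroupId="sg-rds-placeholder",
    IpPermissions=[
        {
            "IpProtocol": "tcp",
            "FromPort": 5432,
            "ToPort": 5432,
            "UserIdGroupPairs": [{"GroupId": "sg-databricks-placeholder"}],
        }
    ],
)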
To remain on the theme of security, we'll store the host, port, user, password, etc. in a secret scope. To keep things simple, we'll use the Databricks CLI to do that.
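If you prefer to script the scope from Python instead of the CLI, a rough equivalent with the Databricks Python SDK could look like this; the key names match what the notebook reads later, and the values are obviously placeholders.

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
w.secrets.create_scope(scope="postgres")

# Placeholder values -- fill in the details of your own RDS instance.
for key, value in {
    "host": "my-instance.abc123.us-east-1.rds.amazonaws.com",
    "port": "5432",
    "user": "postgres",
    "password": "admin-password",
    "sqlmesh_username": "sqlmesh",
    "sqlmesh_user_password": "sqlmesh-password",
}.items():
    w.secrets.put_secret(scope="postgres", key=key, string_value=value)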
We'll use a small script to create the database in postgres and a user for sqlmesh to use.
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

def create_db_and_user():
    # Connect to the PostgreSQL server
    secret_scope = "postgres"
    conn = psycopg2.connect(
        dbname='postgres',
        user=dbutils.secrets.get(secret_scope, "user"),
        password=dbutils.secrets.get(secret_scope, "password"),
        host=dbutils.secrets.get(secret_scope, "host"),
        port=dbutils.secrets.get(secret_scope, "port")
    )
    # Automatically commit any action without needing to call conn.commit() explicitly
    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    # Create a cursor object
    cur = conn.cursor()
    # Step 1: Create the new database
    try:
        cur.execute('CREATE DATABASE sqlmesh_state;')
        print("Database sqlmesh_state created successfully")
    except Exception as e:
        print(f"An error occurred: {e}")
    sqlmesh_user = dbutils.secrets.get(secret_scope, "sqlmesh_username")
    sqlmesh_user_password = dbutils.secrets.get(secret_scope, "sqlmesh_user_password")
    # Step 2: Create a new user with full access to the new database
    try:
        cur.execute(f"CREATE USER {sqlmesh_user} WITH PASSWORD '{sqlmesh_user_password}';")
        cur.execute(f"GRANT ALL PRIVILEGES ON DATABASE sqlmesh_state TO {sqlmesh_user};")
        print(f"User {sqlmesh_user} created successfully and granted full access to sqlmesh_state")
    except Exception as e:
        print(f"An error occurred: {e}")
    # Close the cursor and connection
    cur.close()
    conn.close()

create_db_and_user()
This script was brought to you by ChatGPT! (I edited it a bit to use the values from the scope)
The script uses our credentials in the secret scope to create a new database and user for storing sqlmesh's state information.
We can set up query federation to see it in Databricks. (At this point the catalog would be mostly empty; I took this screenshot after already running sqlmesh.)
We can see what intervals our models ran for. We'll see how to leverage this to create a custom sensor-like task to connect two separate sqlmesh projects in a future article.
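As a rough illustration, querying those intervals from a notebook through the federated catalog could look like this; postgres_state is a placeholder for whatever you named the foreign catalog, and the schema and table names assume sqlmesh's defaults, which can vary between versions, so check your own state database.

# "postgres_state" is a placeholder foreign catalog name; "sqlmesh._intervals"
# assumes sqlmesh's default state schema and intervals table names.
display(spark.sql("SELECT * FROM postgres_state.sqlmesh._intervals LIMIT 20"))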
We now need to make a few modifications to our sqlmesh config and then to the notebook to use this new state connection.
gateways:
  databricks_job:
    connection:
      type: databricks
      catalog: sqlmesh_upstream
    state_connection:
      type: postgres
      database: sqlmesh_state
      host: {{ env_var('POSTGRES_HOST') }}
      port: {{ env_var('POSTGRES_PORT') }}
      user: {{ env_var('POSTGRES_USER') }}
      password: {{ env_var('POSTGRES_PASSWORD') }}
We add a new state connection whose details come from environment variables. Sqlmesh supports two approaches here: environment variables with automatic names derived from the config path, or the one I'm using (and prefer), which is this jinja syntax, just like you would use in dbt.
We'll then adjust our notebook so that it fills in these environment variables using the values in our secret scope.
All we have to do is add this cell before the %context cell.
import os
scope = "postgres"
os.environ["POSTGRES_HOST"] = dbutils.secrets.get(scope, "host")
os.environ["POSTGRES_PORT"] = dbutils.secrets.get(scope, "port")
os.environ["POSTGRES_USER"] = dbutils.secrets.get(scope, "sqlmesh_username")
os.environ["POSTGRES_PASSWORD"] = dbutils.secrets.get(scope, "sqlmesh_user_password")
That's it! Now sqlmesh will be using our postgres database to manage its state. It will be fast and resilient.
While we're updating our config, let's also increase concurrent_tasks, as sqlmesh defaults it to 1 for Databricks.
gateways:
  databricks_job:
    connection:
      type: databricks
      catalog: sqlmesh_upstream
      concurrent_tasks: 4
    state_connection:
      type: postgres
      database: sqlmesh_state
      host: {{ env_var('POSTGRES_HOST') }}
      port: {{ env_var('POSTGRES_PORT') }}
      user: {{ env_var('POSTGRES_USER') }}
      password: {{ env_var('POSTGRES_PASSWORD') }}
We can go modify our model's schedule directly in the workspace files and then rerun the %plan cell to see what happens.
Our example doesn't have enough models to really showcase concurrent tasks, but with our state connection in postgres there are far fewer queries that need to run on our cluster, and that speeds things up a lot. With postgres as the state connection the run took 18 seconds, while with Databricks it took 51 seconds.
Here's the equivalent run with the Databricks state connection.
That's 35 steps versus 9, so let's give Databricks the chance to shine by offloading these tiny transactions it's not meant for to a cheap postgres instance. Your runtimes and your wallet will thank you for this little bit of extra work. We can even query the state through Lakehouse Federation, so it's a really good setup.
Conclusion
In this article, I've shown you how to set up a job in Databricks to run sqlmesh easily. I showed that sqlmesh can use the cluster's Spark session for super easy connectivity, and how to use a postgres database for sqlmesh's state to speed things up.