The net caught the lakehouse

In a previous article, I talked about how you can easily use Databricks' integration with dbt to run it in jobs. In this article, it looks like some fisher's lost net got possessed and the lakehouse ended up enmeshed (pun intended). If you're still wondering, I'll be talking about how you can run sqlmesh in Databricks quite easily, thanks to the wonderful work the team at Tobiko Data has done.

You can leverage my experience

I've gone through this already so you can save some headaches. If you try to run a notebook for sqlmesh using the direct git integration in the task, you'll run into issues: sqlmesh tries to write files to a location, and the permissions appear to be missing in this mode. To skip this headache, simply use the repos integration in Databricks or put your files directly in the workspace. There's even a new feature to put these git repos wherever you want in the workspace. You can look at this documentation if you need an automated way to update the repo with new commits.
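If you'd rather script the repo update yourself, one option is the Databricks Repos REST API, which can move a repo to the latest commit of a branch. The sketch below only builds the request and doesn't send it. The host, token, and repo id are placeholders I made up, so swap in your own and send the request (for example with `urllib.request.urlopen`) from a CI step after each merge.

```python
import json
import urllib.request


def build_repo_update_request(host: str, token: str, repo_id: int, branch: str) -> urllib.request.Request:
    """Build a PATCH request asking Databricks to check out the head of `branch`."""
    body = json.dumps({"branch": branch}).encode("utf-8")
    return urllib.request.Request(
        url=f"{host.rstrip('/')}/api/2.0/repos/{repo_id}",
        data=body,
        method="PATCH",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


# Placeholder values for illustration only (assumptions, not real identifiers).
request = build_repo_update_request(
    "https://example.cloud.databricks.com", "dapi-placeholder", 123, "main"
)
print(request.get_method(), request.full_url)
```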

Doing a basic setup

In a previous article I explored different options for installing sqlmesh in Databricks. As the straight pip install in the notebook worked great, we'll stick with that. To make it a bit faster when developing, I also installed it on the cluster (you can skip the cell, or if you run it, it will just do a quick check and finish a lot faster).

So that's the first cell of our notebook that will run sqlmesh in Databricks.

%pip install sqlmesh[databricks]        

The next step is to import sqlmesh so that we can access the great magic commands that sqlmesh comes with. That's our second cell (you'll see we don't need that many).

import sqlmesh        

We'll then take a small detour to look at our sqlmesh config.yml and see how we set up the gateway for Databricks.

gateways:
    databricks_job:
        connection:
            type: databricks
            catalog: sqlmesh_upstream        

That's it! There's no need for any information about the cluster or credentials, which is truly amazing. Sqlmesh will simply use the spark session available in the notebook in the job. You can control under which user the job runs in the job itself.

We'll then set the context (back to our notebook) for sqlmesh to work with our project in our third cell.

%context --gateway databricks_job ./models/../        

I recommend you put the notebook directly in the project so that it can be the same or very similar across projects (you could use job parameters for the differences). Specifying . doesn't work, but the sqlmesh team is very responsive in their Slack and suggested this way, which works.

We just need one final fourth cell to get sqlmesh to actually run.

%plan --auto-apply --run        

There is also a run command that you can use if you've already applied the plan, for example if you do synchronized deployments with the CI/CD bot. Here we're doing auto-apply and run so that if we pushed new code that we already planned and verified in dev, it will deploy asynchronously on the next run. Otherwise it will simply run.

And that's it! In four lines and a tiny config we got sqlmesh to run in Databricks notebooks. From there you just have to create a regular notebook job to run at the frequency of your fastest model and sqlmesh will take care of running each model according to its configured schedule.
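To make the scheduling part concrete, here's a rough sketch of what the body of a Jobs API 2.1 `jobs/create` call could look like for this notebook. The job name, task key, notebook path, cluster id, and the 5-minute cron are all assumptions for illustration. Set the cron to match your own fastest model.

```python
import json


def build_job_payload(notebook_path: str, cluster_id: str, cron: str) -> dict:
    """Return a Jobs API 2.1 `jobs/create` body with one scheduled notebook task."""
    return {
        "name": "sqlmesh-run",  # hypothetical job name
        "tasks": [
            {
                "task_key": "sqlmesh_plan_and_run",  # hypothetical task key
                "notebook_task": {"notebook_path": notebook_path},
                "existing_cluster_id": cluster_id,
            }
        ],
        # Quartz cron fields: seconds, minutes, hours, day-of-month, month, day-of-week.
        "schedule": {
            "quartz_cron_expression": cron,
            "timezone_id": "UTC",
            "pause_status": "UNPAUSED",
        },
        # Avoid overlapping runs if one invocation takes longer than the interval.
        "max_concurrent_runs": 1,
    }


payload = build_job_payload(
    notebook_path="/Repos/someone@example.com/my_project/run_sqlmesh",  # hypothetical path
    cluster_id="0000-000000-placeholder",  # hypothetical cluster id
    cron="0 0/5 * * * ?",  # every 5 minutes, assuming that's your fastest model
)
print(json.dumps(payload, indent=2))
```

The job only needs to fire as often as the fastest model. On each run sqlmesh decides which models are actually due.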

Improving our setup

Our simple setup works alright for development, but it's currently using Databricks for the state connection. Sqlmesh maintains a lot of information about its state in tables and does a lot of small operations that just aren't what Databricks is meant for (Snowflake is a bit better for this but not ideal either). To speed things up, we'll set up a postgres database to serve as the state connection. With Databricks Lakehouse Federation we'll have an easy way to query these tables if we want to.

Before that, let's explore a quick option you can use in development which is to set a duckdb state connection.

gateways:
    databricks_job:
        connection:
            type: databricks
            catalog: sqlmesh_upstream
        state_connection:
            type: duckdb
            database: state.db        

This will make all state operations quite fast and will persist across runs through the workspace storage, but it's not as robust as using a postgres database so let's see how we might set that up.

I'm working on AWS on my own account so I'll be using RDS to set up a postgres database. We don't need much capacity and don't want to pay so let's just follow the instructions for the free tier template.

If you do this at work, check with your infra, networking, or other contacts which VPC you should use. To keep things simple here, we'll just put it in the VPC that was created when we set up Databricks.

To make sure the connection will work in Databricks, I simply enabled all of the security groups.

To remain on the theme of security, we'll store the host, port, user, password, etc in a secret scope. To keep things simple, we'll simply use the Databricks CLI to do that.
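I used the CLI, but if you prefer to stay in Python, the same thing can be sketched with the Databricks SDK, whose `WorkspaceClient().secrets` exposes `create_scope` and `put_secret`. The helper below takes the client as an argument, and the demo uses a small stand-in client (my own invention, not part of the SDK) so it runs without a workspace. The key names match the ones the notebook reads later. The values are placeholders.

```python
class DryRunSecrets:
    """Stand-in for `WorkspaceClient().secrets` that only records calls."""

    def __init__(self):
        self.calls = []

    def create_scope(self, scope):
        self.calls.append(("create_scope", scope))

    def put_secret(self, scope, key, string_value):
        # Record the key only, never the secret value.
        self.calls.append(("put_secret", scope, key))


class DryRunClient:
    """Hypothetical client used here instead of a real WorkspaceClient."""

    def __init__(self):
        self.secrets = DryRunSecrets()


def store_postgres_secrets(client, scope, values):
    """Create `scope`, write each key/value pair into it, and return the keys written.

    With a real workspace, creating a scope that already exists raises an error.
    """
    client.secrets.create_scope(scope=scope)
    written = []
    for key, value in values.items():
        client.secrets.put_secret(scope=scope, key=key, string_value=str(value))
        written.append(key)
    return written


# Placeholder values for illustration only.
demo_client = DryRunClient()
written_keys = store_postgres_secrets(
    demo_client,
    "postgres",
    {
        "host": "db.example.internal",
        "port": 5432,
        "user": "admin_placeholder",
        "password": "admin_password_placeholder",
        "sqlmesh_username": "sqlmesh_placeholder",
        "sqlmesh_user_password": "sqlmesh_password_placeholder",
    },
)
print(written_keys)
```

To run it for real, you would pass `WorkspaceClient()` from `databricks.sdk` instead of `DryRunClient()`.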

We'll use a small script to create the database in postgres and a user for sqlmesh to use.

import psycopg2
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

def create_db_and_user():
    # Connect to the PostgreSQL server with the admin credentials from the secret scope
    secret_scope = "postgres"
    conn = psycopg2.connect(
        dbname='postgres',
        user=dbutils.secrets.get(secret_scope, "user"),
        password=dbutils.secrets.get(secret_scope, "password"),
        host=dbutils.secrets.get(secret_scope, "host"),
        port=dbutils.secrets.get(secret_scope, "port")
    )

    # CREATE DATABASE cannot run inside a transaction block, so switch to autocommit
    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

    # Create a cursor object
    cur = conn.cursor()

    # Step 1: Create the new database
    try:
        cur.execute('CREATE DATABASE sqlmesh_state;')
        print("Database sqlmesh_state created successfully")
    except Exception as e:
        print(f"An error occurred: {e}")

    sqlmesh_user = dbutils.secrets.get(secret_scope, "sqlmesh_username")
    sqlmesh_user_password = dbutils.secrets.get(secret_scope, "sqlmesh_user_password")
    # Step 2: Create a new user with full access to the new database.
    # sql.Identifier and sql.Literal quote the values so special characters can't break the statement.
    try:
        cur.execute(
            sql.SQL("CREATE USER {} WITH PASSWORD {};").format(
                sql.Identifier(sqlmesh_user), sql.Literal(sqlmesh_user_password)
            )
        )
        cur.execute(
            sql.SQL("GRANT ALL PRIVILEGES ON DATABASE sqlmesh_state TO {};").format(
                sql.Identifier(sqlmesh_user)
            )
        )
        print(f"User {sqlmesh_user} created successfully and granted full access to sqlmesh_state")
    except Exception as e:
        print(f"An error occurred: {e}")

    # Close the cursor and connection
    cur.close()
    conn.close()

create_db_and_user()
        

This script was brought to you by ChatGPT! (I edited it a bit to use the values from the scope)

The script uses our credentials in the secret scope to create a new database and user for storing sqlmesh's state information.

We can set up query federation to see it in Databricks. (At this point the catalog would be mostly empty, but I made this after already running sqlmesh.)

We can see what intervals our models ran for. We'll see how to leverage this to create a custom sensor-like task to connect two separate sqlmesh projects in a future article.

We now need to make a few modifications to our sqlmesh config and then to the notebook to use this new state connection.

gateways:
    databricks_job:
        connection:
            type: databricks
            catalog: sqlmesh_upstream
        state_connection:
            type: postgres
            database: sqlmesh_state
            host: {{ env_var('POSTGRES_HOST')}}
            port: {{ env_var('POSTGRES_PORT')}}
            user: {{ env_var('POSTGRES_USER')}}
            password: {{ env_var('POSTGRES_PASSWORD')}}        

We add a new state connection, and its details will come from environment variables. Sqlmesh supports two approaches: environment variables whose names are derived automatically from the config path, or the jinja syntax I'm using here, like you would in dbt. I prefer the latter.
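For the first approach, my understanding of the sqlmesh docs is that the variable name is the config path in upper case, joined with double underscores and prefixed with `SQLMESH__`. The helper below is just my own illustration of that naming rule (it isn't part of sqlmesh), so double check the exact convention against the docs for your version.

```python
def override_env_name(*path: str) -> str:
    """Build the environment variable name for a config path, assuming the
    `SQLMESH__` prefix and double-underscore separators described in the docs."""
    return "SQLMESH__" + "__".join(part.upper() for part in path)


# Name that would override the state connection password of our gateway.
name = override_env_name("gateways", "databricks_job", "state_connection", "password")
print(name)
```

With that approach you wouldn't need the jinja placeholders in the config, but the variable names get long, which is part of why I prefer the explicit `env_var` calls.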

We'll then adjust our notebook so that it fills in these environment variables using the values in our secret store.

All we have to do is add this cell before the %context cell.

import os
scope = "postgres"
os.environ["POSTGRES_HOST"] = dbutils.secrets.get(scope, "host")
os.environ["POSTGRES_PORT"] = dbutils.secrets.get(scope, "port")
os.environ["POSTGRES_USER"] = dbutils.secrets.get(scope, "sqlmesh_username")
os.environ["POSTGRES_PASSWORD"] = dbutils.secrets.get(scope, "sqlmesh_user_password")        

That's it! Now sqlmesh will be using our postgres database to manage its state. It will be fast and resilient.
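If a secret is missing or misnamed, the failure only shows up later when sqlmesh loads the config. A small optional check right after setting the variables can catch it earlier. This is a sketch of my own, not something sqlmesh requires, and the variable names are simply the ones used in the config in this article.

```python
import os

REQUIRED_VARS = ("POSTGRES_HOST", "POSTGRES_PORT", "POSTGRES_USER", "POSTGRES_PASSWORD")


def missing_env_vars(names=REQUIRED_VARS, environ=None) -> list:
    """Return the names that are unset or empty in `environ` (defaults to os.environ)."""
    env = os.environ if environ is None else environ
    return [name for name in names if not env.get(name)]


missing = missing_env_vars()
if missing:
    # In the notebook you might raise instead so the job fails fast.
    print(f"Missing state connection variables: {', '.join(missing)}")
else:
    print("All state connection variables are set")
```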

While we're updating our config, let's also increase the concurrent_tasks as sqlmesh defaults it to 1 for Databricks.

gateways:
    databricks_job:
        connection:
            type: databricks
            catalog: sqlmesh_upstream
            concurrent_tasks: 4
        state_connection:
            type: postgres
            database: sqlmesh_state
            host: {{ env_var('POSTGRES_HOST')}}
            port: {{ env_var('POSTGRES_PORT')}}
            user: {{ env_var('POSTGRES_USER')}}
            password: {{ env_var('POSTGRES_PASSWORD')}}        

We can go modify our model's schedule directly in the workspace files and then rerun the %plan cell to see what happens.

Our example doesn't have enough models to really showcase concurrent tasks, but with postgres as the state connection there are far fewer queries that need to run on our cluster, and that speeds things up a lot. With postgres as the state connection it ran in 18 seconds, while with Databricks it took 51 seconds.

Here's the equivalent run with the Databricks state connection

That's 35 steps with the Databricks state connection vs 9 with postgres, so let's give Databricks the chance to shine by offloading these tiny transactions it's not meant for to a cheap postgres instance. Your runtimes and your wallet will thank you for this little bit of extra work. We can even query the state with Lakehouse Federation, so it's a really good setup.

Conclusion

In this article, I've shown you how to set up a job in Databricks to run sqlmesh easily. I showed you that sqlmesh can use the cluster's spark session for super easy connectivity and how to use a postgres database for sqlmesh's state to speed things up.

