Slowly changing dimensions with dbt, Databricks, and MySQL

In the previous article, we set up a dbt project running in Databricks. In this article, we'll explore how to source data from an operational database using the new Databricks Lakehouse Federation (always work with your database team to make sure the database can handle the load).

First, you'll need a database to connect to. In my case, I set up a database using Aurora Serverless v2 for MySQL. Make sure you have connectivity set up between the database and the Databricks network (since mine is just a demo database, I simply opened it up to everything). Databricks has great documentation on how to set this up, which you can follow through the Lakehouse Federation link above.
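If you want the gist of it, the federation setup boils down to creating a connection and a foreign catalog in Unity Catalog. A rough sketch follows; the host, secret scope, and object names are placeholders of mine, so check the Databricks documentation for the exact MySQL options:

CREATE CONNECTION mysql_demo_connection TYPE mysql
OPTIONS (
  host 'your-aurora-endpoint.rds.amazonaws.com',
  port '3306',
  user secret('mysql-demo', 'username'),
  password secret('mysql-demo', 'password')
);

CREATE FOREIGN CATALOG mysql_demo_catalog
USING CONNECTION mysql_demo_connection;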

Once that's done, let's populate our database with some data so we have something to work with.

We'll first put our database credentials in a secret scope. For this demo, we'll simply use the Databricks CLI to store the secrets. You can find out how to install it here and then authenticate. Then create the scope and the secrets. If you're using Terraform, another approach could be to store AWS credentials in the secret scope and then fetch the current secret values from AWS.
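With the CLI installed and authenticated, creating the scope and secrets looks roughly like this. I'm showing the legacy CLI flags, which differ slightly in newer CLI versions, and the scope and key names here are just the ones assumed for the rest of the demo:

databricks secrets create-scope --scope mysql-demo
databricks secrets put --scope mysql-demo --key host --string-value "your-aurora-endpoint.rds.amazonaws.com"
databricks secrets put --scope mysql-demo --key username --string-value "your-db-user"
databricks secrets put --scope mysql-demo --key password --string-value "your-db-password"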

We can now create data using a simple notebook.

You'll notice that even if we try to print the secrets they are redacted.
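For reference, here's roughly what the connection setup cell looks like. The connection_info dictionary used in the cells below is built from the secret scope (using the scope and key names assumed above), the database name is a placeholder, and mysql-connector-python needs to be available on the cluster, for example via %pip install mysql-connector-python.

import random

import mysql.connector
from mysql.connector import Error

scope = "mysql-demo"

# Build the connection settings from the secret scope
connection_info = {
    "host": dbutils.secrets.get(scope, "host"),
    "user": dbutils.secrets.get(scope, "username"),
    "password": dbutils.secrets.get(scope, "password"),
    "database": "demo",  # placeholder: the MySQL database holding the users table
}

# Secret values are redacted in notebook output
print(connection_info["password"])  # prints [REDACTED]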

Creating the initial table

try:
    # Connect to the MySQL database
    connection = mysql.connector.connect(
        **connection_info
    )
    
    if connection.is_connected():
        db_Info = connection.get_server_info()
        print("Connected to MySQL Server version ", db_Info)
        
        # Create a cursor object
        cursor = connection.cursor()
        
        # SQL statement to create a table
        create_table_query = """
        CREATE TABLE IF NOT EXISTS users (
            id INT AUTO_INCREMENT PRIMARY KEY,
            description VARCHAR(255) NOT NULL
        );
        """
        
        # Execute the SQL command
        cursor.execute(create_table_query)
        print("Table 'users' created successfully.")

        # List of users to insert
        users_to_insert = [
            (f"User Description {x}",)
            for x in range(100)
        ]
        
        # SQL statement for inserting data
        insert_data_query = "INSERT INTO users (description) VALUES (%s)"
        
        # Execute the SQL command to insert data
        cursor.executemany(insert_data_query, users_to_insert)
        print("Users inserted successfully.")
        
        # Commit changes
        connection.commit()

except Error as e:
    print("Error while connecting to MySQL", e)
finally:
    # Guard against the connection never having been established
    if "connection" in locals() and connection.is_connected():
        cursor.close()
        connection.close()
        print("MySQL connection is closed")

Here's another notebook cell that modifies the data so that we have changes to track.

try:
    # Connect to the MySQL database
    connection = mysql.connector.connect(
        **connection_info
    )
    
    if connection.is_connected():
        db_Info = connection.get_server_info()
        print("Connected to MySQL Server version ", db_Info)
        
        # Create a cursor object
        cursor = connection.cursor()

        # Query to get all existing user IDs
        cursor.execute("SELECT id FROM users")
        user_ids = cursor.fetchall()  # Fetch all user IDs
        
        if user_ids:
            # Select a random ID for deletion
            user_id_to_delete = random.choice(user_ids)[0]
            
            # Delete the user with the selected ID
            cursor.execute("DELETE FROM users WHERE id = %s", (user_id_to_delete,))
            print(f"Deleted user with ID: {user_id_to_delete}")
            
            # Insert a new user
            new_user_description = "New User Description"
            cursor.execute("INSERT INTO users (description) VALUES (%s)", (new_user_description,))
            new_user_id = cursor.lastrowid
            print(f"Inserted new user with ID: {new_user_id}")
        
        # Query to get all existing user IDs
        cursor.execute("SELECT id FROM users")
        user_ids = [id_tuple[0] for id_tuple in cursor.fetchall()]  # Extract IDs into a list
        
        # Check if there are enough users to update
        if len(user_ids) >= 10:
            # Select a subset of user IDs at random to update
            ids_to_update = random.sample(user_ids, 10)
        else:
            # If less than 10 users, update all available users
            ids_to_update = user_ids
        
        # Update statement for changing the description
        update_query = "UPDATE users SET description = %s WHERE id = %s"
        
        # Updating descriptions for the selected users
        for user_id in ids_to_update:
            new_description = f"Updated Description {user_id} {random.randint(1, 100)}"
            cursor.execute(update_query, (new_description, user_id))
            
        print(f"Updated descriptions for users: {ids_to_update}")
        
        # Commit changes
        connection.commit()

except Error as e:
    print("Error while connecting to MySQL", e)
finally:
    # Guard against the connection never having been established
    if "connection" in locals() and connection.is_connected():
        cursor.close()
        connection.close()
        print("MySQL connection is closed")

Now that we have query federation and a database table with data, we can read from it through dbt as a source. Let's do a basic source configuration.
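A minimal version could look like this; the source, catalog, and schema names are placeholders of mine, and on Databricks dbt's database field maps to the Unity Catalog catalog, so it should point at the foreign catalog created through Lakehouse Federation:

version: 2

sources:
  - name: mysql_demo
    database: mysql_demo_catalog  # the foreign catalog from Lakehouse Federation
    schema: demo                  # the MySQL database, surfaced as a schema
    tables:
      - name: users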

We'll first build an SCD type 2 table using dbt snapshots.

Configure the snapshot in dbt_project.yml.

We're using the check strategy because our table doesn't have a reliable update time.
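Something along these lines, assuming a project named my_dbt_project (the config key names vary a bit across recent dbt versions, and invalidate_hard_deletes is my assumption so that deleted rows get an end date and can be flagged later):

snapshots:
  my_dbt_project:
    +target_schema: snapshots
    +unique_key: id
    +strategy: check
    +check_cols: all
    +invalidate_hard_deletes: true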

Then let's create the snapshot file.
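With the configuration living in dbt_project.yml, the snapshot file itself can stay small; assuming the source name from above, something like:

{% snapshot users_snapshot %}

select * from {{ source('mysql_demo', 'users') }}

{% endsnapshot %}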

We can run it by executing dbt snapshot. Between runs, you can execute the notebook cell that modifies the data to get more interesting results.

So far, the users_snapshot table for me has the original columns (id, description) plus the dbt snapshot metadata columns (dbt_scd_id, dbt_updated_at, dbt_valid_from, dbt_valid_to), with one row per version of each user.

We'll schedule the dbt project to execute this snapshot frequently, since any changes that happen between snapshots are missed. The rest of the dbt project can be scheduled less frequently. Again, check with your database team on what frequency the database can handle, and with your data team on how granular the changes need to be (spinning up clusters has a cost too). dbt recommends frequencies like once an hour or daily. If you need finer granularity than that, you might want to look into a more advanced change data capture technique such as Debezium, which streams changes to Apache Kafka by reading the database's binary logs.

Now that we have our snapshots, let's build an SCD type 6 table on top of the SCD type 2 data so it's more convenient to query. Depending on the size of the data, this could be a view, a table, or an incremental materialization if the dimension is really massive.

with current_values as (
    select
        id
      , description
    from {{ ref('users_snapshot')}}
    where dbt_valid_to is null
),
row_values as (
    select
        id
      , description
      , dbt_scd_id
      , dbt_updated_at
      , dbt_valid_from
      , dbt_valid_to
    from {{ ref('users_snapshot')}}
)

select
  row_values.id
, row_values.description as row_description
, coalesce(current_values.description, row_values.description) as current_description
, row_values.dbt_valid_to is null as is_current
, current_values.id is null as is_deleted
, dbt_scd_id
, dbt_updated_at
, dbt_valid_from
, dbt_valid_to


from row_values left join current_values on (row_values.id = current_values.id)        

Let's now generalize the SQL query into a reusable macro and add it to our project:

{% macro scd_type_6(model, primary_key, columns) %}

with current_values as (
    select
        {{ primary_key }}
        {% for column in columns %}
        , {{ column }}
        {% endfor %}
    from {{ model }}
    where dbt_valid_to is null
),
row_values as (
    select
        {{ primary_key }}
        {% for column in columns %}
        , {{ column }}
        {% endfor %}
      , dbt_scd_id
      , dbt_updated_at
      , dbt_valid_from
      , dbt_valid_to
    from {{ model }}
)

select
  row_values.{{ primary_key }}
    {% for column in columns %}
    , row_values.{{ column }} as row_{{ column }}
    , coalesce(current_values.{{ column }}, row_values.{{ column }}) as current_{{ column }}
    {% endfor %}
, row_values.dbt_valid_to is null as is_current
, current_values.{{ primary_key }} is null as is_deleted
, dbt_scd_id
, dbt_updated_at
, dbt_valid_from
, dbt_valid_to


from row_values left join current_values on (row_values.{{ primary_key }} = current_values.{{ primary_key }})

{% endmacro %}        

We can now change our model to:

{{ scd_type_6(ref("users_snapshot"), "id", ["description"]) }}

We're now ready to create multiple dimensions!
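For example, a hypothetical products snapshot would only need its own one-liner model:

{{ scd_type_6(ref("products_snapshot"), "product_id", ["name", "price"]) }}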

Databricks Lakehouse Federation is a great tool here: it lets us leverage what we're already using for transformations (dbt) to also do data ingestion, without having to set up a separate ingestion tool. In production, your analytical loads would probably hinder your operational database, so a likely configuration would be to grant permissions on the federated catalog only to the service user running the snapshot job and have everyone else query the snapshots. In development, you might not have this limitation, and query federation will let you easily prototype and explore.
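As a sketch, that production-style permission setup could look something like this in Unity Catalog (catalog and principal names are placeholders):

-- Only the snapshot job's service principal can query the federated catalog
GRANT USE CATALOG ON CATALOG mysql_demo_catalog TO `dbt_snapshot_job`;
GRANT USE SCHEMA ON CATALOG mysql_demo_catalog TO `dbt_snapshot_job`;
GRANT SELECT ON CATALOG mysql_demo_catalog TO `dbt_snapshot_job`;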

In conclusion, we've used Lakehouse Federation to easily ingest data from MySQL using existing dbt functionality.

