Part 2 - Azure Databricks, Delta Engine and its Optimizations

Building upon my previous blog post where we delved into the Delta engine and its optimization features for files, I would now like to dive into the second and third parts of this topic. Specifically, this blog post will focus on the Delta engine's ability to optimize queries and joins. Without further ado, let's begin!

Optimizing queries

Optimizing queries using Dynamic File Pruning (DFP)

Dynamic File Pruning (DFP) is a Delta Lake feature that allows for more efficient data skipping during query execution. It works by examining the filter predicates of a query and determining which files in a Delta table need to be read to fulfill the query. By doing so, DFP skips reading unnecessary files and thus can improve query performance.

DFP is automatically enabled in Delta Lake and does not require any additional configuration. However, in some cases, it may be beneficial to optimize the table to ensure that data is organized in a way that is conducive to efficient DFP. This can be done using the OPTIMIZE command in Delta Lake.
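As a minimal illustration (the table and column names below are assumptions, not taken from the original post), compacting small files and Z-ordering on a commonly filtered column tightens the per-file statistics that pruning relies on:

-- Compact small files and co-locate rows on a frequently filtered column
OPTIMIZE covid_data
ZORDER BY (report_date)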

Here is an example to illustrate how DFP works in Databricks:

Let's assume we have a patients table with a patient_id column and a diagnoses table with a patient_id column and a diagnosis_code column, and we want to join the two tables on the patient_id column. We can use DFP to skip unnecessary files during the join operation, as follows:

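The original screenshot is not available, so here is a minimal sketch of such a query. The patients and diagnoses table shapes are assumptions based on the description above, and the configuration name follows recent Databricks Runtime documentation:

-- Dynamic file pruning is on by default; shown here only for illustration
SET spark.databricks.optimizer.dynamicFilePruning = true;

-- A selective filter on diagnoses plus a LEFT SEMI join lets DFP skip
-- patients files that cannot contain matching patient_id values
SELECT p.*
FROM patients p
LEFT SEMI JOIN diagnoses d
  ON p.patient_id = d.patient_id
 AND d.diagnosis_code = 'COVID-19';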

In the above example, we can confirm that DFP is enabled for the query by checking the spark.databricks.optimizer.dynamicFilePruning configuration option (it is true by default). We then join the two tables on the patient_id column with a filter predicate that only keeps rows where the diagnosis_code column is "COVID-19". Since we are using a BROADCAST HASH JOIN and the join type is LEFT-SEMI, DFP is applied automatically to the join operation, skipping any files that do not contain relevant data for the query.

To elaborate further, DFP is applied automatically in Databricks when the join strategy is BROADCAST HASH JOIN and the join type is INNER or LEFT-SEMI. In addition, the inner (probe-side) table being joined must be a Delta table, and it must be large enough to justify applying DFP. These thresholds are configurable through the spark.databricks.optimizer.deltaTableFilesThreshold and spark.databricks.optimizer.deltaTableSizeThreshold parameters; in recent Databricks Runtime versions the documented defaults are 10 files and 10 GB respectively.
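To inspect the current thresholds in a notebook session (the defaults noted in the comments reflect the documentation and may differ across runtime versions):

-- Display the session values; DFP is only considered above these thresholds
-- Documented default: 10 GB
SET spark.databricks.optimizer.deltaTableSizeThreshold;
-- Documented default: 10 files
SET spark.databricks.optimizer.deltaTableFilesThreshold;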

Hence, DFP is applied only when the above conditions are met.


Optimizing queries using Bloom Filters

Again, I always believe “old is gold”. Bloom filters, a technique that has existed since the 1970s, are used in the Delta engine today to optimize query performance. Let’s first understand how Bloom filters work, and then look at how Azure Databricks uses them.

So, imagine you're a librarian in a library that has a lot of books. You have a catalog of all the books in the library, but it's pretty big and takes up a lot of space. You also have a lot of patrons who come in and ask you whether the library has a particular book or not. It would be great if you could quickly check whether the library has a book without having to go through the entire catalog, right?

Enter the Bloom filter! It's like a mini-catalog that takes up much less space than the full catalog but can quickly tell you whether a particular book is in the library or not. How does it work? Well, you take each book in the library and use a hash function to map it to a number. Then, you take a bit in the Bloom filter for each of these numbers and set it to 1. When someone asks you whether a particular book is in the library, you hash the book's name and check the corresponding bits in the Bloom filter. If all the bits are 1, you can confidently say that the book is in the library. If any of the bits are 0, you can say that the book is definitely not in the library.

Of course, there's a small chance that a bit could be 1 even though the book isn't in the library (this is called a false positive). But the Bloom filter is designed to keep this chance very small, so you can be pretty confident in your answer. It's like a magic eight ball, but for books!

So, there you have it. A Bloom filter is like a mini-catalog that can quickly tell you whether a book is in the library or not. It's not perfect, but it's pretty darn useful!

Note that Bloom filters are not appropriate for all datasets or query patterns. They are most effective when the columns being indexed have a high cardinality (i.e., a large number of distinct values) and when the queries are selective (i.e., they filter on a small subset of the values in the column). Bloom filters can also produce false positives (i.e., report that a value is in the set when it's not) but not false negatives (i.e., report that a value is not in the set when it is).

We can enable or disable Bloom filters by setting the spark.databricks.io.skipping.bloomFilter.enabled session configuration to either true or false, as shown below. This option is already enabled by default in Azure Databricks:

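(The original screenshot is not reproduced here; the snippet below is an equivalent SQL cell.)

-- Enabled by default; set to false to disable Bloom filter data skipping for the session
SET spark.databricks.io.skipping.bloomFilter.enabled = true;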

We can create Bloom filters using the CREATE BLOOMFILTER INDEX command. This applies to all columns on a table or only a subset of them.

CREATE BLOOMFILTER INDEX ON TABLE our_table
FOR COLUMNS(col1 OPTIONS(fpp=0.1), col2 OPTIONS(fpp=0.2))

In this scenario, we are generating Bloom filters for col1 and col2 columns, where the false positive rates (FPRs) are set at 10% and 20% respectively.

Additionally, we have the option to specify the numItems parameter, which defines the number of distinct items a file may hold, and the maxExpectedFpp parameter, which sets the maximum expected false positive probability above which a Bloom filter is not written to disk. Tuning these parameters keeps the filters compact enough to sit in memory; they are computed once when the data is written and can then be consulted cheaply at query time.
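A hypothetical variant of the earlier statement combining these options (the table, column, and values are illustrative only):

CREATE BLOOMFILTER INDEX ON TABLE our_table
FOR COLUMNS(col1 OPTIONS(fpp=0.1, numItems=5000000, maxExpectedFpp=0.15))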

(We can also drop all the Bloom filters from a table using the DROP BLOOMFILTER INDEX command. The VACUUM command will also remove the Bloom filters on a table, but we need to be careful when using it; it is not recommended just for dropping Bloom filters.)
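For example, the Bloom filter can be dropped for specific columns (or for the whole table by omitting the FOR COLUMNS clause):

DROP BLOOMFILTER INDEX ON TABLE our_table
FOR COLUMNS(col1)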


Optimizing join performance

Range join optimization

I don’t have a story of my own to explain this one, as it has always been a straightforward requirement. I have been writing this blog for quite some time and realized I keep saying “old is gold”, so I figured I should try using ChatGPT at least once in this blog, or I wouldn’t be following the trend these days. So, I decided to create the content for this part through ChatGPT via the Azure OpenAI portal!

I must say the model has generated a better story than my original one.

Alrighty, so imagine you're throwing a massive party, and you've invited all your friends over to hang out.
Now, as the party gets going, people start forming groups based on their interests. The bookworms gather in one corner, the gamers huddle around the console, and the foodies flock to the snack table.
But what if you want to join two groups together, like the bookworms and the gamers, for a game of literary trivia? How can you do it without disturbing the rest of the party?
Well, that's where range join optimization in Delta Engine comes in!
Imagine that each group is a table, with the people in each group representing rows. Delta Engine uses range partitioning and sorting to organize the rows in each table based on a common key, like "interests". This way, it only needs to read and join the rows that fall within a specific range of interests, like "books" and "games".
So, Delta Engine quickly and efficiently joins the bookworm and gamer tables, without disturbing the rest of the party. It's like magic!
And just like that, the bookworms and gamers are having a blast together, all thanks to the power of range join optimization in Delta Engine. Who knew that big data processing could be so fun?

To enable range join optimization, we perform steps similar to skew join optimization (covered below), i.e., we enable it through hints. Here the hint uses the RANGE_JOIN syntax on a SELECT statement, with the target table (or its alias) and a bin size as parameters.

Let’s go through an example. Say we have two tables: our already created covid_data table and the ranges table.

Let’s create a new table with some new/revised dates.

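(The original screenshot is not available; the following is a hypothetical reconstruction with assumed names and dates, chosen so that the six-row example below works out.)

CREATE OR REPLACE TABLE covid_data_new (case_id INT, report_date DATE);

INSERT INTO covid_data_new VALUES
  (1, DATE '2021-01-05'),
  (2, DATE '2021-02-14'),
  (3, DATE '2021-03-20'),
  (4, DATE '2021-04-02'),
  (5, DATE '2021-05-18'),
  (6, DATE '2021-06-30');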

Let’s create a ranges_new table with some date ranges we require.

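(Again a hypothetical reconstruction with assumed names and values, containing two date ranges.)

CREATE OR REPLACE TABLE ranges_new (range_id INT, start_date DATE, end_date DATE);

INSERT INTO ranges_new VALUES
  (1, DATE '2021-01-01', DATE '2021-03-31'),
  (2, DATE '2021-05-01', DATE '2021-06-15');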

Let’s apply range join optimization for the start and end ranges we created in the ranges_new table.

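A sketch of the range join itself, using the RANGE_JOIN hint on the ranges_new alias with a bin size of 30 days (the bin size is an assumption; for DATE columns it is expressed in days):

SELECT /*+ RANGE_JOIN(r, 30) */ c.case_id, c.report_date, r.range_id
FROM covid_data_new c
JOIN ranges_new r
  ON c.report_date BETWEEN r.start_date AND r.end_date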

Here you will see that, out of the 6 rows, 4 rows have been returned because they fall within the date ranges of the ranges_new table.

The bin size can be specified in multiple ways; for example, it is often chosen based on the typical length (say, a high percentile) of the ranges being joined. That’s all about range join optimization.


Skew join optimization

In the previous sections, we briefly hinted at how data can get skewed in some of our partitions. Think of this as natural skewness: even if the partition key you selected ‘n’ months ago has worked well to date, your data becoming skewed over time is natural. This problem is very sensitive for ML models, and predictions can go for a toss in such situations. It is similar in distributed computation systems such as Azure Databricks, because many tasks run in parallel, and one task taking much longer than the rest can block executions that depend on it. This kind of uneven workload problem mostly affects join operations, because large tables need to be reshuffled.

Hence, to tackle this problem, we use skew join optimization hints in Azure Databricks in the same way as we have seen with range join optimization. Hints are specified in a SELECT statement using /*+ to start and */ to end. Example:

SELECT /*+ SKEW('table_name') */ * FROM ... WHERE ...

The above syntax is just an example of how we can use skew join optimization; it can also be applied to sub-queries. Similarly, we can specify relations together with their skewed columns in the SKEW hint, including multiple columns at once. An example of this is as follows:

SELECT /*+ SKEW('table_name', ('emp_id', 'h_dept_id')) */ *
FROM table_name
WHERE emp_id = put_our_condition AND h_dept_id = a_dept_id

Be very careful while applying skew join optimization, since it is a computationally intensive operation and should be run only when required. Hence, we use it only on the columns that actually need it.


I appreciate the time that you all have taken! I hope that you enjoyed learning about the Delta engine series in Azure Databricks. Please keep an eye out for my upcoming blog posts on LLMs and Generative AI in Azure.
