Azure Databricks - Delta Engine and its Optimizations


Blog Introduction

It’s fun and interesting to see the pace at which technology is accelerating; there is no brake pedal to it. When I started writing this blog, ChatGPT was in the news everywhere, and now it has gone even further: ChatGPT, GPT-4, and Copilot are all over. Let me tell you, readers, it's not just AI; we at Microsoft have more coming for all of you soon!

My previous blog was on Machine Learning on Azure PaaS; here it is in case anyone missed it. In this blog, I will be talking about Azure Databricks, specifically about the “Delta Engine and its optimizations”. You will learn about some interesting optimizations that have happened in Delta. Since every blog of mine draws on my past learnings, I will share why we felt the need for these optimizations and some of the backstory around them. So tighten your seat belts, story time is coming!

Before my story, let’s start with Azure OpenAI's ChatGPT. Readers, do tell me which story you liked more: the bot's story or my personal story. Let’s get started.


What is Delta Lake?

Output from the Azure OpenAI ChatGPT Playground:

“Imagine you're at a potluck, and everyone brings their own dish to share. Some people bring delicious, well-prepared dishes, but others bring questionable casseroles that may or may not be safe to eat. That's where Delta Lake comes in - it's like the food safety inspector at the potluck, making sure that all the dishes are properly cooked, labeled, and safe to eat. So next time you're at a potluck, just remember that Delta Lake is the food safety superhero that saves the day!”

Yes, that captures what Delta Lake is surprisingly well. Now let’s dive a bit deeper and understand why we needed Delta Lake over and above data lakes.

Why is there a need for Delta Lake?

The need for improved data storage and processing capabilities has led to the emergence of Data Lakes. As these tools have become more adept at collecting and managing data, organizations are faced with a challenging decision: should they opt for a traditional Data Lake solution or invest in more modern solutions such as Delta Lake?

A Data Lake is a centralized repository that allows you to store all your structured and unstructured data at any scale, without having to first structure or clean it. You can run different types of analytics on your data, from dashboards and visualizations to big data processing, real-time analytics, and machine learning.

While Data Lakes are suitable for storing all kinds of data, they lack some critical features. For instance, they do not support ACID transactions, data quality enforcement, or consistency/isolation. This makes it almost impossible to mix appends and reads, and batch and stream jobs.

Delta Lake offers a more efficient way to analyze large datasets. Its ability to continuously append new data without running ETL jobs makes it easier for developers and analysts to maintain an updated version of their data lake. Moreover, Delta Lake enables efficient versioning, snapshotting, and time travel operations on the stored dataset. This allows users to keep detailed histories of how their datasets have changed over time while still maintaining current versions.

Delta Lake also offers many other features that enable comprehensive management of Big Data projects. Data engineers can achieve dynamic schema support through schema enforcement and schema evolution capabilities.

The key difference between Data Lakes and Delta Lake lies in their approach to storing data. Traditional Data Lakes simply store all records as files, while Delta Lake employs an optimized storage layout: a transaction-log-backed table format on top of Parquet files. This format enables users to access the same data from multiple applications while simultaneously updating it with fresh information.

The benefits of Delta Lake extend beyond mere efficiency. Compared to traditional Data Lakes, Delta Lake offers numerous advantages that make it a much better solution for organizations looking to maximize their potential with data management. For instance, it provides greater speed when querying large datasets through index-like optimization techniques, which can drastically improve query performance. Additionally, it boasts built-in support for ACID (Atomicity, Consistency, Isolation, Durability) compliance, ensuring reliable behavior across concurrent readers and writers as well as immediate visibility into any changes occurring during read/write operations.

Thanks Patnaikuni Krishna Chaitanya for contributing to the "Need for Delta Lake" section.

Let’s now get into story time by studying optimizations in Delta and their backstory:


Optimizations in Delta

Above, we learned about Delta Lake; the Delta Engine is the query engine that sits on top of it. What if I tell you that this powerful engine has some amazing inbuilt optimizations that let us optimize the data in our delta lake in multiple ways? Before getting into them, I would love to narrate a story on how these things came into action and why the need for these inbuilt optimizations was felt.

It was around 2013/2014 when I was handling petabytes of data in my ETL workloads. We had built a strong Hadoop ecosystem where Flume, Kafka, MapReduce, HBase, and HDFS were our hero services. Then came Apache Spark, which gained massive popularity from 2014 onwards. We all know how Hadoop MapReduce was eventually replaced by Spark, which drew on the same distributed-computing ideas but was designed around the limitations of MapReduce (namely the challenge of needing multiple passes over the data, with each pass written as a separate MapReduce job). I will avoid getting into more detail to keep this blog focused.

One question that always came to my mind was: what happens when the data keeps growing and needs a powerful computation engine to process it “on the fly” with seamless streaming? That is where Spark came in. Spark deliberately limited its scope to being a computing engine: it handles loading data from storage systems and performing computations on it, but does not act as permanent storage itself. This allowed Spark to be used with a wide variety of persistent storage systems, including Azure Storage, AWS S3, and many more. With this, several things were born, and the most important of them was “optimizations in Spark”.

Optimizations in Spark are a huge area, and I will avoid covering them all here. But to share some of my experiences: when I started working in Spark, I had to use several optimizations to process data faster. Some of them (sketched briefly after this list) were:

  1. Use coalesce() over repartition()
  2. Caching the data for frequent query runs
  3. Being very careful with shuffle partitions
  4. Phew, the broadcast join strategy
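To make these four items concrete, here is a minimal PySpark sketch. It assumes an existing SparkSession and two hypothetical Parquet datasets, orders and countries, which I use purely for illustration:

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("spark-optimization-sketch").getOrCreate()

# Hypothetical inputs, used only to illustrate the techniques listed above
orders = spark.read.parquet("/data/orders")        # a large dataset
countries = spark.read.parquet("/data/countries")  # a small lookup dataset

# 1. Prefer coalesce() over repartition() when reducing partitions (avoids a full shuffle)
orders_compacted = orders.coalesce(8)

# 2. Cache data that is queried repeatedly, and trigger an action to materialize the cache
orders.cache()
orders.count()

# 3. Be deliberate about shuffle partitions (the default of 200 is rarely ideal)
spark.conf.set("spark.sql.shuffle.partitions", "64")

# 4. Broadcast the small table so the join avoids shuffling the large one
joined = orders.join(broadcast(countries), on="country_code", how="inner")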

And the list goes on; I could list at least 20-25 optimizations that were required then and are still very much required today. However, with time, the engines are becoming smarter. Computation engines now ship with inbuilt optimizations, so we can enjoy faster processing over a sip of tea or coffee.

We have three categories of optimizations in Delta:

1) Optimizing files in Delta Engine

2) Optimizing queries in Delta Engine

3) Optimizing Join Performances

In part 1 of this blog, we will cover optimizing files in the Delta Engine, and part 2 will cover the remaining two techniques.

Without any further delay, let me dive deep into these amazing optimizations that we have today, with more coming in the future:

Optimizing file management with Delta Engine

The Delta Engine has strongly optimized layouts and improved data indexing. Below are the types of file optimizations we have today, with the added benefit that many of these operations take place automatically, just by using Delta Lake.

1) Merging small files using bin-packing

Let me share some of my past experiences. People who have worked in Hadoop might have heard of the problems with “small files and MapReduce” or “small files and HDFS”. Managing millions of small files was a very classic problem. It is a big topic of discussion, but to give you a glimpse, the Hadoop community came up with several optimizations, such as HAR files, that suited HDFS to a certain extent by putting less load on the NameNode's memory. In a nutshell, several things came up, and HBase, thanks to its architectural style, was well suited to dealing with large numbers of small files. However, where there is a gap, there is a need; this always remained a topic of interest and became part of the optimization trajectory. I share this glimpse of the struggle to give readers the background story of why such optimizations exist today. As a reader myself, I have always found that back stories and roots play a major role in learning a new skill, technique, or technology.

Coming to the Delta Engine, it allows us to coalesce small files into larger files. This technique is called “bin-packing”. You might be surprised to know that this is not something totally new: the idea of packing smaller files into the smallest number of larger, more efficient files is borrowed from the classic mathematical bin-packing problem, which has been studied for well over 25 years. That problem was originally conceived to work out how to pack objects of different volumes into the smallest possible number of finite-capacity bins. Hence, I always believe “old is gold”.

Let’s see how we use this:
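A minimal sketch of running bin-packing compaction from a notebook, assuming a Delta table named covid_data (an illustrative table name used throughout this blog) and a hypothetical date partition column:

# Compact the whole table's small files into larger ones (bin-packing)
spark.sql("OPTIMIZE covid_data")

# Or limit compaction to recent partitions only ('date' is a hypothetical partition column)
spark.sql("OPTIMIZE covid_data WHERE date >= '2023-01-01'")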

Since the sample table I used here was small, the optimization did not actually change anything about the files holding the table data; on larger tables with many small files, the effect is much more visible.


Delta Lake's OPTIMIZE command is used to compact tables, resulting in better query performance because queries scan a smaller number of files. The operation is idempotent: running it a second time on the same data has no additional effect. Snapshot isolation ensures table integrity during optimization, so readers of the table are not affected. However, OPTIMIZE does not run by itself; it only helps once enough small files have accumulated in the table's directory, so we have to decide when to trigger it. Compaction is also computationally expensive and can affect low-latency streams. How often tables should be optimized therefore depends on the trade-off between query performance and the cost of the optimization resources.


2) Skipping the data

So, it was in 2016, while I was working on a SQL query at one of my past companies. I already knew that, since the query had a huge amount of data to scan, it always took at least 25-30 minutes (sometimes more during working hours, because of the shared cluster) for the run to complete. The fun part is, since I ran that query several times over several days, I used to wonder why the SQL engine did not have some AI-like system that lets the query scan only the data where the actual output of the query resides. Say I know there are 30 columns and I just need an output of 4 of them with some additional processing; why should I even worry about the remaining 26 columns? Can't there be an engine that just skims over the irrelevant data and does a faster scan rather than a full one? Sometimes it feels funny to think about such idle ideas (especially when you have nothing to do while the query runs). Fast forward to today, and let's welcome the “data skipping” feature in the Delta Engine. Let's see what it does:

As new data arrives in the lake, Delta Lake keeps track of file-level statistics at the granularity of its I/O. The best part is that these statistics are collected automatically at write time, storing the minimum and maximum values per column for each file. These statistics help the engine identify which files can possibly contain matching values, plan queries accordingly, and avoid unnecessary I/O operations. It makes me go “wow” at times. During lookup queries, the Azure Databricks data skipping feature uses the collected statistics to skip files that do not need to be read for the operation.

To keep the blog focused: this feature is great on its own, but when you combine it with partitioning it works even better. Hopefully, in a future blog, I can demo how it works like a charm. Let me tell you something, though: data skipping is a coarse, index-like technique, and like any probabilistic filter it can produce false positives (files that are read but turn out to contain no matching rows). We don't need to configure this feature, as it is activated by default and applied whenever possible, but we can configure how many columns to track statistics for through the dataSkippingNumIndexedCols table property. If you read that previous statement again, it means the effectiveness of the operation is bounded by the underlying structure of your data. Every AI or probabilistic technique is bounded by the structure of the data, even Iron Man's Jarvis!
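As a minimal sketch, assuming the same illustrative covid_data table, the number of indexed columns can be tuned through the delta.dataSkippingNumIndexedCols table property:

# Collect min/max statistics only for the first 8 columns instead of the default 32
spark.sql("""
    ALTER TABLE covid_data
    SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = '8')
""")

Keeping the columns you filter on most often within this limit keeps statistics collection cheap while preserving the skipping benefit.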

That’s it for this feature. I wish I could say more about it and how it interacts with other operations (ALTER TABLE and so on), but to keep the blog to a reasonable length, this is all for now.


3) Z-order clustering

This is also one of the most liked optimization features of Delta. Haha, I don’t have a story here, but let's understand it this way. Imagine your data is scattered across all the files, and every time you run a query, all of it has to be read for that query (similar to the problem I narrated above). This leads to an increase in metadata size, along with other issues around directories and file compaction. We all know partitions help here to some extent, as they try to make the distribution of information more even. What if we help our partitions by making the data within them more structured? Here comes the Z-ordering feature. It tries to co-locate related data in the same set of files, which can significantly improve the performance of our queries on the table. Better still, Z-ordering combined with data skipping, which decides which files and partitions to read, can be very effective.

I created a table with sample data and then performed a z-order for further optimization.
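A minimal sketch of that command, again assuming the illustrative covid_data table and a hypothetical country column that queries frequently filter on:

# Co-locate rows with similar values of 'country' in the same data files
spark.sql("OPTIMIZE covid_data ZORDER BY (country)")

Z-ordering by the columns used most often in WHERE clauses gives data skipping the best chance of pruning files.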


Now, you might be asking: if we cluster similar data together, does it remain evenly distributed? Please do not get confused; this operation relocates related data, it does not evenly distribute it. So can this lead to data skewness? Yes, you are right, and we will see in a future blog how to deal with skewness in such scenarios.


4) Managing data recency

By default, querying a Delta table retrieves the latest version of the table, as the table state is automatically refreshed. However, we can modify this behavior to suit our needs, especially for historical analysis where a fully up-to-date table is not necessary. This can lead to faster query execution and lower latency.

To set this behavior, we can use the spark.databricks.delta.stalenessLimit option, which takes a time-string value as a parameter (for example, 1h or 1d). This option applies only to the current session, so other users querying the table are not affected. When the staleness limit is reached, the table state is refreshed.
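As a minimal sketch, assuming a notebook session with an active SparkSession, this is how the option can be set for the current session:

# Allow queries in this session to use a table state that is up to one hour old
spark.conf.set("spark.databricks.delta.stalenessLimit", "1h")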

It is essential to note that this parameter does not prevent table updates but rather avoids queries from waiting for the table to be updated.


5) Understanding checkpoints

Delta Lake uses checkpoints to aggregate the state of a Delta table for computational purposes. Checkpoints are written every 10 commits and stored in the _delta_log directory of a table's path. This saves time, as Delta Lake no longer needs to read the whole list of JSON transaction-log files to reconstruct the latest state.

Checkpoints hold information on all transactions and modifications made to a table up to the current state, removing invalid actions by following reconciliation rules. This reduces the overall size of the log and makes it more efficient to read when reconstructing snapshots. Checkpoints are vital for snapshot isolation and access to previous data versions.

Column-level statistical information is also stored in checkpoints for data skipping purposes.

Checkpointing behaviour can also be influenced through table properties that are set on the table.
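As a minimal sketch, assuming the illustrative covid_data table, these are the kinds of checkpoint-related table properties involved (delta.checkpointInterval and delta.checkpoint.writeStatsAsStruct are standard Delta table properties, though defaults may vary by runtime):

# Write a checkpoint every 10 commits and store column statistics as a struct,
# which makes the statistics cheaper to read for data skipping
spark.sql("""
    ALTER TABLE covid_data
    SET TBLPROPERTIES (
        'delta.checkpointInterval' = '10',
        'delta.checkpoint.writeStatsAsStruct' = 'true'
    )
""")

# Inspect what is currently set on the table
spark.sql("SHOW TBLPROPERTIES covid_data").show(truncate=False)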


Similarly, we can also checkpoint a streaming DataFrame by writing the code in Python. An example of checkpointing a streaming DataFrame in Python is below:

covid_data.writeStream \
    .format("parquet") \
    .option("path", output_path) \
    .option("checkpointLocation", checkpoint_path) \
    .start()

This code starts a streaming query that writes the data from the covid_data streaming DataFrame to Parquet files at the output_path location. The checkpointLocation option (here checkpoint_path) specifies where the query's metadata will be stored to enable fault tolerance and exactly-once processing. You can replace output_path and checkpoint_path with the appropriate paths for your use case.


6) Automatically optimizing files with Delta Engine

a) Optimized Writes

b) Auto Compaction

The Delta Engine offers the Auto Optimize feature to enhance the performance of Delta tables. It simplifies table management by automatically consolidating small files into larger ones during DML operations and by keeping metadata clean. The feature has two parts: Optimized Writes and Auto Compaction. Optimized Writes arranges each write so that table partitions are written as evenly sized files of roughly 128 MB, improving the write layout. Auto Compaction runs after each write operation and compacts small files, targeting a compacted file size of around 128 MB.

Auto Optimize is particularly useful for frequently modified tables, such as those that handle low-latency data streams, undergo frequent merges, or have high rates of data ingestion. To enable this feature, you set the relevant table properties, delta.autoOptimize.optimizeWrite and delta.autoOptimize.autoCompact, on specific tables. In our example, the table is covid_data.
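A minimal sketch of enabling both options on the illustrative covid_data table, using the standard Delta table properties:

# Enable Optimized Writes and Auto Compaction on an existing Delta table
spark.sql("""
    ALTER TABLE covid_data
    SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")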


In my personal experience, I would recommend enabling these operations deliberately, table by table, because they always come with some caveats. For example, if multiple users run DML operations concurrently on the same table, there is a high probability of conflicts in the transaction log, sometimes without any clear errors or failures being surfaced. I am happy to discuss this in detail with anyone who wants to understand it more practically; to keep the blog short, I will leave it at that.


7) Using caching to improve performance

a) Delta caching

b) Spark caching

c) Caching a subset of data

Memory caching has existed for decades. It was introduced back in 1965 by the famous computer scientist Maurice Wilkes, under the name "slave memory", a term that is no longer used. Most of the optimizations we read about today have been around for ages, and it is interesting to see how they are applied in different scenarios over time; the roots always remain the same. Today, caching is a major building block in the system architecture of practically every company, whether it is a streaming company like Netflix or an e-commerce company like Amazon; I would say "every single company" has it somewhere in their architecture. Let's understand how the Delta Engine leverages caching.

Caching is an operation in which we keep data stored close to where it will be processed, thus improving its performance. Azure Databricks applies it in two different ways:

  1. Delta Caching
  2. Spark Caching

Which one to use depends on the specific situation and on how we want to improve the read speed of our tables; they can also be used simultaneously.

Delta Caching (now renamed the disk cache) - The disk cache automatically copies remote Parquet files onto local node storage to accelerate processing. The remote locations it can read from include the Databricks File System (DBFS), the Hadoop Distributed File System (HDFS), Azure Blob Storage, and Azure Data Lake Storage (ADLS). It can operate faster than the Spark cache thanks to optimized decompression and an output format consistent with processing requirements. Data can be preloaded using a CACHE statement, after which it is stored on local disk and read quickly thanks to the high read speeds of modern SSDs.

We can enable Delta caching by selecting one of the Delta Cache Accelerated worker types in the cluster configuration.

Spark Caching - Spark caching is also very well known; it can store the results of subqueries and handles several file formats besides Parquet. Queries and tables to be cached must be specified manually. Data here is stored in memory, which, as we all know, can hurt performance to a certain extent when memory comes under pressure. Spark caching is a huge topic, and there are excellent articles on it already available on the internet that explain the various methods and storage-level parameters; anyone interested in reading further can go to this link here.
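To keep it brief, here is a minimal Spark caching sketch, assuming the illustrative covid_data Delta table and a hypothetical country column:

from pyspark import StorageLevel

covid_data = spark.table("covid_data")

# Keep the DataFrame in memory, spilling to disk if it does not fit
covid_data.persist(StorageLevel.MEMORY_AND_DISK)
covid_data.count()  # an action is required to actually materialize the cache

# Repeated queries over the cached data avoid re-reading the source files
covid_data.groupBy("country").count().show()

# Release the memory when the data is no longer needed
covid_data.unpersist()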

Similar to how the Spark cache is configured, you also have options to configure the disk cache (previously known as the Delta cache). It is recommended to use disk-cache-accelerated worker instance types, which are storage-optimized instances running SSD disks; they come configured to use the disk cache by default, but you also have manual configuration options. Take a look here.
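As a minimal sketch of that manual configuration, assuming a cluster where these settings are permitted (the spark.databricks.io.cache.* options are the documented disk cache settings, and the table name is still the illustrative covid_data):

# Enable or disable the disk cache for the current session
spark.conf.set("spark.databricks.io.cache.enabled", "true")

# Settings such as spark.databricks.io.cache.maxDiskUsage (the per-node disk budget)
# typically belong in the cluster's Spark configuration rather than in a notebook.

# Optionally pre-warm the cache for a table that will be queried repeatedly
spark.sql("CACHE SELECT * FROM covid_data")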


That's it for today. In part 1 of this blog post, we covered optimizing file management in the Delta Engine. In the next post, we will cover the rest of the optimizations, including optimizing queries in Delta using Dynamic File Pruning (DFP) and Bloom filters, and join optimizations such as range and skew joins.

Hope you all liked the article.
