Databricks Advances Spark Structured Streaming With Project Lightspeed... and RocksDB Stutters

By Adi Gelvan, CEO @ Speedb

Databricks Takes Pole Position in AI

The recent Databricks acquisition of MosaicML had a lot of people scratching their heads.

Is Databricks an AI company?

As Databricks moves old-fashioned data warehousing into the AI age with its Lakehouse offering, recent reports from CEO Ali Ghodsi confirm that Databricks is currently the fastest-growing software company, reaching a staggering $1 billion in revenue for 2022 with a 60% growth rate.

The MosaicML acquisition for $1.3 billion was a bold move, signaling leadership in real-world AI innovation from Databricks.

Databricks is quickly transforming itself into the unified, shared data platform for AI, governance, and analytics by taking an open and strategic approach to managing and extracting value from the unbounded amounts of data the world generates today.

"Together, Databricks and MosaicML will enable any company to build, own and secure best-in-class generative AI models while maintaining control of their data"

New AI workloads are emerging: LLM training, inference, and generative AI are amplifying the challenges organizations face with data management, pushing bottlenecks throughout their systems, sniffing out weaknesses, and forcing comprehensive new solutions like the ones Databricks is delivering.

Credit: Databricks - Evolution To Data Lakehouse


One thing we love about Databricks is their openness about current and future strategies, documented by their own engineers in their engineering blogs.

A recent post that caught our attention is Databricks' "Project Lightspeed Update - Advancing Apache Spark Structured Streaming."

Recognizing the importance of Apache Spark Structured Streaming, Databricks engineers have been working on RocksDB, the state store engine for Spark Structured Streaming, and its surrounding components to improve performance for use in the Databricks Lakehouse.

This insightful post summarizes Databricks' work on Project Lightspeed over the last year. It calls out several challenges with using RocksDB as the state store provider and outlines how those issues were mitigated within the constraints of RocksDB.
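For readers who have not touched this layer before, here is a minimal sketch of how a Structured Streaming query opts into the RocksDB state store provider. The configuration key and provider class below are the standard Apache Spark ones; the checkpoint path and the toy query are illustrative only, and exact behavior depends on your Spark/Databricks runtime version.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object RocksDBStateStoreSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("rocksdb-state-store-sketch")
      // Swap the default state store for the RocksDB provider, so per-partition
      // streaming state lives in an embedded RocksDB instance on each executor.
      .config("spark.sql.streaming.stateStore.providerClass",
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
      .getOrCreate()

    // A toy stateful query: a running count per key keeps its state in RocksDB.
    val counts = spark.readStream
      .format("rate")                  // built-in test source emitting (timestamp, value)
      .option("rowsPerSecond", "10")
      .load()
      .withColumn("key", col("value") % 10)
      .groupBy("key")
      .count()

    counts.writeStream
      .outputMode("complete")
      .format("console")
      .option("checkpointLocation", "/tmp/lightspeed-demo-checkpoint") // illustrative path
      .start()
      .awaitTermination()
  }
}
```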

Before we review Databricks engineering efforts to improve RocksDB for Spark Streaming, let's take a look at the default performance advantages Speedb provides out of the (software) box:

RocksDB vs Speedb - db_bench - 50/50 Random Read/Write Workload (IOPS and Time)


(The following section is quoted directly from this Databricks Engineering Blog)

Summary of RocksDB Challenges and Solutions

"Unpredictable and Inconsistent (RocksDB) Performance"

In the existing model, when Structured Streaming pipelines used RocksDB state store provider, we used to observe higher and variable latency.

"During a detailed investigation, we identified that commit operations related to the state store contributed to 50-80% of task duration and also accounted for the high, variable latency."

Here are some of the issues that we have seen:

  • Memory Growth/Usage Related Issues - For the RocksDB state store provider, all the updates were being stored in memory using writeBatchWithIndex. This meant that we had unbounded usage on a per instance basis as well as no global limits across state store instances on a single node. For stateful queries, the number of state store instances are usually proportional to the number of partitions, leading to spikes in memory usage for queries dealing with large state.
  • Database Write/Flush Related Slowdown - As part of the commit operations, we were also performing writes from the writeBatchWithIndex to the database as well as performing a synchronous flush, both of which could have unpredictable performance in the event of large buffer sizes. We were also forcing all writes to the WAL (write-ahead log) along with a sync for the Sorted String Table (SST) files resulting in duplication of updates. We would also explicitly wait for background work to complete before taking a snapshot of the database, leading to pauses associated with background compaction/flush operations.
  • Write Amplification - In the existing model, the size of the uploaded state to the distributed file system was not proportional to the size of the actual state data changed. This is because SST files of RocksDB are merged during Log Structured Merge (LSM) compaction. We would then try to identify all the changed SST files compared to the previous version and needed to sync a lot of files to the distributed file system, leading to write amplification and additional network and storage costs.


"There are over 10 million streaming jobs run per week on Databricks, a number that is still growing at more than 2.5x every year."

Towards Faster and Consistent Performance

"To address the issues discussed above, we have made a number of improvements to achieve faster and consistent performance."

  • Bounded Memory Usage - To fix the memory usage/growth issue, we now allow users to enforce bounded memory usage by using the write buffer manager feature in RocksDB. With this, users can set a single global limit to control memory usage for block cache, write buffers and filter/index blocks across state store DB instances. We also removed our reliance on writeBatchWithIndex so that updates are not buffered, but written directly to the database.
  • Database Write Related Improvements - With the improvements we implemented, we now just write and read directly from the database. However, since we don’t explicitly need the WAL (write-ahead log), we have disabled this RocksDB feature in our case. This allows us to serve all reads/writes primarily from memory and also allows us to flush periodically in case changelog checkpointing is enabled. We also no longer pause background operations since we can capture and upload the snapshot safely without interrupting background DB operations.
  • Changelog Checkpointing - The key idea in incremental checkpointing is to make the state of a micro-batch durable by syncing the change log instead of snapshotting the entire state to the checkpoint location. Furthermore, the process of snapshotting is pushed to a background task to avoid blocking task execution in the critical path. The snapshot interval can be configured to tradeoff between failure recovery and resource usage. Any version of the state can be reconstructed by picking a snapshot and replaying change logs created after that snapshot. This allows for faster and efficient state checkpointing with RocksDB state store provider.
  • Misc Improvements - We have also improved the performance of specific types of queries. For example, in stream-stream join queries, we now support performing state store commits for all state store instances associated with a partition to be performed in parallel leading to lower overall latency. Another optimization is where we skip going through the output commit writer for at-least once sinks (such as Kafka sink) since we don’t need to reach out to the driver and perform partition-level unique writes leading to better performance as well." (end Databricks Blog reference quotation)
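Most of the improvements quoted above surface as configuration on the Spark side rather than as RocksDB code. The sketch below shows how they might be enabled at session level; the key names follow recent Apache Spark / Databricks documentation for the RocksDB state store, but they vary by runtime version, so treat them as assumptions and verify against your release.

```scala
import org.apache.spark.sql.SparkSession

object LightspeedTuningSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("lightspeed-rocksdb-tuning-sketch")
      .config("spark.sql.streaming.stateStore.providerClass",
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
      // Changelog checkpointing: persist the per-batch change log instead of a full
      // RocksDB snapshot on every micro-batch; snapshots move to a background task.
      .config("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
      // Bounded memory usage: one global cap shared by the block cache, write buffers
      // and filter/index blocks across all state store instances on an executor.
      .config("spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage", "true")
      .config("spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB", "2048")
      .getOrCreate()

    // ... define and start the stateful streaming query as usual ...
  }
}
```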

Why Use RocksDB?

RocksDB started life as LevelDB, created by Google for indexing and various forms of caching in Google Chrome. Facebook's team then forked LevelDB and released it as open source in 2012 as RocksDB, with some dramatic improvements such as multi-threading and broader API support. It was designed to be embedded, meaning it is a library running inside of, or called by, another application. This keeps memory and the related disk operations physically close to each other (eliminating any network hop between memory and disk) while requiring minimal system (server/resource) overhead.

RocksDB Architecture


RocksDB is the most prevalent embedded storage engine library in use today for modern unstructured databases and hyperscale applications. Used by Twitter, Dropbox, Netflix, Google, Facebook, Pinterest, Microsoft (for Bing) and thousands of others, RocksDB is built into many of the world's largest online services and is the storage engine for state in event streaming infrastructure like Kafka Streams and Apache Flink, and often in Spark Streaming, as referenced here by Databricks.

RocksDB is known for being FAST, but finicky, with over 300 variables that all affect each other, taking valuable engineering time and resources to get it optimized.

RocksDB uses a log-structured database engine, written entirely in C++ for maximum performance. Keys and values are just arbitrarily-sized byte streams. Aggregating bytes in server memory, RocksDB transforms data into block formats that match physical SSD block sizes and writes them sequentially to speed up large ingests (i.e., disk writes).

By shaping data in memory before writing it to disk, RocksDB achieves a high level of write efficiency compared to most other storage engines and databases, which reorganize B-trees (in their many forms) on each write and are naturally optimized for reading data back later.
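To make "embedded" concrete, here is a minimal sketch using the RocksJava bindings (org.rocksdb) from Scala. The path and keys are illustrative only; the point is that the whole database is a library call plus a local directory, with keys and values as arbitrary byte arrays.

```scala
import org.rocksdb.{Options, RocksDB}

object EmbeddedRocksDBSketch {
  def main(args: Array[String]): Unit = {
    RocksDB.loadLibrary() // load the native RocksDB library bundled with RocksJava

    val options = new Options().setCreateIfMissing(true)
    // The database is just files in a local directory: no server, no network hop.
    val db = RocksDB.open(options, "/tmp/rocksdb-embedded-sketch") // illustrative path

    // Keys and values are arbitrary byte arrays; the application chooses the encoding.
    db.put("user:42".getBytes("UTF-8"), """{"name":"ada"}""".getBytes("UTF-8"))
    val value = db.get("user:42".getBytes("UTF-8"))
    println(if (value == null) "not found" else new String(value, "UTF-8"))

    db.close()
    options.close()
  }
}
```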

Why not use RocksDB?

The talented RocksDB team at Meta has produced (and is still producing) amazing work improving the RocksDB storage engine to make Meta's internal use cases faster and more efficient within their infrastructure, deployed on hundreds of thousands of distributed servers and used by hundreds of internal applications. RocksDB is well tuned for those use cases, but not necessarily for everything (and everyone) else, even though it is commonly deployed across many varying use cases and works well enough for most of them.

Challenges with RocksDB (for the rest of us)

  1. Memory Management: RocksDB's memory management can be a challenge, particularly when dealing with large data sets. Configuring the BlockCache, MemTable, and Bloom filters to use memory efficiently while avoiding excessive consumption can be complex (see the configuration sketch after this list). In some cases, users may need to carefully monitor and adjust memory settings to avoid excessive memory usage, memory fragmentation, or performance degradation.
  2. Compaction Overhead: RocksDB employs a compaction process to merge and compact data files, ensuring efficient space utilization. However, compactions can be resource-intensive and may impact performance during heavy write workloads. Users may experience increased CPU and disk usage during compaction, resulting in temporary performance slowdowns or increased latency.
  3. Write Amplification: Write amplification refers to the additional amount of data written to disk due to internal data structures and compactions. In some cases, RocksDB's default compaction settings or certain workloads may lead to higher write amplification, resulting in increased disk I/O and affecting overall write performance during heavy load.
  4. Storage Overhead: RocksDB's storage overhead can be a concern, especially when dealing with smaller key-value pairs or when the size of the data set is large. The internal metadata and indexes used by RocksDB can consume a significant amount of storage space compared to the actual data stored.
  5. Complexity in Tuning and Troubleshooting: With over 300 variables and little guidance on how to tune them, changing one setting affects others and produces surprising outcomes. RocksDB is hard to troubleshoot in production, and hard to tune for real-life scenarios in test.
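As a concrete illustration of the knobs in item 1, here is a sketch of a memory-conscious configuration through the RocksJava API. The sizes are arbitrary examples rather than recommendations, and the path is illustrative; the point is how many interacting surfaces (block cache, write buffers, filters) have to be budgeted together.

```scala
import org.rocksdb.{BlockBasedTableConfig, BloomFilter, LRUCache, Options, RocksDB, WriteBufferManager}

object RocksDBMemoryTuningSketch {
  def main(args: Array[String]): Unit = {
    RocksDB.loadLibrary()

    // One shared block cache for data, index and filter blocks.
    val blockCache = new LRUCache(512L * 1024 * 1024) // 512 MB, arbitrary example

    // Cap total memtable (write buffer) memory and charge it against the block
    // cache, so write buffers and cached blocks share a single budget.
    val writeBufferManager = new WriteBufferManager(256L * 1024 * 1024, blockCache)

    val tableConfig = new BlockBasedTableConfig()
      .setBlockCache(blockCache)
      .setFilterPolicy(new BloomFilter(10.0, false)) // ~10 bits per key

    val options = new Options()
      .setCreateIfMissing(true)
      .setWriteBufferManager(writeBufferManager)
      .setWriteBufferSize(64L * 1024 * 1024) // per-memtable size before flush
      .setMaxWriteBufferNumber(4)            // memtables kept per column family
      .setTableFormatConfig(tableConfig)

    val db = RocksDB.open(options, "/tmp/rocksdb-tuning-sketch") // illustrative path
    // ... run the workload, watch memory and compaction behavior, adjust, repeat ...
    db.close()
    options.close()
  }
}
```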

While RocksDB engineering teams at hyperscalers and vendors like Databricks leverage their expertise to tweak, and in some cases modify, RocksDB, it is not an easy job and it takes time. It also has limited potential unless significant engineering resources are applied to manually rearchitect various components, with great effort.

Enter Speedb - Hold My Beer!

Speedb was founded by veteran data-structure developers who were confronted by real-world RocksDB scaling issues and decided to redesign key parts of RocksDB to deliver a more performant, scalable, and resource-efficient implementation, allowing RocksDB to scale from 50GB to multiple TBs per node (scale-up), with a cascade of other positive outcomes at larger per-node data sizes.

When designing Speedb, the engineers set two ground rules:

  1. Always rebase with Meta's RocksDB every second release. This ensures full API compatibility and allows Speedb to be a forward/backward-compatible drop-in replacement library: simply clone the Speedb library or install the Speedb binary, each roughly a 30-second operation.
  2. Do no harm. Meaning, all improvements to RocksDB should never come at the cost of making Speedb less performant or efficient than RocksDB. This makes Speedb's OSS or Enterprise adoption a virtually risk-free endeavor.

By using Speedb, you get RocksDB, but with all of Speedb's innovations and improvements, making RocksDB more performant, efficient and simple to use at scale. And no downsides.

Speedb's core innovation is its new Multi-Dimensional Adaptive Compaction data structure, which adds more dimensions to the LSM-tree, enabling micro-compactions that reduce the write amplification factor (WAF), increase performance, and allow TB scale per node.

In addition, Speedb adds an adaptive component to compaction that optimizes the new LSM data structure for the particular I/O workload, adjusting within seconds to keep the read/write flow tuned for best performance.

RocksDB vs Speedb - 'Seek While Write' During Compaction Run


RocksDB vs Speedb - 95% Write and 5% Read During Compaction Run


A key outcome of Speedb's Multi-Dimensional Adaptive Compaction is consistently reducing WAF by 85%. WAF is the ratio of bytes physically written to storage to bytes of user data written: if an application writes 1 TB of new data and the engine writes 10 TB across flushes and compactions, WAF is 10, and an 85% reduction brings it down to 1.5. In practice that means far less write traffic to operate, better performance, and far fewer rewrites and less thrashing on your SSDs, extending their lifespan significantly.

Speedb Enterprise Reduces Write Amplification Factor vs RocksDB


Speedb is showing advantages in large-scale, real-world event streaming cases like XM Cyber, where 10x output was achieved on Apache Flink (Streaming) and critical persistent-state restore times were reduced by 80% with Kafka Streams.

Speedb is being rapidly adopted by the industry as the natural starting point for large-scale key-value store requirements, especially by engineering teams who would rather reap proven benefits than re-invent the key-value wheel.

Here you can find Speedb's other innovations, captured in the OSS version on GitHub and in the Enterprise binary:

  1. Write Flow
  2. Global Delayed Write
  3. Live Configuration Changes
  4. Report Index Size Per Column Family
  5. Proactive Flushing
  6. Sorted Hash Memtable
  7. Paired Bloom Filter
  8. RocksDB/Speedb Log Parser Tool

The chart illustrates the dramatic increase in writing performance when using small objects with Speedb's Write Flow feature enabled.


We at Speedb applaud Databricks' engineering efforts to improve and stabilize RocksDB for their customers' benefit. By leveraging Speedb as an API-compatible alternative to RocksDB, there is still a chance to achieve order-of-magnitude improvements.

We're standing by to help any and all RocksDB focused engineering teams who want a head-start in their key-value store engineering and tuning efforts.

Join our RocksDB/Speedb Discord Server: https://discord.gg/5fVUUtM2cG

Or Contact us for a cooperative effort to improve your strategic RocksDB needs: [email protected]

Speedb is the FINAL BOSS of key-value storage engines






