Small Talk about Apache Hudi

About Hudi

Apache Hudi is a data lake platform that combines the capabilities of a database and a data warehouse on top of the data lake. It enables near real-time analytics on streaming data with minimal latency, allowing minute-level analysis. Hudi introduces a new framework for incremental processing that replaces the slow batch processing commonly used today. With Hudi, users benefit from features such as tables, transactions, efficient upserts and deletes, advanced indexing, streaming ingestion services, data clustering, compaction optimizations, and concurrency control. What sets Hudi apart is that it maintains data in open-source file formats, making it compatible with query engines such as Apache Spark, Flink, Presto, Hive, and more. It is ideal for streaming workloads and also supports building efficient incremental batch pipelines, ensuring high-performance analytics.

How Hudi updates work

Hudi is built around indexing: it maintains an index on the record (primary) key. When updates arrive for a given key, Hudi looks up the file that currently contains the record for that key and writes a new version of that file (a new file slice) with the updated records.
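
The index lookup can be pictured as a mapping from record key to the file group that currently holds that key. The sketch below is purely illustrative Scala, not Hudi's internal API; it only shows the tagging idea: keys found in the index become updates routed to their existing file group, while unseen keys become inserts into a new file group.

```scala
// Purely illustrative: a toy "index" from record key to file group.
// Hudi's real indexes (Bloom, simple, bucket, HBase, ...) do this at scale.
val index: Map[String, String] = Map(
  "key-001" -> "fileGroup-A",
  "key-002" -> "fileGroup-B"
)

case class IncomingRecord(key: String, value: String)

val batch = Seq(
  IncomingRecord("key-001", "updated value"), // existing key -> update
  IncomingRecord("key-999", "new value")      // unseen key   -> insert
)

// Tag each incoming record as an update to an existing file group or a fresh insert.
batch.foreach { r =>
  index.get(r.key) match {
    case Some(fileGroup) => println(s"${r.key}: UPDATE -> new file slice in $fileGroup")
    case None            => println(s"${r.key}: INSERT -> new file group")
  }
}
```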

Write Operations

Before diving in, it may be helpful to understand the different write operations provided by the Hudi datasource and the DeltaStreamer tool, and how best to leverage them. These operations can be chosen/changed for each commit/deltacommit issued against the table. The key properties are listed below, followed by a usage sketch.

Properties required for writes

  • OPERATION_OPT_KEY (`hoodie.datasource.write.operation`): The type of operation being performed with Hudi - UPSERT_OPERATION_OPT_VAL (default), BULK_INSERT_OPERATION_OPT_VAL, INSERT_OPERATION_OPT_VAL, or DELETE_OPERATION_OPT_VAL.
  • PARTITIONPATH_FIELD.key() (`hoodie.datasource.write.partitionpath.field`): The partition path to which the record belongs.
  • PRECOMBINE_FIELD.key() (`hoodie.datasource.write.precombine.field`): Field used to de-duplicate multiple records with the same key within the batch being ingested.
  • RECORDKEY_FIELD_OPT_KEY (`hoodie.datasource.write.recordkey.field`, required): Primary key field(s). Record keys uniquely identify a record/row within each partition.
  • hoodie.insert.shuffle.parallelism: Spark parallelism to use while ingesting records into Hudi.
  • hoodie.datasource.write.table.type: The table type of the Hudi table. There are two table types in Hudi, namely COPY_ON_WRITE (default) and MERGE_ON_READ.
  • TABLE_NAME (`hoodie.table.name`): The Hudi table name.
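
A minimal Spark Scala sketch of wiring these options into an upsert, assuming a spark-shell/SparkSession with the Hudi bundle on the classpath; the table name, base path, and column names (id, ts, dt) are illustrative assumptions, not values from this article.

```scala
import org.apache.spark.sql.SaveMode

// Assumed example data: id = record key, ts = precombine field, dt = partition path field.
val df = spark.createDataFrame(Seq(
  (1, "alice", 1700000000L, "2024-01-01"),
  (2, "bob",   1700000100L, "2024-01-01")
)).toDF("id", "name", "ts", "dt")

df.write.format("hudi").
  option("hoodie.datasource.write.operation", "upsert").           // OPERATION_OPT_KEY
  option("hoodie.datasource.write.recordkey.field", "id").         // RECORDKEY_FIELD_OPT_KEY
  option("hoodie.datasource.write.partitionpath.field", "dt").     // PARTITIONPATH_FIELD
  option("hoodie.datasource.write.precombine.field", "ts").        // PRECOMBINE_FIELD
  option("hoodie.datasource.write.table.type", "COPY_ON_WRITE").
  option("hoodie.insert.shuffle.parallelism", "2").
  option("hoodie.upsert.shuffle.parallelism", "2").
  option("hoodie.table.name", "example_table").                    // TABLE_NAME
  mode(SaveMode.Append).
  save("/tmp/hudi/example_table")
```

Re-running the same write with changed values for an existing id produces a new file slice rather than a duplicate row, which is the upsert behavior described above.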

COW vs MOR

Copy On Write Table

File slices in a copy-on-write table contain only the base/columnar file, and each commit produces new versions of the base files. In other words, the table is implicitly compacted on every commit, so that only columnar data exists. As a result, write amplification (the number of bytes written per byte of incoming data) is much higher, while read amplification is zero. This is a much desired property for analytical workloads, which are predominantly read-heavy.

The following illustrates how this works conceptually, when data is written into a copy-on-write table and two queries run on top of it.

As data gets written, updates to existing file groups produce a new slice for that file group stamped with the commit instant time, while inserts allocate a new file group and write its first slice. These file slices and their commit instant times are color-coded in the illustration. A SQL query running against such a table (e.g., select count(*) counting the total records in that partition) first checks the timeline for the latest commit and filters out all but the latest file slice of each file group. As you can see, an old query does not see the files of the current in-flight commit (color-coded pink), but a new query starting after the commit picks up the new data. Thus queries are immune to any write failures/partial writes and only run on committed data.
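
As a sketch of the query side (continuing the hypothetical table and path from the write example above), a snapshot read of a copy-on-write table loads only the latest committed file slices:

```scala
// Snapshot query: sees only the latest committed file slice of each file group.
val snapshot = spark.read.format("hudi").load("/tmp/hudi/example_table")
snapshot.createOrReplaceTempView("example_table")

spark.sql("select count(*) from example_table").show()

// Hudi exposes the commit instant a row belongs to via the _hoodie_commit_time
// metadata column, which is how results are scoped to committed data.
spark.sql("select distinct _hoodie_commit_time from example_table").show()
```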

The intention of the copy-on-write table type is to fundamentally improve how tables are managed today through:

  • First-class support for atomically updating data at the file level, instead of rewriting whole tables/partitions
  • The ability to incrementally consume changes, as opposed to wasteful scans or fumbling with heuristics (a sketch of an incremental query follows this list)
  • Tight control of file sizes to keep query performance excellent (small files hurt query performance considerably)
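
A hedged sketch of the incremental consumption mentioned above: an incremental query returns only records committed after a given instant, instead of rescanning the whole table. The begin instant and path are placeholders continuing the earlier example.

```scala
// Incremental query: pull only changes committed after beginInstant.
// "20240101000000" is a placeholder commit instant, not a value from this article.
val beginInstant = "20240101000000"

val changes = spark.read.format("hudi").
  option("hoodie.datasource.query.type", "incremental").
  option("hoodie.datasource.read.begin.instanttime", beginInstant).
  load("/tmp/hudi/example_table")

changes.select("_hoodie_commit_time", "id", "name").show(false)
```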

Merge On Read Table

A merge-on-read table is a superset of copy-on-write, in the sense that it still supports read-optimized queries by exposing only the base/columnar files in the latest file slices. Additionally, it stores incoming upserts for each file group in a row-based delta log, and supports snapshot queries by applying the delta log onto the latest version of each file id on the fly at query time. Thus, this table type attempts to balance read and write amplification intelligently to provide near real-time data. The most significant change here is to the compactor, which now carefully chooses which delta log files need to be compacted onto their columnar base file to keep query performance in check (larger delta log files incur longer merge times on the query side).
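
A minimal sketch of writing to a merge-on-read table: the write path is the same as before with the table type switched. The table name and path are again hypothetical, and df is the example DataFrame from the earlier copy-on-write sketch.

```scala
import org.apache.spark.sql.SaveMode

// df: the same example DataFrame used in the copy-on-write sketch above.
df.write.format("hudi").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.datasource.write.recordkey.field", "id").
  option("hoodie.datasource.write.partitionpath.field", "dt").
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.datasource.write.table.type", "MERGE_ON_READ").   // updates land in row-based delta logs
  option("hoodie.table.name", "example_table_mor").
  mode(SaveMode.Append).
  save("/tmp/hudi/example_table_mor")
```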

The following illustrates how the table works and shows the two types of queries: snapshot queries and read-optimized queries.

There are a lot of interesting things happening in this example, which bring out the subtleties of the approach:

  • We now have commits every minute or so, something we could not do with the other table type.
  • Within each file id group, there is now a delta log file, which holds incoming updates to records already present in the base columnar files. In the example, the delta log files hold all the data from 10:05 to 10:10. The base columnar files are still versioned with the commit, as before. Thus, if one were to look at the base files alone, the table layout looks exactly like a copy-on-write table.
  • A periodic compaction process reconciles these changes from the delta log and produces a new version of the base file, just like what happened at 10:05 in the example.
  • There are two ways of querying the same underlying table: read optimized query and snapshot query, depending on whether we choose query performance or freshness of data (a sketch of both query types follows this list).
  • The semantics around when data from a commit is available to a query change in a subtle way for a read optimized query. Note that such a query running at 10:10 won't see data after 10:05 above, while a snapshot query always sees the freshest data.
  • When we trigger compaction and what we choose to compact hold the key to solving these hard problems. By implementing a compaction strategy where we aggressively compact the latest partitions compared to older partitions, we can ensure that read optimized queries see data published within X minutes in a consistent fashion.
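
A sketch of the two query types discussed above, plus the inline compaction knobs that control how often delta logs are folded into base files; the path continues the hypothetical MOR table, and the compaction values are illustrative.

```scala
// Read optimized query: only the compacted base/columnar files (faster, possibly stale).
val readOptimized = spark.read.format("hudi").
  option("hoodie.datasource.query.type", "read_optimized").
  load("/tmp/hudi/example_table_mor")

// Snapshot query: base files merged with delta logs on the fly (freshest data).
val snapshotMor = spark.read.format("hudi").
  option("hoodie.datasource.query.type", "snapshot").
  load("/tmp/hudi/example_table_mor")

// Compaction frequency can be tuned on the write side, e.g. inline compaction
// after every N delta commits (N = 5 is an illustrative value):
//   .option("hoodie.compact.inline", "true")
//   .option("hoodie.compact.inline.max.delta.commits", "5")
```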

The intention of the merge-on-read table type is to enable near real-time processing directly on top of DFS, as opposed to copying data out to specialized systems that may not be able to handle the data volume. A secondary benefit of this table type is reduced write amplification (the amount of data written per byte of incoming data in a batch), achieved by avoiding the synchronous merge of data.

Drawbacks

  • To utilize Hudi upsert, we need to receive only the new/updated records from the source.
  • Hudi upsert may not be helpful if almost all the records in the table get updated, since it adds indexing and lookup overhead.
  • We cannot directly delete a file or drop a partition in Hudi, as Hudi tracks all file names and record keys in its metadata. These operations need to be performed through Hudi delete (a delete sketch follows this list).
  • The timestamp column is inaccessible from Beeline (it can be accessed through Spark).
  • File size is larger compared to ORC (this may also be due to metadata; we can check whether compression reduces the file size).
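
As noted in the third point above, record-level deletes go through Hudi's delete operation rather than removing files or partitions by hand. A minimal sketch (the keys being deleted are hypothetical, continuing the earlier example table):

```scala
import org.apache.spark.sql.SaveMode

// Records to delete, identified by the same record key / partition path fields.
val toDelete = spark.createDataFrame(Seq(
  (2, "bob", 1700000200L, "2024-01-01")
)).toDF("id", "name", "ts", "dt")

toDelete.write.format("hudi").
  option("hoodie.datasource.write.operation", "delete").
  option("hoodie.datasource.write.recordkey.field", "id").
  option("hoodie.datasource.write.partitionpath.field", "dt").
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.table.name", "example_table").
  mode(SaveMode.Append).
  save("/tmp/hudi/example_table")
```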

