Understanding Delta Table Format and Architecture
Arabinda Mohapatra
PySpark, Snowflake, AWS, Stored Procedures, Hadoop, Python, SQL, Airflow, Kafka, Iceberg, Delta Lake, Hive, BFSI, Telecom
- Delta Lake is an open-source storage framework that enables building a format-agnostic Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, Hive, Snowflake, Google BigQuery, Athena, Redshift, Databricks, and Azure Fabric, as well as APIs for Scala, Java, Rust, and Python. With Delta Universal Format (UniForm), you can now read Delta tables with Iceberg and Hudi clients.
- Delta Lake addresses the reliability and consistency challenges of traditional data lakes by introducing an open-source transactional storage layer on top of your data lake.
- Purpose of Each Component
- _last_checkpoint: Keeps track of the last successful checkpoint, so readers can start from it instead of replaying the full log; it is also used for recovery in case of failures.
- COMMITTEDFILES: Lists all the files that have been successfully committed to the table
- INPROGRESS: Stores files that are currently being written and not yet committed
- _SUCCESS: A marker indicating that the associated files have been successfully written and are ready to be committed
- _temp: Used for temporary storage during the write process
- How Does Compaction Work?
- Transaction Log Files: Initially, each transaction in Delta Lake is recorded as a separate JSON file in the _delta_log directory. These files contain detailed information about the changes made during each transaction.
- CRC Files: The .crc files are checksum files that help validate the integrity of the JSON files. They ensure that the transaction log files have not been corrupted during storage or transfer.
- Error Detection: By using CRC (Cyclic Redundancy Check), Delta Lake can detect errors in the transaction log files, ensuring that only valid and uncorrupted data is processed
- Compaction Process: Periodically, Delta Lake compacts these JSON files into Parquet files. Parquet is a columnar storage format that is highly efficient for both storage and retrieval.
- Consolidation: During compaction, multiple JSON files are read and their contents are consolidated into a single Parquet checkpoint file in the _delta_log directory. The older JSON commit files are not deleted immediately; they are cleaned up later according to the log retention settings.
- Checkpointing: Along with compaction, Delta Lake also creates checkpoints. A checkpoint is a snapshot of the transaction log at a specific point in time, stored as a Parquet file. Checkpoints make it faster to read the transaction log because the system can start from the checkpoint rather than reading all the JSON files from the beginning; the sketch below shows how these log files look on disk.
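For a concrete look at this layout, the sketch below lists the _delta_log directory of a table and prints the _last_checkpoint file. It assumes a Databricks notebook (so dbutils is available) and uses the table location from Step 1 below; adjust the path for your own table.
%python
# Sketch: inspect the Delta transaction log (assumes Databricks and the Step 1 path).
log_path = "/user/hive/warehouse/student_info/_delta_log"

# JSON commit files, .crc checksums, and Parquet checkpoint files, e.g.
# 00000000000000000000.json, 00000000000000000000.crc, 00000000000000000010.checkpoint.parquet
# (checkpoints appear only after enough commits, every 10 by default)
for f in dbutils.fs.ls(log_path):
    print(f.name)

# _last_checkpoint is a small JSON file pointing readers at the most recent checkpoint
print(dbutils.fs.head(log_path + "/_last_checkpoint"))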
Step 1—Creating Databricks Delta Table:
CREATE TABLE students_info (
id INT,
name STRING,
age INT
)
USING DELTA
LOCATION '/user/hive/warehouse/student_info'
Step 2—Inserting Data Into Databricks Delta Table
Using SQL:
INSERT INTO students_info
VALUES (1, "Elon", 25),
(2, "Jeff", 30),
(3, "Larry", 35)
Using the DataFrame:
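One way to do the same insert with the DataFrame API is sketched below. It writes to a separate table, students_info_dataframe, which the later DataFrame examples read back; the table name and the default Hive warehouse location are assumptions here.
%python
# Sketch: insert the same rows using the DataFrame API.
# Writes to a separate table (students_info_dataframe) used by the later DataFrame examples;
# with the default warehouse it lands under /user/hive/warehouse/students_info_dataframe.
students_data = [(1, "Elon", 25), (2, "Jeff", 30), (3, "Larry", 35)]
students_df = spark.createDataFrame(students_data, schema="id INT, name STRING, age INT")

students_df.write.format("delta").mode("append").saveAsTable("students_info_dataframe")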
Step 3—Query the Delta Table
Using SQL:
SELECT * FROM students_info;
Using the DataFrame:
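A minimal DataFrame equivalent of the query above might look like this, reading either by table name or directly from the storage path used in Step 1.
%python
# Sketch: read the Delta table with the DataFrame API and display its contents.
students_df = spark.read.format("delta").table("students_info")
students_df.show()

# Or read directly from the table's storage location
students_df_by_path = spark.read.format("delta").load("/user/hive/warehouse/student_info")
students_df_by_path.show()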
Step 4—Perform DML Operations (Update, Delete, Merge)
1) Updating Records in a Delta Table:
Using SQL:
UPDATE students_info
SET name = "Tony Stark"
WHERE id = 1;
Using the DataFrame:
%python
from pyspark.sql.functions import *
# Read the Delta table into a DataFrame
students_df = spark.read.format("delta").load("/user/hive/warehouse/students_info_dataframe")
# Update the DataFrame
updated_df = students_df.withColumn(
    "name",
    when(students_df.id == 1, lit("Tony Stark")).otherwise(students_df.name)
)
# Write the updated DataFrame back to the Delta table
updated_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("students_info_dataframe")
2) Deleting Records in a Delta Table:
Using SQL:
DELETE FROM students_info
WHERE age < 30;
Using the DataFrame:
%python
# Read the Delta Table into a DataFrame
delta_table_df = spark.read.format("delta").load("/user/hive/warehouse/students_info_dataframe")
# Keep only the rows that should remain (i.e., delete rows with age < 30)
filtered_df = delta_table_df.filter(delta_table_df.age >= 30)
# Write the filtered DataFrame back to the Delta Table
filtered_df.write.format("delta").mode("overwrite").save("/user/hive/warehouse/students_info_dataframe")
3) Merging Records in a Delta Table
Using SQL:
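A minimal sketch of the MERGE statement for this table is shown below, using an inline VALUES source with the same rows as the DataFrame example that follows; adapt the source query to your own staging table or view.
MERGE INTO students_info AS target
USING (
  SELECT * FROM VALUES
    (1, 'Tony Stark', 35),
    (4, 'Bruce Wayne', 40) AS source_rows(id, name, age)
) AS source
ON target.id = source.id
WHEN MATCHED THEN
  UPDATE SET target.name = source.name, target.age = source.age
WHEN NOT MATCHED THEN
  INSERT (id, name, age) VALUES (source.id, source.name, source.age);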
Using DataFrame:
%python
from pyspark.sql.functions import coalesce
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Source DataFrame with the new and updated records
source_data = [(1, 'Tony Stark', 35), (4, 'Bruce Wayne', 40)]
source_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
source_df = spark.createDataFrame(source_data, schema=source_schema)

# Loading target Databricks Delta table
target_df = spark.read.format("delta").table("students_info_dataframe")

# Merge operation: full outer join on id, preferring source values where present (upsert semantics)
merged_df = (
    target_df.alias("target")
    .join(source_df.alias("source"), target_df["id"] == source_df["id"], "outer")
    .select(
        coalesce(target_df["id"], source_df["id"]).alias("id"),
        coalesce(source_df["name"], target_df["name"]).alias("name"),
        coalesce(source_df["age"], target_df["age"]).alias("age"),
    )
)

# Display resulting DataFrame
merged_df.show()

# Overwrite the target Delta table with the merged result
merged_df.write.format("delta").mode("overwrite").saveAsTable("students_info_dataframe")
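As an alternative to overwriting the whole table, the delta-spark library's DeltaTable API can apply the same upsert in place; a minimal sketch, assuming the same source_df and target table as above:
%python
# Sketch: the same upsert with the DeltaTable merge API (delta-spark),
# which rewrites only the affected files instead of overwriting the whole table.
from delta.tables import DeltaTable

target_table = DeltaTable.forName(spark, "students_info_dataframe")

(target_table.alias("target")
    .merge(source_df.alias("source"), "target.id = source.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute())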
Step 5—Using Time Travel Option in Delta Table (Optional)
Databricks Delta Tables offer a robust feature called "time travel," which keeps a comprehensive history of all data changes. This capability allows you to query and revert to earlier versions of your data, making it extremely useful for tasks like auditing, debugging, and replicating experiments or reports.
Using SQL:
To query a previous version of a Delta table, use the VERSION AS OF clause (which takes a version number) or the TIMESTAMP AS OF clause (which takes a timestamp).
SELECT *
FROM students_info
VERSION AS OF -------;
OR
SELECT *
FROM students_info
TIMESTAMP AS OF "-------";
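To find the version numbers and timestamps to use in place of the placeholders above, inspect the table history first:
-- Lists each version, its timestamp, and the operation that produced it
DESCRIBE HISTORY students_info;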
Using the DataFrame:
You can also use time travel from the DataFrame API by passing reader options for a specific version or timestamp.
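A minimal sketch using the versionAsOf and timestampAsOf reader options; the version number and timestamp below are placeholders to replace with values from DESCRIBE HISTORY, and the path is the Step 1 location.
%python
# Sketch: time travel with the DataFrame reader (replace the placeholder values
# with a real version number / timestamp from DESCRIBE HISTORY).
df_v = (spark.read.format("delta")
        .option("versionAsOf", 0)                        # placeholder version number
        .load("/user/hive/warehouse/student_info"))
df_v.show()

df_t = (spark.read.format("delta")
        .option("timestampAsOf", "2024-01-01 00:00:00")  # placeholder timestamp
        .load("/user/hive/warehouse/student_info"))
df_t.show()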
Step 6—Optimizing Databricks Delta Table (Optional)
To be covered in detail in a follow-up post.