I spent 8 hours relearning the Delta Lake table format
The format, Read/Write process, Concurrency, Data Mutation and more
My name is Vu Trinh, and I am a data engineer.
I’m trying to make my life less dull by spending time learning and researching “how it works“ in the data engineering field.
Here is a place where I share everything I’ve learned.
Intro
This week, I will try to relearn the Delta Lake table format. You’re reading an extended version of my article from six months ago on Delta Lake.
I can't give a clear answer if you ask why I needed to extend my existing article. I've just noticed that every time I revisit knowledge I've learned before, I discover something interesting that I missed the last time.
This time, it happened with Delta Lake. ;)
Background
Cloud Object Storage is everywhere.
With the rise of the cloud era, users prefer services like Amazon S3 or Google Cloud Storage over HDFS as the data lake backbone. Object stores offer (theoretically) unlimited scale, pay-as-you-go billing, high durability, and high reliability. With a few manual configs, you will have a repository for all kinds of data your organization wants to persist.
Popular open-source systems, including Apache Spark or Presto, support accessing cloud object stores using file formats such as Apache Parquet and ORC. Cloud services, including Google BigQuery or Redshift Spectrum, can also query directly against these file formats.
However, object storage has shortcomings when viewed in the Lakehouse context, which requires data warehouse capabilities right on top of the data lake: object listing is expensive, and consistency guarantees are limited.
Let's imagine we store relational data natively in cloud object storage, with each table stored as a set of Parquet file objects. This approach creates correctness and performance challenges for more complex workloads:
There is no isolation between queries: if a query needs to update multiple objects in the table, consumers will see partial updates as the query updates each object individually.
If an update query fails in the middle, the table is corrupted.
Metadata operations, such as listing objects, are expensive in large tables with millions of objects.
To address these challenges, Databricks created Delta Lake, an ACID table storage layer on cloud object storage, served to its customers in 2017 and open-sourced in 2019. The core idea of Delta Lake is simple: keep information about which objects belong to a table in an ACID manner, using a write-ahead log that itself lives in the cloud object store. The design allows clients to:
Update multiple objects simultaneously.
Replace a set of objects with another in a serializable manner, while still achieving read and write performance close to that of reading raw Parquet.
The log structure also contains metadata such as min/max statistics for each data file, enabling faster metadata search than traditional object listing operations.
Delta Lake’s design enables Databricks to offer other features unsupported in traditional cloud data lakes, such as Time Travel, UPSERT, DELETE and MERGE operations, caching, layout optimization, schema evolution, and audit logging.
The following section covers Databricks's motivation behind the Delta Lake format.
Motivation
We will examine the cloud object storage API and performance based on Databricks' observations to understand why implementing the lakehouse architecture using existing cloud object storage can be challenging.
Object Store APIs
Cloud object stores implement a key-value store design aimed at excellent scalability. They allow users to create buckets, each storing multiple objects; each object is a binary blob identified by a string key. Object stores do not have the directory abstraction of file systems; an object path you see, for example, “/data/country=us/date=2024-11-01/object_name,” is simply the full key that identifies the object.
The system also provides metadata APIs, such as S3’s LIST operation, that can list the objects in a bucket by lexicographic order of key, given a start key; this makes it possible to list the objects in a “directory” by starting a LIST request at the key that represents that directory prefix (e.g., “/data/country=us”). These APIs are expensive and have high latency.
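To make the "a directory is just a key prefix" idea concrete, here is a small pure-Python sketch (the bucket is a dict, and the key names are illustrative) of how an S3-style LIST-with-prefix call behaves:

```python
def list_objects(bucket, prefix, start_after="", max_keys=1000):
    """Return up to max_keys keys that share `prefix`, in lexicographic
    order, starting after `start_after` -- mimicking an S3-style LIST."""
    keys = sorted(k for k in bucket if k.startswith(prefix) and k > start_after)
    return keys[:max_keys]

bucket = {
    "data/country=us/date=2024-11-01/part-0.parquet": b"...",
    "data/country=us/date=2024-11-02/part-0.parquet": b"...",
    "data/country=vn/date=2024-11-01/part-0.parquet": b"...",
}

# Listing the "directory" data/country=us/ is just a prefix scan over keys:
us_keys = list_objects(bucket, "data/country=us/")
```

Because the store only sorts keys, "listing a directory" and "listing a key range" are the same operation, which is why latency and request limits matter so much.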
Consistency Properties
The major cloud object stores support eventual consistency for each object's key but no consistency across keys. This raises challenges when handling a table that consists of multiple objects: after a client operates on an object (e.g., uploads or updates it), other clients are not guaranteed to see the result in LIST or read operations immediately.
The exact consistency model differs by cloud provider and by operation, but all cloud storage systems lack support for atomic operations across keys.
Performance
From its experience, Databricks found that achieving high throughput with object stores requires a balance of large sequential I/Os and parallelism:
Each sequential read operation incurs at least 5–10 ms of latency and then reads data at roughly 50–100 MB/s. Thus, a read must fetch at least several hundred kilobytes to achieve even half the peak sequential-read throughput.
LIST operations also require significant parallelism to list large sets of objects quickly. For example, S3’s LIST operations can only return up to 1000 objects per request and take tens to hundreds of milliseconds, so clients need to request hundreds of LISTs in parallel to list large buckets or “directories.”
Write operations must replace a whole object. This implies that if a table needs to be updated later, its objects should be small enough to avoid expensive file rewriting.
Based on the above observations, analytics workloads that are stored in cloud object storage should consider the following points:
Organize data so that related values sit sequentially close to each other; columnar formats, like Parquet, handle this.
Choose object sizes carefully. Objects that are too large make updates expensive; too many small objects cause more operations (object listing, object gets, or file opens/closes).
Avoid LIST operations where possible; when they are needed, make them request lexicographic key ranges.
After understanding Databricks's motivation behind Delta Lake, the following sections will describe its storage format, access protocols, and transaction isolation levels in detail.
Delta Lake Storage Format
A Delta Lake table is a directory in cloud object storage (or a file system) that holds the table's data objects and a log of transaction operations.
Data Object
The data in a table is stored as Apache Parquet objects, which can be organized into directories using Hive's partition naming convention (if the table is partitioned). Parquet is one of the most popular columnar formats today; it supports various compression schemes and handles nested and repeated data types. Moreover, the format is widely adopted by many processing engines, which simplified connector development for Databricks.
Delta Lake uses the transaction log to identify which objects belong to each version of a table. Every data file in a Delta table must be referenced from the transaction log; a file that cannot be referenced from the log is unreadable.
Transaction Log
All operations must go through the transaction log
The delta log stores transaction metadata, such as the data files added or removed during each transaction. With the transaction log, a user can construct a snapshot of the table at a specific version. In Delta Lake, the writer must commit all file changes in a single write to the delta log; if any file write fails mid-transaction, the writer does not commit to the delta log, leaving the partially written files unreferenced and therefore unreadable.
In the Delta Lake table directory, the transaction log is managed in a sub-directory called _delta_log. The log is a sequence of JSON objects with increasing, zero-padded numerical IDs (e.g., 000003) that store the log records, plus checkpoints in Parquet format that summarize the log up to that point. Each log record object contains a series of actions to apply to the previous version of the table to generate the next one. The available actions are:
Change metadata: The metaData action changes the table's current metadata. The first version of the table must contain a metaData action. Following metaData actions overwrite the current metadata. The metadata contains the schema, partition column names, data file format, and other configuration options.
Add or Remove Files: The add and remove actions modify the table’s data by adding or removing individual data objects. The table’s state is determined by the number of all added objects that have not been removed. The add action can include data statistics such as per-column min/max values or null counts, which can benefit the later read operation. The remove action includes a timestamp to indicate when it happened. The associated data objects are deleted after a retention time threshold.
Protocol Evolution: The protocol action is used to increase the version of the Delta protocol.
Add Provenance Information: Each log record object can include provenance information in a commitInfo action.
Update Application Transaction IDs: Delta Lake also allows the application to ingest data inside log records, which can help implement end-to-end transactional applications. For example, if the streaming job fails, it must replay the process starting at the stream offset from the previous write. Delta Lake lets applications write a custom txn action with appId and version fields in the log record to track the input stream’s offset.
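The actions above can be pictured as a commit file: one JSON action per line. The sketch below builds and re-parses such a file in Python (the field names loosely follow the Delta spec, and many required fields are omitted for brevity, so treat it as illustrative, not as the exact on-disk format):

```python
import json

# A simplified commit file with one JSON action per line.
actions = [
    {"metaData": {"schemaString": "...", "partitionColumns": ["country"]}},
    {"add": {"path": "part-0001.parquet", "size": 1024,
             "stats": json.dumps({"minValues": {"age": 25},
                                  "maxValues": {"age": 30}})}},
    {"txn": {"appId": "stream-42", "version": 7}},
    {"commitInfo": {"operation": "WRITE"}},
]

# Serialize the commit the way a writer would store it as e.g. 000002.json:
commit_body = "\n".join(json.dumps(a) for a in actions)

# A reader reconstructs the actions by parsing line by line:
parsed = [json.loads(line) for line in commit_body.splitlines()]
```

Note how the add action can carry per-column statistics as an embedded JSON string, which is what later enables data skipping at read time.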
Log Checkpoints
Delta Lake periodically compacts the log into Parquet checkpoint files to achieve better performance. A checkpoint stores all the non-redundant actions up to a specific log ID. Some combinations of actions are redundant and can be discarded:
An add action can be discarded when a later remove action targets the same data object. However, per the table's data retention configuration, the remove action should be kept as a tombstone.
Subsequent adds for the same object can be handled by keeping the last one.
Multiple txn actions from the same appId can be replaced by the latest one.
The change metadata and protocol actions can also be merged to keep only the latest metadata.
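The four compaction rules above can be folded into a single log replay. Here is a minimal sketch (commit records are lists of action dicts, as in the earlier example; this is a simplification, not the real checkpoint writer):

```python
def checkpoint(log_records):
    """Fold commit records into the non-redundant state: live adds,
    remove tombstones, the latest txn per appId, and the latest metadata."""
    adds, removes, txns, metadata = {}, {}, {}, None
    for record in log_records:
        for action in record:
            if "add" in action:
                path = action["add"]["path"]
                adds[path] = action["add"]        # a later add wins
                removes.pop(path, None)
            elif "remove" in action:
                path = action["remove"]["path"]
                adds.pop(path, None)              # add + remove cancel out...
                removes[path] = action["remove"]  # ...but keep the tombstone
            elif "txn" in action:
                txns[action["txn"]["appId"]] = action["txn"]["version"]
            elif "metaData" in action:
                metadata = action["metaData"]     # the latest metadata wins
    return adds, removes, txns, metadata

log = [
    [{"metaData": {"id": "t1"}}, {"add": {"path": "f1.parquet"}}],
    [{"add": {"path": "f2.parquet"}}, {"remove": {"path": "f1.parquet"}}],
    [{"txn": {"appId": "job", "version": 3}}],
]
adds, removes, txns, metadata = checkpoint(log)
```

After replay, only f2.parquet is live, f1.parquet survives only as a tombstone, and the checkpoint is much smaller than the full action history.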
Checkpointing into Parquet files makes the log ideal for querying table metadata and helps avoid excessive LIST operations. One more thing to note: clients must be able to find the last checkpoint efficiently, without listing every object in the _delta_log directory. To deal with this, checkpoint writers record the new checkpoint ID in the _delta_log/_last_checkpoint file.
Write and Read Process
The general idea of the read/write operations of Delta Lake is straightforward:
Write: The client writes new Parquet data files and commits them by writing a log entry.
Read: The client reads the log to find the desired snapshot of the table.
Read
The read-only transactions have five steps:
1. Read the _last_checkpoint object (if it exists) in the table's log directory.
2. Issue a LIST operation to find newer JSON and Parquet files in the log directory. The start key for the operation is the last checkpoint ID if one exists, or zero otherwise.
3. Use the checkpoint and the subsequent log records identified in the previous step to reconstruct the state of the table: the set of data objects and their associated data statistics.
4. Identify the data objects needed for the read query using the statistics.
5. Read the necessary data objects, in parallel if the engine supports it.
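The read steps can be sketched end to end over a dict-based "object store" (the key layout, stats fields, and predicate shape are all simplifying assumptions of mine):

```python
def read_snapshot(store, may_match):
    """Sketch of the read protocol. `may_match(stats)` decides, from
    min/max statistics, whether a data object could satisfy the query."""
    # 1. Read _last_checkpoint if it exists.
    last = store.get("_delta_log/_last_checkpoint")  # e.g. {"version": 2}
    start = last["version"] if last else 0
    # 2. LIST log records at or after the checkpoint (keys sort lexicographically).
    log_keys = sorted(
        k for k in store
        if k.startswith("_delta_log/") and k.endswith(".json")
        and int(k.split("/")[1].split(".")[0]) >= start)
    # 3. Replay actions to build the set of live data objects plus stats.
    live = {}
    for key in log_keys:
        for action in store[key]:
            if "add" in action:
                live[action["add"]["path"]] = action["add"].get("stats", {})
            elif "remove" in action:
                live.pop(action["remove"]["path"], None)
    # 4. Use the statistics to skip objects the query cannot need.
    return [path for path, stats in live.items() if may_match(stats)]
    # 5. (The engine would now read the selected Parquet objects in parallel.)

store = {
    "_delta_log/00000.json": [{"add": {"path": "a.parquet",
                                       "stats": {"min_age": 20, "max_age": 29}}}],
    "_delta_log/00001.json": [{"add": {"path": "b.parquet",
                                       "stats": {"min_age": 30, "max_age": 39}}}],
}
# Query: age >= 30. The min/max stats let us skip a.parquet entirely.
needed = read_snapshot(store, lambda s: s.get("max_age", 10**9) >= 30)
```

Step 4 is where the log's embedded statistics pay off: the client never even opens the Parquet objects it can prove irrelevant.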
Write
The data write transactions generally have the following steps:
1. Identify a recent log record ID using steps 1–2 of the read protocol. For convenience, we call this current_version (e.g., 00002.json).
2. Read the data at current_version, if required.
3. Write the new data objects into the correct data directories, generating the object names using GUIDs. This step can run in parallel.
4. Attempt to write the transaction's log record for the newly added objects as log object current_version + 1 (e.g., 00003.json). This step must be atomic: it succeeds only if no other client has written that object. If the step fails, the transaction can be retried. (More on this in the following section.)
5. Optionally, write a new checkpoint for log record current_version + 1. After this write is complete, update the _last_checkpoint file to point to that checkpoint.
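On a local filesystem, opening a file in "x" mode gives the same create-if-absent guarantee that step 4 needs from the object store. Here is a minimal sketch of the atomic commit step (the path layout and action shape are illustrative):

```python
import json
import os
import tempfile

def try_commit(log_dir, version, actions):
    """Atomically create <version>.json. Fails if another writer got there
    first: open(..., 'x') is the local analog of put-if-absent."""
    path = os.path.join(log_dir, f"{version:020d}.json")
    try:
        with open(path, "x") as f:
            f.write("\n".join(json.dumps(a) for a in actions))
        return True
    except FileExistsError:
        return False

log_dir = tempfile.mkdtemp()
ok_first = try_commit(log_dir, 3, [{"add": {"path": "part-abc.parquet"}}])
ok_second = try_commit(log_dir, 3, [{"add": {"path": "part-def.parquet"}}])
```

The second call loses the race: whichever writer creates the log object first owns that version, which is exactly the property the concurrency-control section below relies on.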
Concurrency control
Delta Lake supports multiple concurrent writers in a table with the support of optimistic concurrency control (OCC).
Optimistic concurrency control (OCC) is a non-locking concurrency control method applied to transactional systems. OCC assumes that multiple transactions can frequently complete without interfering with each other. — Wikipedia —
The principle of OCC in Delta Lake is to avoid overwriting delta log entry files (e.g., two writers cannot write the same log object 00002.json). Let's walk through an example to see what happens when two write operations try to write a log object with the same name:
Let's say we have two write operations, A and B, which both start writing new data while the current log version is 00001.json.
First, operation B writes file 2 and commits it by writing the 00002.json log object. 00002.json now refers to file 2.
Later, operation A writes file 3 and also commits by writing a 00002.json log object (because both writers observed 00001.json as the current version of the table).
At this point, 00002.json refers to file 3, which leaves file 2 orphaned: it can no longer be referenced from the delta log.
To ensure the log entry writing step is atomic, only one client should succeed in creating the log object with that name. Databricks implements this step for different storage systems:
They use existing atomic put-if-absent operations from Google Cloud Storage and Azure Blob Store.
Initially, Amazon S3 did not support a put-if-absent operation, so Databricks relied on an external service to manage table locks: a client had to acquire the table lock first, ensuring that only one client could add a record with each log ID. However, in August 2024, Amazon announced conditional writes for S3, so I guess Databricks won't need the external locking service anymore.
Let's review the process with a put-if-absent operation used to detect conflicts, going back to the example right above:
At the write-log step of both operations (step 2 for operation B and step 5 for operation A), the writers use put-if-absent requests instead of regular write requests.
For operation B, because no log object with that name existed at the time of writing, it successfully writes 00002.json.
For operation A, the put-if-absent request for 00002.json fails because operation B already wrote a log object with that name. Operation A must then reload the delta log and check whether its changes conflict with the previous writes; if they do, the write is aborted. If not, the writer bumps the log version (00003.json) and issues another put-if-absent request for that object; if a log object with that name already exists too, the process repeats.
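The retry loop above can be sketched in a few lines, reusing a local file's "x" mode as a stand-in for put-if-absent (the conflict-detection check is deliberately stubbed out, and the names are mine):

```python
import json
import os
import tempfile

def put_if_absent(path, body):
    """Local-filesystem stand-in for an object store's put-if-absent."""
    try:
        with open(path, "x") as f:
            f.write(body)
        return True
    except FileExistsError:
        return False

def commit_with_retry(log_dir, version, actions, max_attempts=5):
    """OCC writer loop: try version, version + 1, ... until a put-if-absent
    succeeds. A real writer would reload the log and check for data
    conflicts (aborting on conflict) before each retry; omitted here."""
    body = "\n".join(json.dumps(a) for a in actions)
    for v in range(version, version + max_attempts):
        if put_if_absent(os.path.join(log_dir, f"{v:05d}.json"), body):
            return v  # this writer owns log version v
    raise RuntimeError("gave up after too many conflicting writers")

log_dir = tempfile.mkdtemp()
# Simulate operation B committing 00002.json first:
put_if_absent(os.path.join(log_dir, "00002.json"),
              '{"add": {"path": "file2"}}')
# Operation A also targets version 2, loses the race, and lands on version 3:
committed = commit_with_retry(log_dir, 2, [{"add": {"path": "file3"}}])
```

Because losers move to the next version instead of overwriting, file 2 from the example is never orphaned.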
Available Isolation Levels
Delta Lake supports transactions only within a single table. All write transactions are serializable, forming a serial schedule in increasing order of log IDs. Read transactions can achieve snapshot isolation or serializability.
The read protocol described above reads a single snapshot of the table, so clients using it achieve snapshot isolation. Clients requiring serializable reads can issue a read-write transaction that performs a dummy write to accomplish this.
Transaction Rates
Delta Lake’s write transaction rate is constrained by the latency of the put-if-absent operations described above. In practice, the latency of writes to object stores can be tens to hundreds of milliseconds, limiting the write transaction rate to several transactions per second.
Read transactions at the snapshot isolation level create no contention because they only read objects in the object store, so any number of these can run concurrently.
The following sections describe higher-level data management features, similar to many traditional analytical DBMSs.
Time Travel and Rollbacks
Because Delta Lake's data objects and log are immutable, it is easy to query a past snapshot of the data thanks to its MVCC implementation: a client simply reads the table state as of an older log record ID. Delta Lake lets users configure a data retention interval for each table to facilitate time travel, and it supports reading table snapshots by timestamp or commit ID. Clients can also find out which commit ID they read or wrote through Delta Lake's API. Besides that, Databricks developed the CLONE command to create a copy of an existing Delta Lake table at a specific version.
Data Mutation
Copy on write (COW)
At first, Delta Lake supported only copy-on-write for data mutations: any mutation results in a new file being written with the changes applied, while the original files are left untouched. Given a data file with millions of rows of which only two need to be deleted or updated, a whole new data file with millions of rows must be written, with the change applied.
Let's imagine we have a table with two columns, name and age; here is the data mutation process for this table with COW:
First, we insert two rows into the table: (‘Bruce‘, 25) and (‘Clark‘, 30). This results in writing File 1. The log object will have a single add action referring to File 1.
Then, we delete the record with the name ‘Bruce‘. This results in rewriting File 1 into File 2 without the deleted record. The log object will contain two actions: the first adds File 2, and the second removes File 1.
Next, we want to update the age of the row with the name ‘Clark’. This results in rewriting File 2 into File 3, containing the Clark record with the new age value. The log object will contain two actions: the first adds File 3, and the second removes File 2.
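The delete step above can be sketched as a toy COW operation, where each "file" is just a list of (name, age) rows and the helper names are my own:

```python
def cow_delete(table_files, file_id, predicate, next_file_id):
    """Copy-on-write delete: rewrite the whole file without the matching
    rows, then emit the add/remove actions for the commit."""
    new_rows = [row for row in table_files[file_id] if not predicate(row)]
    table_files[next_file_id] = new_rows        # write the brand-new object
    return [{"add": {"path": next_file_id}},    # log: new file comes in...
            {"remove": {"path": file_id}}]      # ...old file goes out

files = {"file1": [("Bruce", 25), ("Clark", 30)]}
log_record = cow_delete(files, "file1", lambda row: row[0] == "Bruce", "file2")
```

Note that every surviving row is copied even though only one row was deleted; that copy cost is exactly what merge-on-read, below, avoids.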
Merge On Read (MOR)
COW does not suit workloads that require frequent data mutations: deleting or updating a single row in a hundred-gigabyte Parquet file causes the entire file to be rewritten just to apply that one-row change.
Later, Delta Lake supported the merge-on-read strategy by introducing deletion vectors (DV), which are additional files used to invalidate table rows. This approach allows Delta Lake to avoid rewriting the whole data file. In return, the read operations might need to perform extra tasks; they must merge the deletion vectors with the data files.
When the read operation loads a data file with an associated DV file, it skips all rows in the data file that are present in the DV file. In case of updating, a new data file can be written with only the new row values, and a deletion vector can be added to invalidate the row in the original data file.
We will go back to the example from the COW section, but this time, we will see how the data mutations are carried out with the MOR approach:
First, we insert two rows into the table: (‘Bruce‘, 25) and (‘Clark‘, 30). This results in writing File 1. The log will have a single add action referring to File 1.
Then, we delete the record with the name ‘Bruce‘. This results in writing DV file 1, which records the location of the deleted row (row 1). The log object will have two actions: the first adds File 1 together with DV 1, and the second removes the standalone File 1. The remove action makes clients aware of the record mutation.
Next, we want to update the age of the row with the name ‘Clark’. This results in writing DV file 2 and File 2, which contains ONLY the updated row. One thing to note is that DV file 2 includes the deleted-row location from DV file 1 plus the location of the updated row. The log object will contain two actions: the first adds DV 2 and File 2, and the second removes DV 1.
If the clients read the table right after the above update operation, they must merge three files, File 1, DV 2, and File 2, to construct the table’s snapshot.
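That merge step can be sketched as follows; files are toy lists of rows and deletion vectors are sets of row positions (real DVs use a compressed bitmap format, so this is a simplification):

```python
def mor_read(data_files, deletion_vectors):
    """Merge-on-read: for each live data file, skip the row positions
    listed in its deletion vector, then concatenate what remains."""
    rows = []
    for path, file_rows in data_files.items():
        dv = deletion_vectors.get(path, set())
        rows.extend(r for i, r in enumerate(file_rows) if i not in dv)
    return rows

# State after the update in the example: File 1 still on disk, DV 2 marking
# both of its rows dead, and File 2 holding only the updated Clark row.
data_files = {
    "file1": [("Bruce", 25), ("Clark", 30)],
    "file2": [("Clark", 31)],
}
deletion_vectors = {"file1": {0, 1}}  # DV 2 covers row 0 (Bruce) and row 1 (old Clark)
snapshot = mor_read(data_files, deletion_vectors)
```

The write only produced two tiny files, but the reader now pays the merge cost on every scan — the core COW vs. MOR trade-off.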
Streaming Ingest and Consumption
The system can treat the Delta Lake log as a message queue, which removes the need for a dedicated message bus in many cases. The log supports streaming pipelines with the following features:
Write Compaction: Streaming producers typically write data into small objects for faster ingest performance. Still, the small files slow down the consumer side because too many small objects need to be processed. Delta Lake lets users run a background process that compacts small data objects transactionally without affecting readers.
Exactly-Once Streaming Writes: Writers can leverage the txn action to track which data was written into a Delta Lake table to implement “exactly-once” writes.
Efficient Log Tailing: Delta Lake lets consumers query newly arrived data efficiently; the naming convention of the .json log objects, with lexicographically increasing IDs, is the key here. It lets consumers run LIST operations starting at the last log record ID they have seen to discover new log records.
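The exactly-once mechanism above amounts to a simple version comparison. Here is a sketch of the replay-side check (the appId and versions are made up for illustration):

```python
def should_apply(table_txns, app_id, version):
    """Exactly-once check: a streaming writer replaying batches after a
    failure skips any batch whose (appId, version) pair is already
    recorded by a txn action in the table's log."""
    return version > table_txns.get(app_id, -1)

table_txns = {"stream-42": 7}   # highest committed txn version per appId
replayed_batches = [6, 7, 8]    # batch versions re-sent after a crash
applied = [v for v in replayed_batches
           if should_apply(table_txns, "stream-42", v)]
```

Batches 6 and 7 are recognized as already committed and dropped, so the crash-and-replay cycle never duplicates data in the table.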
Layout Optimization
Delta Lake can support a variety of layout optimizations without affecting concurrent operations because it updates the data structures that represent a table transactionally. Leveraging this, Databricks implements several physical layout optimizations:
OPTIMIZE command: Users can leverage the OPTIMIZE command on a table to compact small objects without affecting executing transactions. This operation will make each data object 1 GB in size by default, and Databricks allows users to customize this target size.
Z-Ordering by Multiple Attributes: Delta Lake supports reorganizing the records in a table in Z-order over a given set of attributes to achieve locality along multiple dimensions (related data is stored close together). Z-ordering works with data statistics to let queries read less data: it tends to make each data object contain only a small range of the possible values of each chosen attribute, so scans can skip more data objects.
Auto Optimize: Users can set the AUTO OPTIMIZE property on the table to have Databricks’s cloud service automatically handle the compact process for newly written data.
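The Z-ordering idea above is usually built on bit interleaving: weave the bits of the chosen attributes together into one sort key so that sorting preserves locality in every dimension at once. A minimal two-attribute sketch (this illustrates the curve itself, not Databricks's actual implementation):

```python
def z_value(x, y, bits=8):
    """Interleave the bits of two non-negative integers into a Z-order
    key; sorting by this key keeps rows close on BOTH dimensions."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)      # x bits land on even positions
        z |= ((y >> i) & 1) << (2 * i + 1)  # y bits land on odd positions
    return z

# Neighbors in (x, y) space get nearby Z-values, so after sorting, each
# data object covers a small range of x AND a small range of y:
points = [(0, 0), (1, 0), (0, 1), (1, 1), (7, 7)]
ordered = sorted(points, key=lambda p: z_value(*p))
```

With rows laid out this way, the per-object min/max statistics become tight on every Z-ordered column, which is what lets the scan skip objects for predicates on any of them.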
Audit Logging
Delta Lake’s transaction logs can be used for audit logging based on commitInfo records.
Outro
Thank you for reading this far.
In this article, we covered the background and motivation behind Delta Lake, its format and read/write protocol, and its concurrency support. Next, we explored strategies for mutating data on Delta Lake and examined some of its data management features.
See you in my next blog!
References
[1] Databricks, Delta Lake: High-Performance ACID Table Storage over Cloud Object Stores, (2020)
[2] Jack Vanlightly, Understanding Delta Lake's consistency model (2024)
Before you leave
Leave a comment or contact me via LinkedIn or Email if you:
Are interested in this article and want to discuss it further.
Want to correct any mistakes in this article or provide feedback, including on writing mistakes like grammar and vocabulary. I happily admit that I'm not so proficient in English :D
It might take you five minutes to read, but it took me more than five days to prepare, so it would greatly motivate me if you considered subscribing to receive my writing.