I spent 7 hours diving deep into Apache Iceberg
More details on how everything works
My name is Vu Trinh, and I am a data engineer.
I’m trying to make my life less dull by spending time learning and researching “how it works” in the data engineering field.
Here is a place where I share everything I’ve learned.
Intro
After writing an overview of the Apache Iceberg table format in this article, I decided to spend more time understanding its internals. This article includes all the lessons I learned after hours of reading books and experimenting with the format using PySpark, PyIceberg, and the Nessie Catalog.
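If you want to follow along, here is a minimal sketch of the kind of local setup I mean. It assumes the Iceberg Spark runtime package is available; the catalog name, warehouse path, table name, and package version are placeholders I chose for illustration, not anything mandated by Iceberg.

from pyspark.sql import SparkSession

# A local Spark session with an Iceberg catalog backed by a Hadoop warehouse on disk.
# In a real deployment, you would point this at a Nessie, REST, Hive, or Glue catalog instead.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0")  # placeholder version
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-warehouse")
    .getOrCreate()
)

# A small example table used in the sketches throughout this article.
spark.sql("CREATE TABLE IF NOT EXISTS local.db.events (id BIGINT, city STRING, created_timestamp TIMESTAMP) USING iceberg")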
We will revisit the overview of Iceberg in the following sections.
Data Layer
This layer contains the table’s actual data and consists of data files and delete files (the latter are present only if the merge-on-read mode is chosen; more on this later). The data files store the table’s records, while the delete files track rows that have been removed.
Apache Iceberg supports several file formats, including Apache Parquet, Apache ORC, and Apache Avro. In practice, Apache Parquet is the most commonly used format, so I will assume Parquet for the rest of this article when we go into the details of Iceberg.
Metadata Layer
Iceberg organizes metadata in a tree structure: metadata files sit at the top, manifest lists come next, and manifest files sit at the bottom. The metadata layer is crucial for managing large datasets and enabling key features like time travel and schema evolution.
Manifest Files
Manifest files keep track of data files and delete files, as well as additional details and statistics about each file, such as the file format, the partition it belongs to, and the min/max values, value counts, and null counts for a data file’s columns.
With plain Parquet, some of these statistics are already stored in the data files themselves (the min/max of each column chunk), so the reader has to open each Parquet file’s footer to find them.
However, in Iceberg, a single manifest file stores these statistics for multiple Parquet data files, which means the reader only needs to open one file to get the statistics for all the data files tracked by that manifest. This removes the need to open many data files and improves read performance.
The engine records these statistics during the write operation.
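You can look at these per-file statistics without touching the data files themselves by querying Iceberg’s files metadata table from Spark. A minimal sketch, assuming the local.db.events table from earlier; the exact set of columns can vary slightly between Iceberg versions.

# Each row describes one data file, with the stats the writer recorded in the manifests.
spark.sql("""
    SELECT file_path, file_format, record_count, null_value_counts, lower_bounds, upper_bounds
    FROM local.db.events.files
""").show(truncate=False)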
Manifest Lists
Each Iceberg table snapshot is associated with a manifest list, which contains an array of structs. Each element tracks a single manifest file and includes information such as the following (a sketch of how to inspect this metadata comes right after the list):
The manifest file’s location
The partition this manifest file belongs to
The upper and lower bounds of the non-null partition field values, calculated across the data files tracked by this manifest file.
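The manifests metadata table exposes roughly this information. A small sketch, again assuming the local.db.events table; column names may differ a little across Iceberg versions.

# One row per manifest file tracked by the current snapshot's manifest list,
# including partition summaries (roughly: contains_null, lower_bound, upper_bound per partition field).
spark.sql("""
    SELECT path, added_data_files_count, existing_data_files_count, partition_summaries
    FROM local.db.events.manifests
""").show(truncate=False)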
Metadata files
These files store the Iceberg table’s metadata at a specific point in time, including information such as the following (a sketch of inspecting it comes after the list):
The last sequence number, which tracks the order of snapshots in a table and is increased whenever the table changes.
The table update timestamp.
The table’s base location, which determines where data, manifests, and table metadata are stored.
The table’s schema
The partition specification
Which snapshot is the current one
All snapshots and their associated manifest lists.
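Much of this information is surfaced through the snapshots metadata table (you can also open the *.metadata.json file in the warehouse directly). A sketch with the same assumed table:

# Every snapshot the current metadata file knows about, with its manifest list location.
spark.sql("""
    SELECT committed_at, snapshot_id, operation, manifest_list
    FROM local.db.events.snapshots
""").show(truncate=False)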
The Catalog
All requests must be routed through the catalog, which holds the current metadata pointer for each table, so the reader always starts from the most up-to-date metadata file.
A critical requirement for an Iceberg catalog is support for atomic operations when updating the metadata pointer. This ensures that all readers and writers interact with a consistent state of the table at any given time.
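With PyIceberg, the catalog is the entry point for exactly this reason: you ask it for a table, and it resolves the current metadata file for you. A minimal sketch, assuming a REST catalog running at a placeholder URI and the db.events table used above.

from pyiceberg.catalog import load_catalog

# The catalog resolves 'db.events' to the location of its current metadata file.
catalog = load_catalog("default", **{"type": "rest", "uri": "http://localhost:8181"})  # placeholder URI
table = catalog.load_table("db.events")
print(table.metadata_location)   # the current *.metadata.json the pointer resolves to
print(table.current_snapshot())  # the snapshot marked as current in that metadata file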
The following sections describe the details of Iceberg’s read and write operations.
The write operation
When writing new data to an existing Iceberg table, the writer first visits the catalog to get the location of the current metadata file. The writer reads this file to learn the table’s current schema and partition scheme before writing any data.
With these two pieces of information, the writer writes the new data files following the partition scheme.
Then, the writer creates the corresponding manifest files in Avro format. A manifest file contains each data file’s location plus the file’s statistics, such as the upper and lower bounds of each column and the null value counts. The writer computes these statistics during the writing process.
Next, the writer creates the manifest list to keep track of the manifest files. This file contains the manifest files’ locations, the number of data files/rows added or deleted, the lower and upper bounds of the partition columns, etc.
Then, it writes a new metadata file that references the latest snapshot along with all previous snapshots. This file includes the table base location, the manifest list location, the snapshot ID, the sequence number, the updated timestamp, etc. The writer also marks the newly created snapshot as the current snapshot.
Finally, the writer updates the catalog so that its current metadata pointer points to the newly created metadata file.
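You can watch this whole chain happen from Spark: a single append produces new data files, new manifests, a new manifest list, a new metadata file, and an atomic pointer swap. A sketch using the assumed local.db.events table; the snapshot summary keys shown are the common ones for an append, but the exact set can vary.

# One append = one new snapshot.
spark.sql("INSERT INTO local.db.events VALUES (1, 'Milan', TIMESTAMP '2024-08-08 10:00:00')")

# The new snapshot appears in the snapshots metadata table with its own manifest list.
spark.sql("""
    SELECT committed_at, snapshot_id, operation, summary['added-data-files'] AS added_data_files
    FROM local.db.events.snapshots
    ORDER BY committed_at
""").show(truncate=False)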
The read operation
The reader first visits the catalog to find the table's current metadata file location.
After retrieving the metadata file, the reader collects the table’s schema to prepare for the reading process. Then, it checks the table’s partition schemes to understand how the data is organized.
The next step is retrieving the snapshot that the reader wants to read. For a typical query, the current snapshot is selected. For a time-travel query, where the user wants to read the data as it existed at an earlier point, an older snapshot is chosen instead. A time-travel query can specify the timestamp the application wants to read from; Iceberg then looks for the latest snapshot older than that timestamp, which is possible because the metadata file stores the creation timestamp of each snapshot. The query can also specify a snapshot ID directly.
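In recent Spark versions, both flavors of time travel can be expressed in SQL or through read options. A sketch against the assumed table; the timestamp and snapshot ID here are placeholders.

# Read the table as of a wall-clock time (Iceberg picks the latest snapshot older than it) ...
spark.sql("SELECT * FROM local.db.events TIMESTAMP AS OF '2024-08-08 00:00:00'").show()
# ... or pin an exact snapshot ID.
spark.sql("SELECT * FROM local.db.events VERSION AS OF 123456789").show()
# The DataFrame API equivalent uses read options.
spark.read.option("snapshot-id", 123456789).table("local.db.events").show()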
After choosing the snapshot, the reader will locate the manifest list associated with that snapshot.
Then, the reader reads the manifest list to find the manifest files’ locations. It also collects the lower and upper bound values of the partition columns for each manifest file. Because a manifest file can keep track of multiple data files, these lower and upper bounds are calculated across those files. The reader can apply the partition filter here to prune unnecessary manifest files.
After determining the needed manifest files, the reader opens each file to read. A single manifest file contains the information for all the data files it tracks; Iceberg represents each data file as an entry that records information such as the data file’s location, the lower/upper bound partition values for each file, the file’s format, the record count, etc.
When reading each entry, the reader can use the entry’s lower/upper bound partition values to prune unneeded data files.
After going through all the manifest files, the reader knows exactly which data files it needs. It then reads these files using their paths, which are also recorded in the manifest files.
As you can see, the partition pruning process is carried out on two levels. This is possible because Iceberg records statistics about the partition column in both the manifest list and manifest files. The former is used to limit the needed manifest files, and the latter is used to limit the required data files.
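Here is a toy, self-contained illustration of that two-level pruning. This is not Iceberg code; the dictionaries below stand in for a manifest list and its manifest files, just to make the flow concrete.

# Level 1 input: the manifest list, with partition bounds per manifest file.
manifest_list = [
    {"path": "m1.avro", "partition_lower": "2024-08-01", "partition_upper": "2024-08-05"},
    {"path": "m2.avro", "partition_lower": "2024-08-06", "partition_upper": "2024-08-10"},
]
# Level 2 input: each manifest file's entries, with per-data-file partition values.
manifests = {
    "m1.avro": [{"file_path": "f1.parquet", "partition": "2024-08-02"}],
    "m2.avro": [
        {"file_path": "f3.parquet", "partition": "2024-08-07"},
        {"file_path": "f4.parquet", "partition": "2024-08-08"},
    ],
}
wanted = "2024-08-08"

files_to_read = []
for m in manifest_list:
    # Level 1: skip whole manifests whose partition bounds cannot contain the wanted value.
    if not (m["partition_lower"] <= wanted <= m["partition_upper"]):
        continue
    for entry in manifests[m["path"]]:
        # Level 2: skip individual data files whose partition value does not match.
        if entry["partition"] == wanted:
            files_to_read.append(entry["file_path"])

print(files_to_read)  # ['f4.parquet']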
When reading the Parquet data file, the reader can apply other query filters to limit the needed row groups and choose only necessary columns to read to avoid scanning the whole file.
The result is returned to the client.
The following sections describe some aspects related to the performance of the Iceberg table.
Compaction
Every change in the Iceberg table results in new data files. When reading the table, after determining the necessary data files, we must open each file to read the content and close it when done. This suggests that the process becomes less efficient as the number of files we read increases.
Imagine you have an Iceberg table partitioned by the updated timestamp with day granularity. An application writes to this Iceberg table many times a day, resulting in many data files in a single partition. You must open and close all of those files when you read this partition.
What if we combine all these files into a single file so we only need to open and scan one?
In Iceberg, periodically rewriting the data in all these small files into fewer, larger files is called compaction. The writer can write as many files as it wants; the compaction process will rewrite those files into larger ones that can serve the reader more efficiently. Users can control the compaction process by specifying the compaction strategy, a filter to limit which files are rewritten, the target file size, etc.
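With Spark, compaction is typically triggered through the rewrite_data_files procedure. A sketch against the assumed table; the strategy and target size shown are just example values.

# Rewrite small files into ~512 MB files using the bin-pack strategy.
spark.sql("""
    CALL local.system.rewrite_data_files(
        table => 'db.events',
        strategy => 'binpack',
        options => map('target-file-size-bytes', '536870912')
    )
""")
# A 'where' argument can also be passed to limit compaction to specific partitions.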
Hidden Partitioning
Traditionally, partitioning a table on a transformed column (e.g., partitioning by day requires deriving a day value from a timestamp column) means creating an extra column, and users have to filter on this exact column to benefit from partition pruning.
For example, a table is partitioned by day, and every record must have an extra partition_day column derived from the created_timestamp column. When users query the table, they must filter on the exact partition_day column so the query engine knows which partitions it can skip. If the user isn’t aware of this and filters on the created_timestamp column instead, the query engine will scan the whole table.
However, the latter case is more common for data analysts or business users who want to answer an analytics question; they don’t need to know about the extra column used for technical purposes (partitioning).
This is where Iceberg’s hidden partitioning feature shines (a short example follows the list below):
Instead of creating an additional column to partition on transformed values, Iceberg only records the transformation applied to the source column.
Thus, Iceberg stores less data because it doesn’t need to materialize extra columns.
Because the metadata records the transformation on the original column, the user can filter on that column, and the query engine will apply the transformation to prune the data.
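Concretely, in Spark SQL the partition transform is declared directly in the DDL, and queries simply filter on the original column. A sketch using the same assumed schema; the table name is mine.

# The days() transform is recorded in the partition spec; no partition_day column exists.
spark.sql("""
    CREATE TABLE IF NOT EXISTS local.db.events_hidden (
        id BIGINT, city STRING, created_timestamp TIMESTAMP
    ) USING iceberg
    PARTITIONED BY (days(created_timestamp))
""")
# Filtering on the raw timestamp is enough for the engine to prune partitions.
spark.sql("""
    SELECT * FROM local.db.events_hidden
    WHERE created_timestamp >= TIMESTAMP '2024-08-08 00:00:00'
""").show()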
Another challenge with traditional partitioning is that it relies on the physical layout of the files into subdirectories; changing how the table is partitioned requires rewriting the whole table.
Apache Iceberg solves this problem by storing all the historical partition schemes. If the table is first partitioned by scheme A and later by scheme B, Iceberg exposes this information to the query engine, which creates two separate execution plans to evaluate the filter against each partition scheme.
Given a table initially partitioned by the created_timestamp field at a monthly granularity, the transformation month(created_timestamp) is recorded as the first partitioning scheme. Later, the user updates the table to be partitioned by created_timestamp at a daily granularity, and the transformation day(created_timestamp) is recorded as the second partitioning scheme.
Behind the scenes, the data is organized according to the partition scheme in place at the time of writing. For instance, the data is stored in monthly folders with month partitioning, whereas with day partitioning, it's organized into daily folders.
When the application queries this table using created_timestamp, the query engine applies both the first and second transformations to created_timestamp to enable partition pruning. Refer to the figure below to better understand this process.
Sorting
While partitioning helps organize data files based on the partition columns, Iceberg gives us more fine-grained control over how data is written to data files with sorting.
Given an Iceberg table partitioned by day, where the data comes from four cities (London, Milan, Paris, and Madrid), suppose the user wants to query data from Milan on 2024-08-08. After the query engine prunes unnecessary partitions, it reads the relevant data files. For the 2024-08-08 partition, there are five files in total. Since data from all four cities is scattered across these files, the engine must open all five files to locate the Milan data. However, if the data were sorted by city, with Milan’s data consolidated into two specific files, the query engine would only need to open those two files instead of all five.
While reading data becomes more efficient with sorting, the process of writing data files may require additional effort due to the need to sort the data during writing. Moreover, to maintain global sorting across files, a compaction job is necessary to rewrite and sort the data across all files. This makes it crucial for users to carefully define the table’s sort order to leverage this optimization fully.
The best practices for determining this order, based on Tabular [3] (an example of setting the write order follows the list):
Put columns most likely to be used in filters at the start of your write order, and use the lowest cardinality columns first.
End the order with a high cardinality column, like an ID or event timestamp.
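In Spark SQL, this is done by setting the table’s write order. A sketch following those guidelines on the assumed table, with city as the low-cardinality filter column and id as the high-cardinality tail.

# Low-cardinality filter column first, high-cardinality column last.
spark.sql("ALTER TABLE local.db.events WRITE ORDERED BY city, id")
# Subsequent writes (and sort-strategy compaction) will honor this order, e.g.:
spark.sql("CALL local.system.rewrite_data_files(table => 'db.events', strategy => 'sort')")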
Row-level updates
When writing to storage, data files are immutable and cannot be overwritten. Any changes or updates will create new data files, which enables benefits like snapshot isolation or time travel. In Iceberg, row-level updates are handled through two modes: copy-on-write and merge-on-read.
Copy-on-write (COW)
This mode is the default in Iceberg. If the table records are changed (updated or deleted), the data files associated with those records will be rewritten with the changes applied.
Pros: Fast reads; the reader only needs to read the data files without merging them with delete files.
Cons: Slow writes; rewriting entire data files slows down updates, especially if they are frequent.
Merge-on-Read (MOR)
Instead of rewriting an entire data file, updates are made using the delete files, with changes tracked in separate files:
Deleting a record: The record is listed in a delete file; when the reader reads the table, it will merge the data and the delete file to decide which record to skip.
Updating a record: The old version of the record is tracked in a delete file, and the engine creates a new data file containing the record with the updated values. When reading the table, the engine ignores the old version of the record thanks to the delete file and uses the new version from the new data file.
With MOR mode, there will be more files in storage than with COW mode. To minimize the data reading cost, the user can run regular compaction jobs behind the scenes to reduce the number of files.
The nature of MOR is that the reader must know which records to ignore at read time. There are two kinds of delete files that control this behavior:
Positional delete files: The delete file tracks the rows to ignore by their position in the data file, allowing readers to skip specific rows. While this minimizes the read overhead, it increases the write time since the writer must read the data file to identify those positions.
Equality delete files: The delete files specify the deleted values; if a row has a field that matches one of these values, the row is skipped. This option doesn’t require the writer to read the data files. However, it affects read performance because the reader must compare each record in the data files against the deleted values.
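Regardless of which delete-file flavor the engine uses, the mode itself is chosen per table (and per operation type) through table properties. A sketch on the assumed table:

# Switch row-level operations on this table from the default copy-on-write to merge-on-read.
spark.sql("""
    ALTER TABLE local.db.events SET TBLPROPERTIES (
        'write.delete.mode' = 'merge-on-read',
        'write.update.mode' = 'merge-on-read',
        'write.merge.mode'  = 'merge-on-read'
    )
""")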
Outro
Thank you for reading this far. As I haven't had the opportunity to work with Iceberg extensively in my career, my understanding of this table format may not be comprehensive, and I might have missed some details. If you notice any areas that need adjustment or improvement in this article, please feel free to leave a comment or reach out to me directly via my LinkedIn, Email, or Twitter.
References
[1] Tomer Shiran, Jason Hughes & Alex Merced, Apache Iceberg: The Definitive Guide (2024)
[2] Jason Hughes, Apache Iceberg: An Architectural Look Under the Covers
[3] Tabular, Setting table write order
Before you leave
If you want to discuss this further, please leave a comment or contact me via LinkedIn, Email, or Twitter.
It might take you five minutes to read, but it took me more than five days to prepare, so it would greatly motivate me if you considered subscribing to receive my writing.