I spent 4 hours learning the architecture of BigQuery's storage engine
Vortex: BigQuery's Stream-Oriented Storage Engine (Part 1)
My name is Vu Trinh, and I am a data engineer.
I’m trying to make my life less dull by spending time learning and researching “how it works“ in the data engineering field.
Here is a place where I share everything I’ve learned.
Intro
This week, I decided to return to the cloud data warehouse I'm most biased toward—Google BigQuery. (Simply because it's the cloud warehouse I've used the longest.)
We'll explore some cool stuff about BigQuery's storage engine from Google's recent paper, Vortex: A Stream-oriented Storage Engine For Big Data Analytics.
The paper offers numerous insights, so I'll divide them into two articles. You're reading the first part; the second part will be released next Tuesday.
Overview
The overall BigQuery architecture is built from independent components for query execution, storage, container management, and data shuffling:
Colossus: A distributed storage system that stores the data.
Dremel: The distributed query engine.
Borg: Google’s large-scale cluster management system that reliably manages and orchestrates compute resources. (Borg is the predecessor of Kubernetes.) We will return to Borg when discussing the Vortex architecture.
Dedicated shuffle service: Dremel was inspired by the MapReduce paradigm; to operate and manage the data shuffle between stages efficiently, Google built a separate shuffle service on top of disaggregated distributed memory. This service backs BigQuery and also supports other services, such as Google Dataflow.
Recently, Google released a paper introducing the storage engine behind Google BigQuery.
According to Wikipedia, a storage engine is a database management system software component that creates, reads, updates, and deletes data from a database.
The paper presents Vortex, a storage engine Google built to support real-time analytics in BigQuery. It is a storage system that supports both streaming and batch data analytics. Instead of adapting infrastructure built for batch data to handle streaming, Google observed that it is better to build a storage system for streaming first and then use it for batch. Vortex provides a highly distributed, synchronously replicated storage engine optimized for append-focused data ingestion. (This reminds me of ClickHouse's MergeTree storage engine.)
I will dive into the Vortex concepts and architecture in the upcoming sections.
The Stream
In Vortex, a stream is an entity to which rows can be appended at the end. Every row is identified by the stream identifier and its offset within the stream. A table is an unordered set of streams. For read operations, clients can read a single stream concurrently at different offsets. For write operations, tens of thousands of clients can write to a table concurrently, each using its own stream.
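To make this model concrete, here is a minimal Python sketch of how a table, its streams, and row offsets relate. It is a toy illustration of the concepts only, not Vortex's actual interface; every class and method name here is made up.

```python
from dataclasses import dataclass, field

@dataclass
class Stream:
    """An append-only sequence of rows; each row is addressed by (stream_id, offset)."""
    stream_id: str
    rows: list = field(default_factory=list)

    def append(self, row) -> int:
        self.rows.append(row)
        return len(self.rows) - 1   # offset of the newly appended row

    def read_at(self, offset: int):
        return self.rows[offset]    # readers can read concurrently at different offsets

@dataclass
class Table:
    """A table is an unordered set of streams; each concurrent writer uses its own stream."""
    streams: dict = field(default_factory=dict)

    def stream_for_writer(self, writer_id: str) -> Stream:
        return self.streams.setdefault(writer_id, Stream(stream_id=writer_id))

table = Table()
stream = table.stream_for_writer("worker-1")
offset = stream.append({"user_id": 42, "event": "click"})
print(stream.read_at(offset))   # {'user_id': 42, 'event': 'click'}
```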
Stream type
Vortex supports the following stream types:
UNBUFFERED: The system returns a successful response to an append request only after the input data has been durably committed to Vortex. Subsequent reads of the table are guaranteed to see these rows.
BUFFERED: The system returns a successful response once the input rows have been written to Vortex, even though they haven’t been committed yet. Subsequent reads will not see these rows until they are flushed.
PENDING: The input rows are not visible until the Stream is committed.
By providing different types of streams, Vortex unifies batch and stream data ingestion. With PENDING mode, Vortex guarantees ACID semantics on large transactions, which is common in batch ETL processes. With BUFFERED mode, Vortex guarantees the atomicity for smaller transactions, which is suitable for stream ingestion.
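A rough way to picture the three visibility guarantees is the sketch below: an UNBUFFERED append is visible as soon as it succeeds, a BUFFERED append becomes visible only up to the last flushed offset, and a PENDING stream becomes visible only once the whole stream is committed. This is a hypothetical toy model, not Vortex's real interface.

```python
from enum import Enum

class StreamType(Enum):
    UNBUFFERED = 1   # rows visible once the append is durably committed
    BUFFERED = 2     # rows visible only up to the last flushed offset
    PENDING = 3      # rows visible only after the whole stream is committed

class WriteStream:
    def __init__(self, stream_type: StreamType):
        self.type = stream_type
        self.rows = []
        self.flushed_offset = -1     # BUFFERED: highest offset made visible by a flush
        self.committed = False       # PENDING: set by a (batch) commit

    def append(self, row) -> int:
        self.rows.append(row)
        return len(self.rows) - 1

    def flush(self, up_to_offset: int):
        self.flushed_offset = max(self.flushed_offset, up_to_offset)

    def commit(self):
        self.committed = True

    def visible_rows(self):
        if self.type is StreamType.UNBUFFERED:
            return self.rows
        if self.type is StreamType.BUFFERED:
            return self.rows[: self.flushed_offset + 1]
        return self.rows if self.committed else []   # PENDING

s = WriteStream(StreamType.BUFFERED)
s.append({"id": 1}); s.append({"id": 2})
print(len(s.visible_rows()))   # 0 -- nothing has been flushed yet
s.flush(up_to_offset=0)
print(len(s.visible_rows()))   # 1 -- only rows up to the flushed offset are visible
```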
Typical process of writing data using a stream
The first step for the client is to create a stream, which requires specifying the desired stream type.
After creating a stream, the client gets back the stream object, which includes the destination table schema. The client then uses this schema to serialize the input data into a binary format (Vortex supports various formats, such as Protocol Buffers or Avro). The client can also pass a row_offset to specify the offset at which the input data should be appended.
The row_offset ensures exactly-once semantics: when more than one client tries to append at the same offset, only one will succeed. Clients can prioritize low latency over this guarantee by omitting the row_offset; the input data is then simply written at the end of the stream (the default behavior).
If the client uses the BUFFERED type, the data must be flushed before it becomes visible. If the flush returns success, all stream rows up to and including the row at row_offset have been committed.
With the PENDING type, data written to a stream remains invisible until the stream is committed. This mode is mainly used for batch data writing. The scenario involves multiple workers independently writing to the table concurrently. Each creates a PENDING stream, writes all its data to the stream, and reports completion to a coordinator node. Once the coordinator receives success confirmations from all workers, it issues a batch commit request to Vortex to commit all the streams atomically. This ensures the data in these streams becomes atomically visible to readers.
The final step is to finalize the stream. When a client is done writing data to the stream, it finalizes the stream to prevent further appends.
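The sketch below walks through this lifecycle end to end for the PENDING (batch) case: create a stream, append at explicit offsets, finalize, and then atomically commit. It is a toy, in-memory stand-in; all class and method names are hypothetical and do not correspond to the real BigQuery Storage Write API.

```python
class ToyStream:
    def __init__(self, stream_type="PENDING"):
        self.type = stream_type
        self.rows = []
        self.finalized = False
        self.committed = False

    def append(self, batch, row_offset):
        if self.finalized:
            raise RuntimeError("stream is finalized; no further appends allowed")
        if row_offset != len(self.rows):
            # Exactly-once: an append targeting an already-written offset is rejected,
            # so a retried request cannot duplicate rows.
            raise RuntimeError(f"offset mismatch: expected {len(self.rows)}")
        self.rows.extend(batch)

    def finalize(self):
        self.finalized = True

class ToyVortex:
    def __init__(self):
        self.streams = []

    def create_stream(self, stream_type="PENDING"):
        stream = ToyStream(stream_type)
        self.streams.append(stream)
        return stream

    def batch_commit(self, streams):
        for stream in streams:           # atomically make PENDING streams visible
            stream.committed = True

    def visible_rows(self):
        return [row for s in self.streams if s.committed for row in s.rows]

# One worker's flow: create -> append at explicit offsets -> finalize -> (coordinator) commit.
vortex = ToyVortex()
stream = vortex.create_stream("PENDING")
stream.append([{"id": 1}, {"id": 2}], row_offset=0)
stream.append([{"id": 3}], row_offset=2)
stream.finalize()
print(vortex.visible_rows())   # [] -- PENDING rows are invisible before the commit
vortex.batch_commit([stream])
print(vortex.visible_rows())   # all three rows
```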
Data Mutation
To implement data mutation in Vortex, the storage engine defines a virtual column called _CHANGE_TYPE in the table schema. The column specifies the type of ingested content in the stream. It can have three values associated with three actions: INSERT, UPSERT, and DELETE.
INSERT indicates that the row was appended to the table.
UPSERT indicates that the row updates an existing row with the same key(s), or is inserted otherwise. (The set of keys in the DML statement is used to check whether the row already exists.)
DELETE indicates that all rows that match the set of keys in the DML statement will be deleted.
Note: While Google supports primary key(s) for BigQuery, it does not enforce uniqueness.
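To see how _CHANGE_TYPE drives mutations, here is a small sketch that replays a stream of changes against a keyed, in-memory table. Only the column name _CHANGE_TYPE comes from the paper; the function and data are illustrative, and the keyed dict is a simplification since BigQuery does not enforce key uniqueness.

```python
def apply_changes(rows, key="user_id"):
    """Replay a stream of rows carrying _CHANGE_TYPE against an in-memory keyed table."""
    table = {}
    for row in rows:
        change = row.pop("_CHANGE_TYPE")
        if change == "INSERT":
            table.setdefault(row[key], row)   # append a new row (simplified: real tables allow duplicates)
        elif change == "UPSERT":
            table[row[key]] = row             # update if the key exists, insert otherwise
        elif change == "DELETE":
            table.pop(row[key], None)         # drop rows matching the key
    return table

changes = [
    {"_CHANGE_TYPE": "INSERT", "user_id": 1, "name": "Ann"},
    {"_CHANGE_TYPE": "UPSERT", "user_id": 1, "name": "Anna"},
    {"_CHANGE_TYPE": "DELETE", "user_id": 1},
]
print(apply_changes(changes))   # {} -- the row was inserted, updated, then deleted
```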
Concepts
Before moving to the Vortex architecture, let’s learn some metadata concepts:
Streamlets: A stream is divided into contiguous slices of rows called Streamlets, and a Stream is an ordered list of one or more Streamlets. To ensure data durability, each Streamlet is replicated within the region. Every Streamlet is stored in at least two Borg clusters in a BigQuery region. A Stream can only have one writable Streamlet at any given time, which is always the last Streamlet in the Stream.
Fragments: Each Streamlet is further split into contiguous blocks of rows called Fragments. A Fragment is typically a range of rows inside a log file; Vortex stores these log files in Colossus.
Reading about Streamlets and Fragments reminded me of Kafka internals, in which a topic is divided into partitions and each partition is replicated across brokers, similar to Vortex's Streamlets. Each Kafka partition corresponds to a logical log, which is implemented as a set of roughly equal-sized segment files, similar to Vortex's Fragments.
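A minimal sketch of this Stream → Streamlet → Fragment hierarchy, assuming the relationships described above (the field names and replica labels are my own, not the paper's):

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class Fragment:
    """A contiguous block of rows, typically a range of rows inside a log file on Colossus."""
    log_file: str
    start_row: int
    num_rows: int
    finalized: bool = False

@dataclass
class Streamlet:
    """A contiguous slice of a Stream, replicated in at least two Borg clusters in the region."""
    streamlet_id: str
    replicas: List[str]                                   # e.g. ["borg-cluster-a", "borg-cluster-b"]
    fragments: List[Fragment] = field(default_factory=list)

@dataclass
class Stream:
    """An ordered list of Streamlets; only the last one is writable at any given time."""
    stream_id: str
    streamlets: List[Streamlet] = field(default_factory=list)

    @property
    def writable_streamlet(self) -> Streamlet:
        return self.streamlets[-1]

stream = Stream("stream-1", [Streamlet("sl-0", ["borg-a", "borg-b"])])
stream.writable_streamlet.fragments.append(Fragment("log-000", start_row=0, num_rows=500))
```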
Data formats: Vortex operates on two different data formats to optimize for two workloads. The write-optimized storage format (WOS) is the format in which data is written. The read-optimized storage format (ROS) is the format optimized for reading. BigQuery tables use the Capacitor file format as the ROS. Data is first written to Vortex in the WOS format and later converted into the ROS by a dedicated service.
Based on my past research, using two different formats is somewhat similar to Hudi, which uses a row-oriented format (Avro) to achieve high-throughput data ingestion and later writes that data into a column-oriented format (Parquet) to achieve efficient data reading.
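The essence of that conversion is turning row-oriented records into a columnar layout. The toy function below shows the idea; the real ROS is Capacitor and the conversion is performed Fragment by Fragment by a background service, so this is only a conceptual stand-in.

```python
def rows_to_columns(wos_rows):
    """Convert write-optimized, row-oriented records into a read-optimized columnar layout."""
    columns = {}
    for row in wos_rows:
        for name, value in row.items():
            columns.setdefault(name, []).append(value)
    return columns

wos = [{"user_id": 1, "event": "click"}, {"user_id": 2, "event": "view"}]
print(rows_to_columns(wos))
# {'user_id': [1, 2], 'event': ['click', 'view']}
```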
Now, let’s move on to the Vortex architecture.
Control Plane
Vortex’s Control Plane is the Stream Metadata Server(s) (SMS). It handles the physical metadata of Streams, Streamlets, and Fragments. Google Cloud Spanner backs the SMS and stores the table’s logical metadata, such as the schema and partitioning/clustering information.
As described above, a Vortex Stream provides an entry point for a client to append data to a table. More than one client can append to the table concurrently, each appending to its own stream.
In most cases, a stream contains a single writable Streamlet. However, Vortex creates an additional Streamlet whenever the current Streamlet is closed, for example, because the table moves to a new Borg cluster or the Stream Server restarts.
The Stream Server is the Vortex Data Plane and will be explored in the next section.
The SMS assigns a Streamlet to a specific Stream Server, which maintains a set of fragments for the Streamlet. When a Vortex client sends a request to append to a table, the SMS finds an available Stream (which has not been assigned to any client).
If no stream is available, Vortex creates a new stream and assigns it to a Stream Server. It then instructs the server to create the Streamlet. The SMS then responds to the client request with the Streamlet ID and the address of the Stream Server with the writable Streamlet. The client creates a long-lived connection to the Stream Server to append batches of rows to the Streamlet.
The separation of metadata and data requests reminded me of the HDFS architecture.
Each BigQuery table is managed by a single SMS. When the SMS becomes unavailable, the system will redistribute the load by assigning the table to a new SMS.
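The control-plane handshake, as I understand it from the paper, can be sketched like this: the client asks the SMS to open a stream, the SMS picks (or creates) a writable Streamlet, assigns it to a Stream Server, and hands back the Streamlet ID plus the server address. The round-robin assignment and all names below are my own simplifications; the real SMS balances CPU, memory, and network load.

```python
import itertools

class StreamServer:
    def __init__(self, address):
        self.address = address
        self.streamlets = {}

    def create_streamlet(self, streamlet_id):
        self.streamlets[streamlet_id] = []            # fragments will be created on append

class StreamMetadataServer:
    """Toy control plane: hands a client a writable Streamlet and its Stream Server address."""

    def __init__(self, stream_servers):
        self._rr = itertools.cycle(stream_servers)    # round-robin stands in for load balancing
        self.unassigned_streams = []                  # streams not yet bound to any client
        self._next_id = itertools.count()

    def open_stream(self):
        if self.unassigned_streams:
            stream_id = self.unassigned_streams.pop() # reuse an available stream if there is one
        else:
            stream_id = f"stream-{next(self._next_id)}"
        server = next(self._rr)
        streamlet_id = f"{stream_id}/streamlet-0"
        server.create_streamlet(streamlet_id)
        return {"streamlet_id": streamlet_id, "server_address": server.address}

sms = StreamMetadataServer([StreamServer("ss-1:443"), StreamServer("ss-2:443")])
print(sms.open_stream())   # the client then opens a long-lived connection to server_address
```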
Data Plane
The Vortex Data Plane is the Stream Servers. A single server is responsible for a set of Streams and creates Fragments for those Streams. Besides the metadata from the SMS, the Stream Server keeps in-memory metadata about its Streams and Fragments. It persists this metadata by writing to a transaction log and periodically checkpointing it. The Stream Server stores Fragments, checkpoints, and transaction logs separately in Colossus. After a checkpoint is created, older logs and checkpoints are cleaned up to free space.
For each Fragment, the Stream Server’s in-memory metadata holds information like which Streamlet it belongs to, its size, its minimum and maximum record timestamp, whether it was finalized, its schema, and partition/clustering columns.
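The log-then-checkpoint pattern is worth making explicit. Below is a minimal sketch of it, assuming only what the paper states (mutations go to a transaction log first, the state is periodically checkpointed, and recovery replays the log on top of the latest checkpoint); in reality both live in Colossus rather than in memory.

```python
import json

class MetadataStore:
    """Toy write-ahead-log + checkpoint scheme for a Stream Server's Fragment metadata."""

    def __init__(self):
        self.state = {}           # in-memory Fragment metadata
        self.log = []             # transaction log entries since the last checkpoint
        self.checkpoint = "{}"    # serialized snapshot of the state

    def apply(self, fragment_id, metadata):
        self.log.append((fragment_id, metadata))   # 1. durably log the mutation first
        self.state[fragment_id] = metadata         # 2. then apply it in memory

    def take_checkpoint(self):
        self.checkpoint = json.dumps(self.state)   # snapshot the full state...
        self.log.clear()                           # ...so older log entries can be cleaned up

    def recover(self):
        state = json.loads(self.checkpoint)        # recovery = checkpoint + remaining log replay
        for fragment_id, metadata in self.log:
            state[fragment_id] = metadata
        return state
```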
Each Borg cluster can have hundreds of Stream Servers. A specific Stream Server in a cluster can host Streamlets for any table that uses the cluster as its primary.
Stream data is replicated across two Borg clusters within a region. This leads to the bigger picture: a specific BigQuery table is managed by two Borg clusters in the same region. The first cluster serves as the primary, while the second acts as the secondary, to which failover occurs if the primary cluster becomes unavailable.
As mentioned above, the SMS assigns Streamlets to Stream Servers, aiming to balance CPU, memory, and network traffic load across the Stream Servers. When a Stream Server gets a request from the SMS to create a Streamlet, it stores the Streamlet information in its metadata and tells the client that the Streamlet is ready to accept appends. The client can then send batches of rows to the Stream Server.
Upon receiving the rows, the Stream Server appends them to the Streamlet’s latest Fragment. When the current Fragment reaches a specific size, the Stream Server finalizes it and creates a new one. The Fragment’s maximum size needs to be chosen carefully: because the Fragment is the unit of WOS-to-ROS conversion, it should be small enough that the conversion (performed by the Storage Optimization Service) happens frequently, but not so small that the Stream Server has to manage too much Fragment metadata.
If an issue arises while writing to a Fragment, the Stream Server finalizes the current Fragment and retries the append on the next Fragment. If the retries also fail, the Stream Server finalizes the current Streamlet and marks the append request as failed. The client then requests a new Streamlet from the SMS, which will most likely be placed on a different Stream Server in case something is wrong with the previous one.
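Here is a rough sketch of that append path: write to the latest Fragment, roll over when it is full, retry on a fresh Fragment if a write error occurs, and finalize the Streamlet when retries are exhausted. The size constant, error type, and structure are illustrative assumptions, not values from the paper.

```python
# Illustrative cap; the real limit balances how often WOS->ROS conversion runs
# against how much Fragment metadata the Stream Server must track.
MAX_FRAGMENT_BYTES = 8 * 1024 * 1024

class FragmentWriter:
    """Toy append path for a single Streamlet on a Stream Server."""

    def __init__(self):
        self.fragments = [bytearray()]
        self.streamlet_finalized = False

    def append(self, data: bytes, max_retries: int = 1) -> bool:
        for _ in range(max_retries + 1):
            try:
                current = self.fragments[-1]
                if len(current) + len(data) > MAX_FRAGMENT_BYTES:
                    self._roll_over()              # finalize the full Fragment, open a new one
                    current = self.fragments[-1]
                current.extend(data)
                return True
            except OSError:
                self._roll_over()                  # on a write error, retry on a fresh Fragment
        self.streamlet_finalized = True            # give up: close the Streamlet; the client
        return False                               # will ask the SMS for a new one elsewhere

    def _roll_over(self):
        self.fragments.append(bytearray())
```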
For read operations, a client can choose to read the desired Fragments in a specific Streamlet because the Stream Server provides an API that returns the list of a Streamlet’s Fragments and the valid bytes to read from each Fragment.
Outro
Through this article, we’ve explored the overview and architecture of Vortex, BigQuery’s storage engine. See you in the second part of my series on Vortex, where I’ll dive deeper into how it handles data-related and system operations.
References
[1] Google, Vortex: A Stream-oriented Storage Engine For Big Data Analytics
Before you leave
If you want to discuss this further, please leave a comment or contact me via LinkedIn, Email, or Twitter.
It might take you five minutes to read, but it took me more than five days to prepare, so it would greatly motivate me if you considered subscribing to receive my writing.