I spent 6 hours learning how Apache Spark plans the execution for us
Catalyst, Adaptive Query Execution, and how Airbnb leverages Spark 3.
My name is Vu Trinh, and I am a data engineer.
I’m trying to make my life less dull by spending time learning and researching “how it works” in the data engineering field.
Here is a place where I share everything I’ve learned.
Intro
In this article, we'll explore how Apache Spark plans and optimizes its internal execution processes. The article is structured as follows: first, we'll dive into SparkSQL; next, we'll examine the Adaptive Query Execution framework introduced in Spark 3; and finally, we'll review a real case study from Airbnb, demonstrating how they leverage this framework to enhance their data warehouse.
Spark SQL
In a 2015 paper, the Apache Spark team presented the new module Spark SQL (it was officially released in May 2014). This module lets Spark programmers leverage the benefits of relational processing (e.g., declarative queries and optimized storage). Spark SQL introduces two significant enhancements. First, it provides seamless integration between relational and procedural processing through a declarative DataFrame API that works smoothly with procedural Spark code. Second, it incorporates a highly extensible optimizer, Catalyst, which leverages Scala's features to facilitate the addition of composable rules and manage code generation.
Originally, Spark was designed as a general-purpose cluster computing engine offering a functional programming API. Users can manipulate distributed collections called Resilient Distributed Datasets (RDDs). Each RDD is a collection of Java or Python objects partitioned across a cluster.
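To make this concrete, here is a minimal RDD sketch in Scala; the local session, app name, and sample values are illustrative assumptions rather than anything from the original paper:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative local session; on a real cluster the master URL would differ.
val spark = SparkSession.builder()
  .appName("rdd-sketch")
  .master("local[*]")
  .getOrCreate()

// An RDD is a partitioned collection of plain objects with a functional API.
val events = spark.sparkContext.parallelize(Seq("click", "view", "click", "purchase"))

// Transformations are composed lazily; Spark only sees opaque functions here,
// so it cannot optimize across them the way a relational optimizer could.
val counts = events.map(event => (event, 1)).reduceByKey(_ + _)

// An action such as collect() finally triggers execution.
counts.collect().foreach(println)
```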
The goals of Spark SQL are:
Support relational processing of Spark’s native RDDs and external data sources through a convenient API.
Offer high performance using established DBMS techniques.
Efficiently support new data sources.
Enable extension with advanced analytics algorithms such as graph processing and machine learning.
Spark SQL runs as a library on top of Spark and exposes SQL interfaces, which can be accessed via JDBC/ODBC or the DataFrame API.
The primary abstraction in Spark SQL’s API is the DataFrame, a distributed collection of rows with the same schema. A DataFrame is equivalent to a table in a relational database and can be manipulated similarly to an RDD. It keeps track of its schema and supports various relational operations that enable more optimized execution.
DataFrames can be created from tables in external data sources or from existing RDDs of native Java/Python objects. Once constructed, they can be manipulated with relational operators such as where and groupBy. On the surface, DataFrames provide the same operations as relational query languages like SQL. However, they let users express the processing logic in programming languages like Java, Scala, or Python.
Another important note: unlike the traditional dataframe APIs (e.g., Pandas Dataframe), Spark Dataframe is lazy. Each DataFrame object represents a logical plan to compute a dataset, but no execution occurs until the user calls a special output operation. (This is similar to the transformation and action in RDD.)
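A minimal sketch of that laziness, assuming a local session and made-up sample data:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("dataframe-sketch")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

// A DataFrame: a distributed collection of rows with a known schema.
val listings = Seq(("NYC", 120.0), ("SF", 200.0), ("NYC", 95.0)).toDF("city", "price")

// where() and groupBy() only extend the logical plan; nothing runs here.
val avgExpensive = listings
  .where($"price" > 100)
  .groupBy("city")
  .avg("price")

// Execution starts only when an output operation such as show() is called,
// after Catalyst has analyzed and optimized the accumulated plan.
avgExpensive.show()
```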
Next, we will examine Spark SQL’s Catalyst Optimizer in more detail. Spark's creator designed Catalyst based on functional programming constructs in Scala. Catalyst supports both rule-based and cost-based optimization.
Rule-Based Optimization (RBO): Rule-based optimization in databases relies on a set of predefined rules and heuristics to choose the execution plan for a query. These rules are usually based on the structure of the query, the operations involved, and the database schema. The optimizer applies these rules in a fixed order, without considering the actual data distribution or workload.
Cost-Based Optimization (CBO): Cost-based optimization, on the other hand, uses statistical information about the data—such as table size, index selectivity, and data distribution—to estimate the cost of various execution plans. The optimizer evaluates multiple candidate plans and chooses the one with the lowest estimated cost. However, it requires accurate statistics to be available.
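As a rough illustration of how those statistics are made available to Spark's cost-based optimizer, the sketch below enables CBO and analyzes a hypothetical sales table; the table name and schema are assumptions for demonstration:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("cbo-sketch")
  .master("local[*]")
  // Cost-based optimization is off by default and relies on collected statistics.
  .config("spark.sql.cbo.enabled", "true")
  .getOrCreate()

// "sales" is a hypothetical table used only for illustration.
spark.sql("CREATE TABLE IF NOT EXISTS sales (id BIGINT, region STRING, amount DOUBLE) USING parquet")

// Collect table- and column-level statistics for the cost model
// (row count, size in bytes, distinct values, min/max, null counts).
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS region, amount")
```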
Essentially, Catalyst includes a general library for representing trees and applying rules to manipulate them. Developers can extend Catalyst to support desired external data sources and user-defined types.
When we define DataFrame transformation logic, before the actual execution, it must go through an optimization process with four phases: analysis of the logical plan, logical optimization, physical planning, and finally, code generation.
We will visit each phase below:
Analysis: The first phase begins with a relation to be processed. It can come from the abstract syntax tree returned by the SQL parser or from a DataFrame object defined with the DataFrame API. In both cases, the plan may contain unresolved attribute references or relations: is the column or table name valid? What are the columns’ types? Spark SQL uses Catalyst rules and a Catalog object to resolve these attributes. It starts by building an “unresolved logical plan” tree with unbound attributes and data types, then applies predefined rules to resolve them.
The Spark SQL Catalog object enables interaction with metadata for databases, tables, and functions. It allows users to list, retrieve, and manage these entities, as well as refresh table metadata to keep Spark's view in sync with underlying data sources. It’s a central interface for managing and querying Spark SQL metadata.
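A small sketch of that Catalog interface; the "sales" table name is a hypothetical placeholder:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("catalog-sketch")
  .master("local[*]")
  .getOrCreate()

// List databases and tables known to the session catalog.
spark.catalog.listDatabases().show()
spark.catalog.listTables().show()

// Inspect the schema of a table (assumes a table named "sales" exists; hypothetical).
spark.catalog.listColumns("sales").show()

// Refresh cached metadata if the underlying files changed outside Spark.
spark.catalog.refreshTable("sales")
```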
Logical Optimization: After resolving the logical plan, Spark applies standard rule-based optimizations, such as predicate pushdown, projection pruning, null propagation, and more.
Physical Planning: Spark SQL takes a logical plan and generates one or more physical plans. It will select the final plan using a cost model.
Code Generation: The final query optimization phase involves generating Java bytecode for execution. Spark SQL relies on code generation to speed up processing, especially for CPU-bound workloads over in-memory datasets. Catalyst simplifies this by leveraging Scala’s quasiquotes, which allow programmatic construction of abstract syntax trees (ASTs) that are compiled into bytecode at runtime. Catalyst transforms SQL expressions into Scala ASTs, which are then compiled and executed.
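You can observe most of these phases yourself with explain. The sketch below, using assumed sample data, prints the parsed, analyzed, and optimized logical plans plus the chosen physical plan:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("explain-sketch")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

val orders = Seq((1, "open", 10.0), (2, "closed", 25.0)).toDF("id", "status", "amount")

// "extended" prints the parsed (unresolved), analyzed, and optimized logical
// plans plus the selected physical plan, mirroring the phases described above.
orders.where($"status" === "open").select("id", "amount").explain("extended")
```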
Now that we understand Spark SQL’s original optimizer, let's move to the next section, which describes Spark's newer optimization framework.
Spark 3 - Adaptive Query Execution
As mentioned in the above section, the Catalyst optimizer uses the cost model framework to choose the optimal plan at the end of physical planning. The framework collects and uses different data statistics (e.g., row count, number of distinct values, NULL values, max/min values, etc.) to help Spark choose the optimal plan.
However, what happens when the statistics are outdated or unavailable? This can lead to suboptimal query plans. In Apache Spark 3, released in 2020, Adaptive Query Execution (AQE) was introduced to tackle such problems with the ability to adjust query plans based on runtime statistics collected during the execution.
Spark operators are typically pipelined and executed in parallel processes. A shuffle or broadcast exchange breaks the pipeline into query stages, where each stage materializes intermediate results. The next stage can only begin once the previous stage is complete. This pause creates an opportunity for re-optimization, as data statistics from all partitions are available before the following operations start.
Let's overview the flow of the AQE framework:
The Adaptive Query Execution (AQE) framework starts by executing leaf stages, i.e., stages that do not depend on any other stages (typically those that read the input data).
Once one or more of these stages complete materialization, the framework marks them as complete in the physical query plan. It updates the logical query plan with runtime statistics from the completed stages.
The framework uses these new statistics to run the optimizer, applying a selected list of logical optimization rules, the physical planner, and physical optimization rules, including adaptive-execution-specific rules like coalescing partitions and skew join handling.
The AQE framework identifies and executes new query stages with the newly optimized plan.
The execute-reoptimize-execute process repeats until the entire query is completed.
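To see the framework in action, the sketch below enables AQE and inspects the plan; the session setup and data are assumptions for illustration:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("aqe-flow-sketch")
  .master("local[*]")
  // Turn on Adaptive Query Execution (enabled by default in recent Spark 3 releases).
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()

import spark.implicits._

// A simple aggregation that forces a shuffle exchange.
val df = (1 to 100000).toDF("id")
  .groupBy(($"id" % 10).as("bucket"))
  .count()

// With AQE on, the physical plan is wrapped in an AdaptiveSparkPlan node,
// which gets re-optimized as query stages complete at runtime.
df.explain()

// Running an action materializes the stages and lets the framework re-plan.
df.collect()
```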
Next, we will explore the three features of the AQE framework: dynamically coalescing shuffle partitions, dynamically switching join strategies, and dynamically optimizing skew joins.
Dynamically coalescing shuffle partitions.
Shuffle operations can significantly impact performance when running queries on large datasets in Spark. Shuffle is expensive because it requires moving data around to redistribute it for downstream operators. Thus, the number of partitions directly affects the shuffle performance. However, determining the optimal number in the first place is challenging:
Too few partitions can result in large partitions that may cause tasks to spill data to disk.
Too many partitions lead to small partitions, causing inefficient I/O with numerous small network data fetches.
To address this, the user can start with a relatively large number of shuffle partitions and then combine smaller adjacent partitions at runtime by analyzing shuffle file statistics with the help of AQE. This approach helps balance partition sizes and improve query performance. Let's visit the example below for a better understanding:
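A minimal configuration sketch for this feature; the specific values are illustrative assumptions, not recommendations from the Spark documentation or from Airbnb:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("aqe-coalesce-sketch")
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "true")
  // Allow AQE to merge small adjacent shuffle partitions after each stage.
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  // Start deliberately high; AQE shrinks the count based on shuffle statistics.
  .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1000")
  // Target size AQE aims for when merging partitions.
  .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
  .getOrCreate()
```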
Dynamically switching join strategies.
Spark offers several join strategies, with broadcast hash join typically the most efficient when one side of the join comfortably fits in memory. Spark initially plans a broadcast hash join if it estimates, at planning time, that the size of the join relation is below a specified threshold. However, this estimate can be off due to factors like highly selective filters or complex join operations.
To address this, Adaptive Query Execution (AQE) now replans the join strategy at runtime based on the actual size of the join relation collected during the execution.
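A small sketch of a query where this can kick in; the tables and filter are made up for illustration, and whether the switch actually happens depends on the runtime sizes Spark observes:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("aqe-join-sketch")
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()

import spark.implicits._

val orders = Seq((1, 100), (2, 200), (3, 300)).toDF("user_id", "amount")
val users  = Seq((1, "US"), (2, "VN")).toDF("user_id", "country")

// A selective filter can make one side far smaller than estimated at planning
// time; with AQE, Spark can switch a planned sort-merge join to a broadcast
// hash join once the actual post-shuffle size is known.
val joined = orders.join(users.where($"country" === "US"), "user_id")
joined.explain()
```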
Dynamically optimizing skew joins.
Data skew happens when data is not distributed evenly across partitions in a cluster, which can impact query performance, especially during joins. The AQE skew joins optimization addresses this by automatically detecting skew from shuffle file statistics. It then splits the skewed partitions into smaller partitions.
Without AQE, the application ends up with a straggler executor (one that lags significantly behind the others), which increases the system's overall latency. With the optimization in place, the skewed partition is split into smaller partitions that are processed by multiple executors, which improves the overall performance.
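A configuration sketch for this feature; the factor and threshold shown are the usual defaults, repeated here only to make the knobs visible:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("aqe-skew-sketch")
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "true")
  // Let AQE split skewed shuffle partitions detected from shuffle statistics.
  .config("spark.sql.adaptive.skewJoin.enabled", "true")
  // A partition counts as skewed if it is at least 5x the median partition size...
  .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
  // ...and also larger than this absolute threshold.
  .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
  .getOrCreate()
```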
Next, we’ll examine how Airbnb uses Spark AQE to solve a specific problem.
Case study at Airbnb
Context
In a 2016 blog post, Airbnb shared that they migrated their data warehouse (DW) storage from legacy HDFS clusters to S3 for better stability and scalability. At that time, Airbnb stated they had already been leveraging Spark for data processing.
Despite ongoing improvements, the scalability and flexibility of these workloads remained constrained by the inherent characteristics of the underlying infrastructure. A 2022 article highlighted how Airbnb enhanced its data infrastructure using Apache Spark 3 and Apache Iceberg. In this section, I’ll focus specifically on Spark, examining how Airbnb utilized Spark’s Adaptive Query Execution to optimize data analytics and processing, particularly within their ingestion pipelines.
How does Spark 3 help?
Airbnb's Hive-based data ingestion framework processes over 35 billion Kafka messages and 1,000+ tables daily, handling datasets from kilobytes to terabytes in hourly and daily partitions.
The first step in migrating to the new tech stack was moving Hive queries to Spark. Unlike Hive, Spark relies on a predefined number of shuffle partitions, and Airbnb found it hard to pick the right number for its event data ingestion framework because data volumes vary widely between events.
A fixed number of shuffle partitions won't work for every event in the ingestion framework: it may be too large for some jobs and too small for others. Spark 3's AQE, particularly its dynamic coalescing of shuffle partitions, addresses exactly this.
The feature lets Airbnb adjust shuffle partitions during execution: AQE combines small adjacent partitions into larger ones at runtime, adapting to the actual size of the shuffle data at each stage. As data grows or shrinks between stages, partitions are continuously coalesced to keep tasks balanced, which yields a significant performance boost.
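Airbnb's exact settings aren't published in the referenced post, so the sketch below only illustrates the general pattern described above: an ingestion job that starts with a deliberately high partition count and lets AQE coalesce it to match each event's actual volume. All values and names are assumptions:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: the values and app name are assumptions, not Airbnb's
// published configuration. The pattern is a high initial partition count that
// AQE coalesces per job to fit the event's actual shuffle volume.
val spark = SparkSession.builder()
  .appName("event-ingestion-job-sketch")
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
  .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "2000")
  .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")
  .getOrCreate()
```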
Outro
This article explores how Spark evolved into a robust analytics engine with the introduction of SparkSQL, which allows users to define processing logic declaratively while the Catalyst optimizer finds the most efficient execution path. Spark then introduced Adaptive Query Execution (AQE), a framework that dynamically adjusts the execution plan based on workload during runtime.
Through Airbnb’s case study, we also saw how AQE enhances their data ingestion pipelines.
I hope you find this article both enjoyable and informative.
See you in the next article!
References
[1] James Mayfield, Krishna Puttaswamy, Swaroop Jagadish, and Kevin Long, Data Infrastructure at Airbnb (2016)
[2] Ronnie Zhu, Edgar Rodriguez, Jason Xu, Gustavo Torres, Kerim Oktay, Xu Zhang, Upgrading Data Warehouse Infrastructure at Airbnb (2022)
[3] Michael Armbrust, Reynold S. Xin, Cheng Lian, Yin Huai, Davies Liu, Joseph K. Bradley, Xiangrui Meng, Tomer Kaftan, Michael J. Franklin, Ali Ghodsi, Matei Zaharia, Spark SQL: Relational Data Processing in Spark (2015)
[4] Wenchen Fan, Herman van Hövell, MaryAnn Xue, Adaptive Query Execution: Speeding Up Spark SQL at Runtime (2020)
Before you leave
If you want to discuss this further, please leave a comment or contact me via LinkedIn, Email, or Twitter.
It might take you five minutes to read, but it took me more than five days to prepare, so it would greatly motivate me if you considered subscribing to receive my writing.